// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
// vim: set ts=4 sw=4:
+// vim: set ts=4 sw=4:
*
+ * spanstore/client.cc:
+ *   SpanStore client implementation.
+ *
+ **********************************************************************/
+#include "spanstore/client.h"
+using namespace std;
+namespace spanstore {
+Client::Client(Mode mode, string configPath, int nShards,
+                int closestReplica, TrueTime timeServer)
+    : transport(0.0, 0.0, 0), mode(mode), timeServer(timeServer)
+    // Initialize all state here;
+    client_id = 0;
+    while (client_id == 0) {
+        random_device rd;
+        mt19937_64 gen(rd());
+        uniform_int_distribution<uint64_t> dis;
+        client_id = dis(gen);
+    }
+    t_id = (client_id/10000)*10000;
+    nshards = nShards;
+    bclient.reserve(nshards);
+    Debug("Initializing SpanStore client with id [%lu]", client_id);
+    /* Start a client for time stamp server. */
+    if (mode == MODE_OCC) {
+        string tssConfigPath = configPath + ".tss.config";
+        ifstream tssConfigStream(tssConfigPath);
+        if (tssConfigStream.fail()) {
+            fprintf(stderr, "unable to read configuration file: %s\n",
+                    tssConfigPath.c_str());
+        }
+        specpaxos::Configuration tssConfig(tssConfigStream);
+        tss = new specpaxos::vr::VRClient(tssConfig, &transport);
+    }
+    /* Start a client for each shard. */
+    for (int i = 0; i < nShards; i++) {
+        string shardConfigPath = configPath + to_string(i) + ".config";
+        SpanClient *spanclient = new SpanClient(mode, shardConfigPath,
+            &transport, client_id, i, closestReplica);
+        bclient[i] = new BufferClient(spanclient);
+    }
+    /* Run the transport in a new thread. */
+    clientTransport = new thread(&Client::run_client, this);
+    Debug("SpanStore client [%lu] created!", client_id);
+    transport.Stop();
+    delete tss;
+    for (auto b : bclient) {
+        delete b;
+    }
+    clientTransport->join();
+/* Runs the transport event loop. */
+    transport.Run();
+/* Begins a transaction. All subsequent operations before a commit() or
+ * abort() are part of this transaction.
+ *
+ * Return a TID for the transaction.
+ */
+    Debug("BEGIN Transaction");
+    t_id++;
+    participants.clear();
+    commit_sleep = -1;
+    for (int i = 0; i < nshards; i++) {
+        bclient[i]->Begin(t_id);
+    }
+/* Returns the value corresponding to the supplied key. */
+Client::Get(const string &key, string &value)
+    // Contact the appropriate shard to get the value.
+    int i = key_to_shard(key, nshards);
+    // If needed, add this shard to set of participants and send BEGIN.
+    if (participants.find(i) == participants.end()) {
+        participants.insert(i);
+    }
+    // Send the GET operation to appropriate shard.
+    Promise promise(GET_TIMEOUT);
+    bclient[i]->Get(key, &promise);
+    value = promise.GetValue();
+    return promise.GetReply();
+/* Sets the value corresponding to the supplied key. */
+Client::Put(const string &key, const string &value)
+    // Contact the appropriate shard to set the value.
+    int i = key_to_shard(key, nshards);
+    // If needed, add this shard to set of participants and send BEGIN.
+    if (participants.find(i) == participants.end()) {
+        participants.insert(i);
+    }
+    Promise promise(PUT_TIMEOUT);
+    // Buffering, so no need to wait.
+    bclient[i]->Put(key, value, &promise);
+    return promise.GetReply();
+Client::Prepare(uint64_t &ts)
+    int status;
+    // 1. Send commit-prepare to all shards.
+    Debug("PREPARE Transaction");
+    list<Promise *> promises;
+    for (auto p : participants) {
+        Debug("Sending prepare to shard [%d]", p);
+        promises.push_back(new Promise(PREPARE_TIMEOUT));
+        bclient[p]->Prepare(Timestamp(),promises.back());
+    }
+    // In the meantime ... go get a timestamp for OCC
+    if (mode == MODE_OCC) {
+        // Have to go to timestamp server
+        unique_lock<mutex> lk(cv_m);
+        Debug("Sending request to TimeStampServer");
+        tss->Invoke("", bind(&Client::tssCallback, this,
+                             placeholders::_1,
+                             placeholders::_2));
+        Debug("Waiting for TSS reply");
+        cv.wait(lk);
+        ts = stol(replica_reply, NULL, 10);
+        Debug("TSS reply received: %lu", ts);
+    }
+    // 2. Wait for reply from all shards. (abort on timeout)
+    Debug("Waiting for PREPARE replies");
+    status = REPLY_OK;
+    for (auto p : promises) {
+        // If any shard returned false, abort the transaction.
+        if (p->GetReply() != REPLY_OK) {
+            if (status != REPLY_FAIL) {
+                status = p->GetReply();
+            }
+        }
+        // Also, find the max of all prepare timestamp returned.
+        if (p->GetTimestamp().getTimestamp() > ts) {
+            ts = p->GetTimestamp().getTimestamp();
+        }
+        delete p;
+    }
+    return status;
+/* Attempts to commit the ongoing transaction. */
+    // Implementing 2 Phase Commit
+    uint64_t ts = 0;
+    int status;
+    for (int i = 0; i < COMMIT_RETRIES; i++) {
+        status = Prepare(ts);
+        if (status == REPLY_OK || status == REPLY_FAIL) {
+            break;
+        }
+    }
+    if (status == REPLY_OK) {
+        // For Spanner like systems, calculate timestamp.
+        if (mode == MODE_SPAN_OCC || mode == MODE_SPAN_LOCK) {
+            uint64_t now, err;
+            struct timeval t1, t2;
+            gettimeofday(&t1, NULL);
+            timeServer.GetTimeAndError(now, err);
+            if (now > ts) {
+                ts = now;
+            } else {
+                uint64_t diff = ((ts >> 32) - (now >> 32))*1000000 +
+                        ((ts & 0xffffffff) - (now & 0xffffffff));
+                err += diff;
+            }
+            commit_sleep = (int)err;
+            // how good are we at waking up on time?
+            Debug("Commit wait sleep: %lu", err);
+            if (err > 1000000)
+                Warning("Sleeping for too long! %lu; now,ts: %lu,%lu", err, now, ts);
+            if (err > 150) {
+                usleep(err-150);
+            }
+            // fine grained busy-wait
+            while (1) {
+                gettimeofday(&t2, NULL);
+                if ((t2.tv_sec-t1.tv_sec)*1000000 +
+                    (t2.tv_usec-t1.tv_usec) > (int64_t)err) {
+                    break;
+                }
+            }
+        }
+        // Send commits
+        Debug("COMMIT Transaction at [%lu]", ts);
+        for (auto p : participants) {
+            Debug("Sending commit to shard [%d]", p);
+            bclient[p]->Commit(ts);
+        }
+        return true;
+    }
+    // 4. If not, send abort to all shards.
+    Abort();
+    return false;
+/* Aborts the ongoing transaction. */
+    Debug("ABORT Transaction");
+    for (auto p : participants) {
+        bclient[p]->Abort();
+    }
+/* Return statistics of most recent transaction. */
+    vector<int> v;
+    return v;
+/* Callback from a tss replica upon any request. */
+Client::tssCallback(const string &request, const string &reply)
+    lock_guard<mutex> lock(cv_m);
+    Debug("Received TSS callback [%s]", reply.c_str());
+    // Copy reply to "replica_reply".
+    replica_reply = reply;
+    // Wake up thread waiting for the reply.
+    cv.notify_all();
+} // namespace spanstore
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
// vim: set ts=4 sw=4:
+// vim: set ts=4 sw=4:
*
+ * spanstore/client.h:
+ *   SpanStore client interface.
+ *
+ **********************************************************************/
+#ifndef _SPAN_CLIENT_H_
+#define _SPAN_CLIENT_H_
+#include "paxos-lib/lib/assert.h"
+#include "paxos-lib/lib/message.h"
+#include "paxos-lib/lib/configuration.h"
+#include "paxos-lib/lib/udptransport.h"
+#include "paxos-lib/common/client.h"
+#include "paxos-lib/vr/client.h"
+#include "common/client.h"
+#include "common/bufferclient.h"
+#include "common/truetime.h"
+#include "common/txnstore.h"
+#include "spanstore/spanclient.h"
+#include "spanstore/span-proto.pb.h"
+#include <condition_variable>
+#include <mutex>
+#include <string>
+#include <set>
+#include <thread>
+namespace spanstore {
+class Client : public ::Client
+    Client(Mode mode, string configPath, int nshards,
+            int closestReplica, TrueTime timeServer);
+    ~Client();
+    // Overriding functions from ::Client
+    void Begin();
+    int Get(const string &key, string &value);
+    int Put(const string &key, const string &value);
+    bool Commit();
+    void Abort();
+    std::vector<int> Stats();
+    /* Private helper functions. */
+    void run_client(); // Runs the transport event loop.
+    // timestamp server call back
+    void tssCallback(const string &request, const string &reply);
+    // local Prepare function
+    int Prepare(uint64_t &ts);
+    // Unique ID for this client.
+    uint64_t client_id;
+    // Ongoing transaction ID.
+    uint64_t t_id;
+    // Number of shards in SpanStore.
+    long nshards;
+    // List of participants in the ongoing transaction.
+    std::set<int> participants;
+    // Transport used by paxos client proxies.
+    UDPTransport transport;
+    // Thread running the transport event loop.
+    std::thread *clientTransport;
+    // Buffering client for each shard.
+    std::vector<BufferClient *> bclient;
+    // Mode in which spanstore runs.
+    Mode mode;
+    // Timestamp server shard.
+    specpaxos::Client *tss; 
+    // TrueTime server.
+    TrueTime timeServer;
+    // Synchronization variables.
+    std::condition_variable cv;
+    std::mutex cv_m;
+    string replica_reply;
+    // Time spend sleeping for commit.
+    int commit_sleep;
+} // namespace spanstore
+#endif /* _SPAN_CLIENT_H_ */
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
// vim: set ts=4 sw=4:
+// vim: set ts=4 sw=4:
*
+ * spanstore/server.cc:
+ *   Implementation of a single SpanStore server.
+ *
+ **********************************************************************/
+#include "spanstore/server.h"
+using namespace std;
+namespace spanstore {
+using namespace proto;
+Server::Server(Mode mode, uint64_t skew, uint64_t error) : mode(mode)
+    timeServer = TrueTime(skew, error);
+    switch (mode) {
+    case MODE_LOCK:
+    case MODE_SPAN_LOCK:
+        store = new spanstore::LockStore();
+        break;
+    case MODE_OCC:
+    case MODE_SPAN_OCC:
+        store = new spanstore::OCCStore();
+        break;
+    default:
+        NOT_REACHABLE();
+    }
+    delete store;
+Server::LeaderUpcall(opnum_t opnum, const string &str1, bool &replicate, string &str2)
+    Debug("Received LeaderUpcall: %lu %s", opnum, str1.c_str());
+    Request request;
+    Reply reply;
+    int status;
+    request.ParseFromString(str1);
+    switch (request.op()) {
+    case spanstore::proto::Request::GET:
+        if (request.get().has_timestamp()) {
+            pair<Timestamp, string> val;
+            status = store->Get(request.txnid(), request.get().key(),
+                               request.get().timestamp(), val);
+            if (status == 0) {
+                reply.set_value(val.second);
+            }
+        } else {
+            pair<Timestamp, string> val;
+            status = store->Get(request.txnid(), request.get().key(), val);
+            if (status == 0) {
+                reply.set_value(val.second);
+                reply.set_timestamp(val.first.getTimestamp());
+            }
+        }
+        replicate = false;
+        reply.set_status(status);
+        reply.SerializeToString(&str2);
+        break;
+    case spanstore::proto::Request::PREPARE:
+        // Prepare is the only case that is conditionally run at the leader
+        status = store->Prepare(request.txnid(),
+                                Transaction(request.prepare().txn()));
+        // if prepared, then replicate result
+        if (status == 0) {
+            replicate = true;
+            // get a prepare timestamp and send along to replicas
+            if (mode == MODE_SPAN_LOCK || mode == MODE_SPAN_OCC) {
+                request.mutable_prepare()->set_timestamp(timeServer.GetTime());
+            }
+            request.SerializeToString(&str2);
+        } else {
+            // if abort, don't replicate
+            replicate = false;
+            reply.set_status(status);
+            reply.SerializeToString(&str2);
+        }
+        break;
+    case spanstore::proto::Request::COMMIT:
+        replicate = true;
+        str2 = str1;
+        break;
+    case spanstore::proto::Request::ABORT:
+        replicate = true;
+        str2 = str1;
+        break;
+    default:
+        Panic("Unrecognized operation.");
+    }
+/* Gets called when a command is issued using client.Invoke(...) to this
+ * replica group. 
+ * opnum is the operation number.
+ * str1 is the request string passed by the client.
+ * str2 is the reply which will be sent back to the client.
+ */
+Server::ReplicaUpcall(opnum_t opnum,
+              const string &str1,
+              string &str2)
+    Debug("Received Upcall: %lu %s", opnum, str1.c_str());
+    Request request;
+    Reply reply;
+    int status = 0;
+    request.ParseFromString(str1);
+    switch (request.op()) {
+    case spanstore::proto::Request::GET:
+        return;
+    case spanstore::proto::Request::PREPARE:
+        // get a prepare timestamp and return to client
+        store->Prepare(request.txnid(),
+                       Transaction(request.prepare().txn()));
+        if (mode == MODE_SPAN_LOCK || mode == MODE_SPAN_OCC) {
+            reply.set_timestamp(request.prepare().timestamp());
+        }
+        break;
+    case spanstore::proto::Request::COMMIT:
+        store->Commit(request.txnid(), request.commit().timestamp());
+        break;
+    case spanstore::proto::Request::ABORT:
+        store->Abort(request.txnid(), Transaction(request.abort().txn()));
+        break;
+    default:
+        Panic("Unrecognized operation.");
+    }
+    reply.set_status(status);
+    reply.SerializeToString(&str2);
+Server::UnloggedUpcall(const string &str1, string &str2)
+    Request request;
+    Reply reply;
+    int status;
+    request.ParseFromString(str1);
+    ASSERT(request.op() == spanstore::proto::Request::GET);
+    if (request.get().has_timestamp()) {
+        pair<Timestamp, string> val;
+        status = store->Get(request.txnid(), request.get().key(),
+                            request.get().timestamp(), val);
+        if (status == 0) {
+            reply.set_value(val.second);
+        }
+    } else {
+        pair<Timestamp, string> val;
+        status = store->Get(request.txnid(), request.get().key(), val);
+        if (status == 0) {
+            reply.set_value(val.second);
+            reply.set_timestamp(val.first.getTimestamp());
+        }
+    }
+    reply.set_status(status);
+    reply.SerializeToString(&str2);
+Server::Load(const string &key, const string &value, const Timestamp timestamp)
+    store->Load(key, value, timestamp);
+} // namespace spanstore
+main(int argc, char **argv)
+    int index = -1;
+    unsigned int myShard=0, maxShard=1, nKeys=1;
+    const char *configPath = NULL;
+    const char *keyPath = NULL;
+    uint64_t skew = 0, error = 0;
+    spanstore::Mode mode;
+    // Parse arguments
+    int opt;
+    while ((opt = getopt(argc, argv, "c:i:m:e:s:f:n:N:k:")) != -1) {
+        switch (opt) {
+        case 'c':
+            configPath = optarg;
+            break;
+        case 'i':
+        {
+            char *strtolPtr;
+            index = strtoul(optarg, &strtolPtr, 10);
+            if ((*optarg == '\0') || (*strtolPtr != '\0') || (index < 0))
+            {
+                fprintf(stderr, "option -i requires a numeric arg\n");
+            }
+            break;
+        }
+        case 'm':
+        {
+            if (strcasecmp(optarg, "lock") == 0) {
+                mode = spanstore::MODE_LOCK;
+            } else if (strcasecmp(optarg, "occ") == 0) {
+                mode = spanstore::MODE_OCC;
+            } else if (strcasecmp(optarg, "span-lock") == 0) {
+                mode = spanstore::MODE_SPAN_LOCK;
+            } else if (strcasecmp(optarg, "span-occ") == 0) {
+                mode = spanstore::MODE_SPAN_OCC;
+            } else {
+                fprintf(stderr, "unknown mode '%s'\n", optarg);
+            }
+            break;
+        }
+        case 's':
+        {
+            char *strtolPtr;
+            skew = strtoul(optarg, &strtolPtr, 10);
+            if ((*optarg == '\0') || (*strtolPtr != '\0') || (skew < 0))
+            {
+                fprintf(stderr, "option -s requires a numeric arg\n");
+            }
+            break;
+        }
+        case 'e':
+        {
+            char *strtolPtr;
+            error = strtoul(optarg, &strtolPtr, 10);
+            if ((*optarg == '\0') || (*strtolPtr != '\0') || (error < 0))
+            {
+                fprintf(stderr, "option -e requires a numeric arg\n");
+            }
+            break;
+        }
+        case 'k':
+        {
+            char *strtolPtr;
+            nKeys = strtoul(optarg, &strtolPtr, 10);
+            if ((*optarg == '\0') || (*strtolPtr != '\0') || (nKeys < 0))
+            {
+                fprintf(stderr, "option -e requires a numeric arg\n");
+            }
+            break;
+        }
+        case 'n':
+        {
+            char *strtolPtr;
+            myShard = strtoul(optarg, &strtolPtr, 10);
+            if ((*optarg == '\0') || (*strtolPtr != '\0') || (myShard < 0))
+            {
+                fprintf(stderr, "option -e requires a numeric arg\n");
+            }
+            break;
+        }
+        case 'N':
+        {
+            char *strtolPtr;
+            maxShard = strtoul(optarg, &strtolPtr, 10);
+            if ((*optarg == '\0') || (*strtolPtr != '\0') || (maxShard <= 0))
+            {
+                fprintf(stderr, "option -e requires a numeric arg\n");
+            }
+            break;
+        }
+        case 'f':   // Load keys from file
+        {
+            keyPath = optarg;
+            break;
+        }
+        default:
+            fprintf(stderr, "Unknown argument %s\n", argv[optind]);
+        }
+    }
+    if (!configPath) {
+        fprintf(stderr, "option -c is required\n");
+    }
+    if (index == -1) {
+        fprintf(stderr, "option -i is required\n");
+    }
+    if (mode == spanstore::MODE_UNKNOWN) {
+        fprintf(stderr, "option -m is required\n");
+    }
+    // Load configuration
+    std::ifstream configStream(configPath);
+    if (configStream.fail()) {
+        fprintf(stderr, "unable to read configuration file: %s\n", configPath);
+    }
+    specpaxos::Configuration config(configStream);
+    if (index >= config.n) {
+        fprintf(stderr, "replica index %d is out of bounds; "
+                "only %d replicas defined\n", index, config.n);
+    }
+    UDPTransport transport(0.0, 0.0, 0);
+    spanstore::Server server(mode, skew, error);
+    specpaxos::vr::VRReplica replica(config, index, &transport, 1, &server);
+    if (keyPath) {
+        string key;
+        ifstream in;
+        in.open(keyPath);
+        if (!in) {
+            fprintf(stderr, "Could not read keys from: %s\n", keyPath);
+            exit(0);
+        }
+        for (unsigned int i = 0; i < nKeys; i++) {
+            getline(in, key);
+            uint64_t hash = 5381;
+            const char* str = key.c_str();
+            for (unsigned int j = 0; j < key.length(); j++) {
+                hash = ((hash << 5) + hash) + (uint64_t)str[j];
+            }
+            if (hash % maxShard == myShard) {
+                server.Load(key, "null", Timestamp());
+            }
+        }
+        in.close();
+    }
+    transport.Run();
+    return 0;
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
// vim: set ts=4 sw=4:
+// vim: set ts=4 sw=4:
*
+ * spanstore/replica.h:
+ *   A single SpanStore server replica.
+ *
+ **********************************************************************/
+#ifndef _SPAN_SERVER_H_
+#define _SPAN_SERVER_H_
+#include "paxos-lib/lib/configuration.h"
+#include "paxos-lib/common/replica.h"
+#include "paxos-lib/lib/udptransport.h"
+#include "paxos-lib/vr/replica.h"
+#include "common/truetime.h"
+#include "common/txnstore.h"
+#include "spanstore/occstore.h"
+#include "spanstore/lockstore.h"
+#include "spanstore/span-proto.pb.h"
+namespace spanstore {
+enum Mode {
+    MODE_OCC,
+class Server : public specpaxos::AppReplica
+    Server(Mode mode, uint64_t skew, uint64_t error);
+    virtual ~Server();
+    virtual void LeaderUpcall(opnum_t opnum, const string &str1, bool &replicate, string &str2);
+    virtual void ReplicaUpcall(opnum_t opnum, const string &str1, string &str2);
+    virtual void UnloggedUpcall(const string &str1, string &str2);
+    void Load(const string &key, const string &value, const Timestamp timestamp);
+    Mode mode;
+    TxnStore *store;
+    TrueTime timeServer;
+} // namespace spanstore
+#endif /* _SPAN_SERVER_H_ */
+import "common/common-proto.proto";
+package spanstore.proto;
+message GetMessage {
+    required string key = 1;
+    optional TimestampMessage timestamp = 2;
+message PrepareMessage {
+    required TransactionMessage txn = 1;
+    optional uint64 timestamp = 2;
+message CommitMessage {
+    required uint64 timestamp = 1;
+message AbortMessage {
+     required TransactionMessage txn = 1;
+message Request {
+     enum Operation {
+          GET = 1;
+          PREPARE = 2;
+          COMMIT = 3;
+          ABORT = 4;
+     }	
+     required Operation op = 1;
+     required uint64 txnid = 2;
+     optional GetMessage get = 3;
+     optional PrepareMessage prepare = 4;
+     optional CommitMessage commit = 5;
+     optional AbortMessage abort = 6;
+message Reply {
+     // 0 = OK
+     // -1 = failed
+     // -2 = retry
+     // -3 = abstain/no reply
+     required int32 status = 1;
+     optional string value = 2;
+     optional uint64 timestamp = 3;
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
// vim: set ts=4 sw=4:
+// vim: set ts=4 sw=4:
*
+ * spanstore/spanclient.cc:
+ *   Single shard SpanStore client implementation.
+ *
+ **********************************************************************/
+#include "spanstore/spanclient.h"
+#include "common/txnstore.h"
+namespace spanstore {
+using namespace std;
+using namespace proto;
+SpanClient::SpanClient(Mode mode, const string &configPath,
+                       Transport *transport, uint64_t client_id, int
+                       shard, int closestReplica)
+    : transport(transport), client_id(client_id), shard(shard)
+    ifstream configStream(configPath);
+    if (configStream.fail()) {
+        fprintf(stderr, "unable to read configuration file: %s\n",
+                configPath.c_str());
+    }
+    specpaxos::Configuration config(configStream);
+    client = new specpaxos::vr::VRClient(config, transport);
+    if (mode == MODE_OCC || mode == MODE_SPAN_OCC) {
+        if (closestReplica == -1) {
+            replica = client_id % config.n;
+        } else {
+            replica = closestReplica;
+        }
+        Debug("Sending unlogged to replica %i", replica);
+    } else {
+        replica = 0;
+    }
+    waiting = NULL;
+    blockingBegin = NULL;
+    delete client;
+/* Sends BEGIN to a single shard indexed by i. */
+SpanClient::Begin(uint64_t id)
+    Debug("[shard %i] BEGIN: %lu", shard, id);
+    // Wait for any previous pending requests.
+    if (blockingBegin != NULL) {
+        blockingBegin->GetReply();
+        delete blockingBegin;
+        blockingBegin = NULL;
+    }
+/* Returns the value corresponding to the supplied key. */
+SpanClient::Get(uint64_t id, const string &key, Promise *promise)
+    // Send the GET operation to appropriate shard.
+    Debug("[shard %i] Sending GET [%s]", shard, key.c_str());
+    // create request
+    string request_str;
+    Request request;
+    request.set_op(Request::GET);
+    request.set_txnid(id);
+    request.mutable_get()->set_key(key);
+    request.SerializeToString(&request_str);
+    // set to 1 second by default
+    int timeout = (promise != NULL) ? promise->GetTimeout() : 1000;
+    transport->Timer(0, [=]() {
+	    waiting = promise;    
+        client->InvokeUnlogged(replica,
+                               request_str,
+                               bind(&SpanClient::GetCallback,
+                                    this,
+                                    placeholders::_1,
+                                    placeholders::_2),
+                               bind(&SpanClient::GetTimeout,
+                                    this),
+                               timeout); // timeout in ms
+    });
+SpanClient::Get(uint64_t id, const string &key,
+                const Timestamp &timestamp, Promise *promise)
+    // Send the GET operation to appropriate shard.
+    Debug("[shard %i] Sending GET [%s]", shard, key.c_str());
+    // create request
+    string request_str;
+    Request request;
+    request.set_op(Request::GET);
+    request.set_txnid(id);
+    request.mutable_get()->set_key(key);
+    timestamp.serialize(request.mutable_get()->mutable_timestamp());
+    request.SerializeToString(&request_str);
+    // set to 1 second by default
+    int timeout = (promise != NULL) ? promise->GetTimeout() : 1000;
+    transport->Timer(0, [=]() {
+	    waiting = promise;
+            client->InvokeUnlogged(replica,
+                                   request_str,
+                                   bind(&SpanClient::GetCallback,
+                                        this,
+                                        placeholders::_1,
+                                        placeholders::_2),
+                                   bind(&SpanClient::GetTimeout,
+                                        this),
+                                   timeout); // timeout in ms
+        });
+SpanClient::Prepare(uint64_t id, const Transaction &txn,
+                    const Timestamp &timestamp, Promise *promise)
+    Debug("[shard %i] Sending PREPARE: %lu", shard, id);
+    // create prepare request
+    string request_str;
+    Request request;
+    request.set_op(Request::PREPARE);
+    request.set_txnid(id);
+    txn.serialize(request.mutable_prepare()->mutable_txn());
+    request.SerializeToString(&request_str);
+    transport->Timer(0, [=]() {
+	    waiting = promise;
+            client->Invoke(request_str,
+                           bind(&SpanClient::PrepareCallback,
+                                this,
+                                placeholders::_1,
+                                placeholders::_2));
+        });
+SpanClient::Commit(uint64_t id, const Transaction &txn,
+                   uint64_t timestamp, Promise *promise)
+    Debug("[shard %i] Sending COMMIT: %lu", shard, id);
+    // create commit request
+    string request_str;
+    Request request;
+    request.set_op(Request::COMMIT);
+    request.set_txnid(id);
+    request.mutable_commit()->set_timestamp(timestamp);
+    request.SerializeToString(&request_str);
+    blockingBegin = new Promise(COMMIT_TIMEOUT);
+    transport->Timer(0, [=]() {
+        waiting = promise;
+        client->Invoke(request_str,
+            bind(&SpanClient::CommitCallback,
+                this,
+                placeholders::_1,
+                placeholders::_2));
+    });
+/* Aborts the ongoing transaction. */
+SpanClient::Abort(uint64_t id, const Transaction &txn, Promise *promise)
+    Debug("[shard %i] Sending ABORT: %lu", shard, id);
+    // create abort request
+    string request_str;
+    Request request;
+    request.set_op(Request::ABORT);
+    request.set_txnid(id);
+    txn.serialize(request.mutable_abort()->mutable_txn());
+    request.SerializeToString(&request_str);
+    blockingBegin = new Promise(ABORT_TIMEOUT);
+    transport->Timer(0, [=]() {
+	    waiting = promise;
+	    client->Invoke(request_str,
+			   bind(&SpanClient::AbortCallback,
+				this,
+				placeholders::_1,
+				placeholders::_2));
+    });
+    if (waiting != NULL) {
+        Promise *w = waiting;
+        waiting = NULL;
+        w->Reply(REPLY_TIMEOUT);
+    }
+/* Callback from a shard replica on get operation completion. */
+SpanClient::GetCallback(const string &request_str, const string &reply_str)
+    /* Replies back from a shard. */
+    Reply reply;
+    reply.ParseFromString(reply_str);
+    Debug("[shard %i] Received GET callback [%d]", shard, reply.status());
+    if (waiting != NULL) {
+        Promise *w = waiting;
+        waiting = NULL;
+        if (reply.has_timestamp()) {
+            w->Reply(reply.status(), Timestamp(reply.timestamp()), reply.value());
+        } else {
+            w->Reply(reply.status(), reply.value());
+        }
+    }
+/* Callback from a shard replica on prepare operation completion. */
+SpanClient::PrepareCallback(const string &request_str, const string &reply_str)
+    Reply reply;
+    reply.ParseFromString(reply_str);
+    Debug("[shard %i] Received PREPARE callback [%d]", shard, reply.status());
+    if (waiting != NULL) {
+        Promise *w = waiting;
+        waiting = NULL;
+        if (reply.has_timestamp()) {
+            w->Reply(reply.status(), Timestamp(reply.timestamp(), 0));
+        } else {
+            w->Reply(reply.status(), Timestamp());
+        }
+    }
+/* Callback from a shard replica on commit operation completion. */
+SpanClient::CommitCallback(const string &request_str, const string &reply_str)
+    // COMMITs always succeed.
+    Reply reply;
+    reply.ParseFromString(reply_str);
+    ASSERT(reply.status() == REPLY_OK);
+    ASSERT(blockingBegin != NULL);
+    blockingBegin->Reply(0);
+    if (waiting != NULL) {
+        Promise *w = waiting;
+        waiting = NULL;
+        w->Reply(reply.status());
+    }
+    Debug("[shard %i] Received COMMIT callback [%d]", shard, reply.status());
+/* Callback from a shard replica on abort operation completion. */
+SpanClient::AbortCallback(const string &request_str, const string &reply_str)
+    // ABORTs always succeed.
+    Reply reply;
+    reply.ParseFromString(reply_str);
+    ASSERT(reply.status() == REPLY_OK);
+    ASSERT(blockingBegin != NULL);
+    blockingBegin->Reply(0);
+    if (waiting != NULL) {
+        Promise *w = waiting;
+        waiting = NULL;
+        w->Reply(reply.status());
+    }
+    Debug("[shard %i] Received ABORT callback [%d]", shard, reply.status());
+} // namespace spanstore
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
// vim: set ts=4 sw=4:
+// vim: set ts=4 sw=4:
*
+ * spanstore/spanclient.h:
+ *   Single shard SpanStore client interface.
+ *
+ **********************************************************************/
+#ifndef _SPAN_TXN_CLIENT_H_
+#define _SPAN_TXN_CLIENT_H_
+#include "paxos-lib/lib/assert.h"
+#include "paxos-lib/lib/message.h"
+#include "paxos-lib/lib/transport.h"
+#include "paxos-lib/common/client.h"
+#include "paxos-lib/vr/client.h"
+#include "common/txnclient.h"
+#include "common/timestamp.h"
+#include "common/transaction.h"
+#include "spanstore/span-proto.pb.h"
+#include <string>
+#include <mutex>
+#include <condition_variable>
+namespace spanstore {
+enum Mode {
+    MODE_OCC,
+class SpanClient : public TxnClient
+    /* Constructor needs path to shard config. */
+    SpanClient(Mode mode,
+        const std::string &configPath, 
+        Transport *transport,
+        uint64_t client_id,
+        int shard,
+        int closestReplica);
+    ~SpanClient();
+    // Overriding from TxnClient
+    void Begin(uint64_t id);
+    void Get(uint64_t id,
+            const std::string &key,
+            Promise *promise = NULL);
+    void Get(uint64_t id,
+            const std::string &key,
+            const Timestamp &timestamp, 
+            Promise *promise = NULL);
+    void Prepare(uint64_t id, 
+                 const Transaction &txn,
+                 const Timestamp &timestamp = Timestamp(),
+                 Promise *promise = NULL);
+    void Commit(uint64_t id, 
+                const Transaction &txn,
+                uint64_t timestamp,
+                Promise *promise = NULL);
+    void Abort(uint64_t id, 
+               const Transaction &txn,
+               Promise *promise = NULL);
+    Transport *transport; // Transport layer.
+    uint64_t client_id; // Unique ID for this client.
+    int shard; // which shard this client accesses
+    int replica; // which replica to use for reads
+    specpaxos::Client *client; // Client proxy.
+    Promise *waiting; // waiting thread
+    Promise *blockingBegin; // block until finished 
+    /* Timeout for Get requests, which only go to one replica. */
+    void GetTimeout();
+    /* Callbacks for hearing back from a shard for an operation. */
+    void GetCallback(const std::string &, const std::string &);
+    void PrepareCallback(const std::string &, const std::string &);
+    void CommitCallback(const std::string &, const std::string &);
+    void AbortCallback(const std::string &, const std::string &);
+    /* Helper Functions for starting and finishing requests */
+    void StartRequest();
+    void WaitForResponse();
+    void FinishRequest(const std::string &reply_str);
+    void FinishRequest();
+    int SendGet(const std::string &request_str);
+} // namespace spanstore
+#endif /* _SPAN_TXN_CLIENT_H_ */