From f7ae438db84db9bfd7dd201fc1a654cacc8a1197 Mon Sep 17 00:00:00 2001 From: Irene Zhang <iyzhang@cs.washington.edu> Date: Wed, 24 Jun 2015 09:42:26 -0700 Subject: [PATCH] moving and adding some more storage level files --- store/txnstore/client.cc | 284 +++++++++++++++ store/txnstore/client.h | 100 ++++++ store/{backend => txnstore/lib}/lockstore.cc | 0 store/{backend => txnstore/lib}/lockstore.h | 0 store/{backend => txnstore/lib}/occstore.cc | 0 store/{backend => txnstore/lib}/occstore.h | 0 store/txnstore/server.cc | 357 +++++++++++++++++++ store/txnstore/server.h | 52 +++ store/txnstore/span-proto.proto | 46 +++ store/txnstore/spanclient.cc | 299 ++++++++++++++++ store/txnstore/spanclient.h | 100 ++++++ 11 files changed, 1238 insertions(+) create mode 100644 store/txnstore/client.cc create mode 100644 store/txnstore/client.h rename store/{backend => txnstore/lib}/lockstore.cc (100%) rename store/{backend => txnstore/lib}/lockstore.h (100%) rename store/{backend => txnstore/lib}/occstore.cc (100%) rename store/{backend => txnstore/lib}/occstore.h (100%) create mode 100644 store/txnstore/server.cc create mode 100644 store/txnstore/server.h create mode 100644 store/txnstore/span-proto.proto create mode 100644 store/txnstore/spanclient.cc create mode 100644 store/txnstore/spanclient.h diff --git a/store/txnstore/client.cc b/store/txnstore/client.cc new file mode 100644 index 0000000..84e3ef9 --- /dev/null +++ b/store/txnstore/client.cc @@ -0,0 +1,284 @@ +// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*- +// vim: set ts=4 sw=4: +/*********************************************************************** + * + * spanstore/client.cc: + * SpanStore client implementation. 
+ *
+ **********************************************************************/
+
+#include "spanstore/client.h"
+
+using namespace std;
+
+namespace spanstore {
+
+/*
+ * Builds a SpanStore client.
+ *
+ *   mode            protocol variant; MODE_OCC additionally opens a
+ *                   connection to the timestamp server.
+ *   configPath      prefix of the configuration files: shard i is read from
+ *                   "<configPath><i>.config", the timestamp server from
+ *                   "<configPath>.tss.config".
+ *   nShards         number of shards; one BufferClient is created per shard.
+ *   closestReplica  forwarded unchanged to every SpanClient.
+ *   timeServer      TrueTime source used for commit wait.
+ *
+ * The constructor also starts the thread that runs the transport loop.
+ */
+Client::Client(Mode mode, string configPath, int nShards,
+                int closestReplica, TrueTime timeServer)
+    : transport(0.0, 0.0, 0), mode(mode), timeServer(timeServer)
+{
+    // Pointers that are only conditionally assigned below must start out
+    // NULL, because the destructor deletes them unconditionally.
+    tss = NULL;
+    clientTransport = NULL;
+    commit_sleep = -1;
+
+    // Pick a random, non-zero client id.
+    random_device rd;
+    mt19937_64 gen(rd());
+    uniform_int_distribution<uint64_t> dis;
+    client_id = 0;
+    while (client_id == 0) {
+        client_id = dis(gen);
+    }
+    // Transaction ids start at the client id rounded down to a multiple
+    // of 10000; Begin() increments from there.
+    t_id = (client_id/10000)*10000;
+
+    nshards = nShards;
+    bclient.reserve(nshards);
+
+    Debug("Initializing SpanStore client with id [%lu]", client_id);
+
+    /* Start a client for time stamp server. */
+    if (mode == MODE_OCC) {
+        string tssConfigPath = configPath + ".tss.config";
+        ifstream tssConfigStream(tssConfigPath);
+        if (tssConfigStream.fail()) {
+            fprintf(stderr, "unable to read configuration file: %s\n",
+                    tssConfigPath.c_str());
+        }
+        specpaxos::Configuration tssConfig(tssConfigStream);
+        tss = new specpaxos::vr::VRClient(tssConfig, &transport);
+    }
+
+    /* Start a client for each shard. */
+    for (int i = 0; i < nShards; i++) {
+        string shardConfigPath = configPath + to_string(i) + ".config";
+        SpanClient *spanclient = new SpanClient(mode, shardConfigPath,
+            &transport, client_id, i, closestReplica);
+        // reserve() only sets capacity; push_back() is what actually grows
+        // the vector, so that bclient[i] and the destructor's loop are valid.
+        bclient.push_back(new BufferClient(spanclient));
+    }
+
+    /* Run the transport in a new thread. */
+    clientTransport = new thread(&Client::run_client, this);
+
+    Debug("SpanStore client [%lu] created!", client_id);
+}
+
+/*
+ * Stops the transport, waits for its thread, then releases the per-shard
+ * clients and the timestamp-server client.
+ */
+Client::~Client()
+{
+    // Join the event-loop thread before freeing anything: callbacks running
+    // on that thread may still reference the objects deleted below.
+    transport.Stop();
+    if (clientTransport != NULL) {
+        clientTransport->join();
+        delete clientTransport;
+    }
+
+    delete tss;
+    // NOTE(review): whether BufferClient frees the SpanClient it wraps is not
+    // visible here -- confirm, otherwise the SpanClients leak.
+    for (auto b : bclient) {
+        delete b;
+    }
+}
+
+/* Runs the transport event loop. */
+void
+Client::run_client()
+{
+    transport.Run();
+}
+
+/* Begins a transaction. All subsequent operations before a commit() or
+ * abort() are part of this transaction.
+ *
+ * Nothing is returned; the new transaction id is kept in t_id.
+ */
+void
+Client::Begin()
+{
+    Debug("BEGIN Transaction");
+    t_id++;
+    participants.clear();
+    commit_sleep = -1;
+    for (int i = 0; i < nshards; i++) {
+        bclient[i]->Begin(t_id);
+    }
+}
+
+/* Reads `key` from the shard that owns it into `value`.
+ * Returns the reply status of the underlying promise. */
+int
+Client::Get(const string &key, string &value)
+{
+    // Contact the appropriate shard to get the value.
+    int i = key_to_shard(key, nshards);
+
+    // Record the shard as a participant (set::insert ignores duplicates).
+    participants.insert(i);
+
+    // Send the GET operation to appropriate shard.
+    Promise promise(GET_TIMEOUT);
+
+    bclient[i]->Get(key, &promise);
+    value = promise.GetValue();
+
+    return promise.GetReply();
+}
+
+/* Writes `value` under `key` on the shard that owns it.
+ * Returns the reply status of the underlying promise. */
+int
+Client::Put(const string &key, const string &value)
+{
+    // Contact the appropriate shard to set the value.
+    int i = key_to_shard(key, nshards);
+
+    // Record the shard as a participant (set::insert ignores duplicates).
+    participants.insert(i);
+
+    Promise promise(PUT_TIMEOUT);
+
+    // NOTE(review): the original comment said the write is buffered, so
+    // GetReply() is presumably immediate -- confirm in BufferClient.
+    bclient[i]->Put(key, value, &promise);
+    return promise.GetReply();
+}
+
+/*
+ * First phase of commit: sends PREPARE to every participant and collects
+ * the votes.
+ *
+ *   ts   in/out; raised to the largest timestamp seen (the timestamp-server
+ *        reply in MODE_OCC, and every prepare timestamp returned).
+ *
+ * Returns REPLY_OK if every participant replied REPLY_OK; otherwise a
+ * non-OK reply status, with REPLY_FAIL taking precedence once seen.
+ */
+int
+Client::Prepare(uint64_t &ts)
+{
+    int status;
+
+    // 1. Send commit-prepare to all shards.
+    Debug("PREPARE Transaction");
+    list<Promise *> promises;
+
+    for (auto p : participants) {
+        Debug("Sending prepare to shard [%d]", p);
+        promises.push_back(new Promise(PREPARE_TIMEOUT));
+        bclient[p]->Prepare(Timestamp(),promises.back());
+    }
+
+    // In the meantime ... go get a timestamp for OCC
+    if (mode == MODE_OCC) {
+        // Have to go to timestamp server
+        unique_lock<mutex> lk(cv_m);
+
+        // Forget any reply left over from an earlier transaction so the
+        // wait predicate below only accepts the reply to this request.
+        replica_reply.clear();
+
+        Debug("Sending request to TimeStampServer");
+        tss->Invoke("", bind(&Client::tssCallback, this,
+                             placeholders::_1,
+                             placeholders::_2));
+
+        Debug("Waiting for TSS reply");
+        // Predicate form: condition variables may wake spuriously, and
+        // parsing an empty string would throw.
+        cv.wait(lk, [this]() { return !replica_reply.empty(); });
+        // ts is unsigned 64-bit, so parse with the unsigned converter.
+        ts = stoull(replica_reply, NULL, 10);
+        Debug("TSS reply received: %lu", ts);
+    }
+
+    // 2. Wait for reply from all shards. (abort on timeout)
+    Debug("Waiting for PREPARE replies");
+
+    status = REPLY_OK;
+    for (auto p : promises) {
+        // If any shard returned false, abort the transaction.
+        if (p->GetReply() != REPLY_OK) {
+            if (status != REPLY_FAIL) {
+                status = p->GetReply();
+            }
+        }
+        // Also, find the max of all prepare timestamp returned.
+        if (p->GetTimestamp().getTimestamp() > ts) {
+            ts = p->GetTimestamp().getTimestamp();
+        }
+        delete p;
+    }
+    return status;
+}
+
+/*
+ * Attempts to commit the ongoing transaction with two-phase commit.
+ * Returns true if the transaction committed, false if it was aborted.
+ */
+bool
+Client::Commit()
+{
+    // Implementing 2 Phase Commit
+    uint64_t ts = 0;
+    // Defined even if the retry loop below runs zero times.
+    int status = REPLY_FAIL;
+
+    // Retry the prepare phase until it yields a definite OK or FAIL.
+    for (int i = 0; i < COMMIT_RETRIES; i++) {
+        status = Prepare(ts);
+        if (status == REPLY_OK || status == REPLY_FAIL) {
+            break;
+        }
+    }
+
+    if (status == REPLY_OK) {
+        // For Spanner like systems, calculate timestamp.
+        if (mode == MODE_SPAN_OCC || mode == MODE_SPAN_LOCK) {
+            uint64_t now, err;
+            struct timeval t1, t2;
+
+            gettimeofday(&t1, NULL);
+            timeServer.GetTimeAndError(now, err);
+
+            if (now > ts) {
+                ts = now;
+            } else {
+                // NOTE(review): this arithmetic treats the high 32 bits as
+                // seconds and the low 32 bits as microseconds -- confirm
+                // against TrueTime's encoding.
+                uint64_t diff = ((ts >> 32) - (now >> 32))*1000000 +
+                        ((ts & 0xffffffff) - (now & 0xffffffff));
+                err += diff;
+            }
+
+            commit_sleep = (int)err;
+
+            // how good are we at waking up on time?
+            Debug("Commit wait sleep: %lu", err);
+            if (err > 1000000)
+                Warning("Sleeping for too long! %lu; now,ts: %lu,%lu", err, now, ts);
+            // Sleep for all but the last 150us, then spin for precision.
+            if (err > 150) {
+                usleep(err-150);
+            }
+            // fine grained busy-wait
+            while (1) {
+                gettimeofday(&t2, NULL);
+                if ((t2.tv_sec-t1.tv_sec)*1000000 +
+                    (t2.tv_usec-t1.tv_usec) > (int64_t)err) {
+                    break;
+                }
+            }
+        }
+
+        // Send commits
+        Debug("COMMIT Transaction at [%lu]", ts);
+
+        for (auto p : participants) {
+            Debug("Sending commit to shard [%d]", p);
+            bclient[p]->Commit(ts);
+        }
+        return true;
+    }
+
+    // Prepare did not succeed: send abort to all shards.
+    Abort();
+    return false;
+}
+
+/* Aborts the ongoing transaction on every participating shard. */
+void
+Client::Abort()
+{
+    Debug("ABORT Transaction");
+    for (auto p : participants) {
+        bclient[p]->Abort();
+    }
+}
+
+/* Return statistics of most recent transaction.
+ * Currently always returns an empty vector. */
+vector<int>
+Client::Stats()
+{
+    vector<int> v;
+    return v;
+}
+
+/* Callback from a tss replica upon any request.
+ * Stores the reply and wakes the thread blocked in Prepare(). */
+void
+Client::tssCallback(const string &request, const string &reply)
+{
+    lock_guard<mutex> lock(cv_m);
+    Debug("Received TSS callback [%s]", reply.c_str());
+
+    // Copy reply to "replica_reply".
+    replica_reply = reply;
+
+    // Wake up thread waiting for the reply.
+    cv.notify_all();
+}
+
+} // namespace spanstore
diff --git a/store/txnstore/client.h b/store/txnstore/client.h
new file mode 100644
index 0000000..5498ffc
--- /dev/null
+++ b/store/txnstore/client.h
@@ -0,0 +1,100 @@
+// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
+// vim: set ts=4 sw=4:
+/***********************************************************************
+ *
+ * spanstore/client.h:
+ *   SpanStore client interface.
+ *
+ **********************************************************************/
+
+#ifndef _SPAN_CLIENT_H_
+#define _SPAN_CLIENT_H_
+
+#include "paxos-lib/lib/assert.h"
+#include "paxos-lib/lib/message.h"
+#include "paxos-lib/lib/configuration.h"
+#include "paxos-lib/lib/udptransport.h"
+#include "paxos-lib/common/client.h"
+#include "paxos-lib/vr/client.h"
+#include "common/client.h"
+#include "common/bufferclient.h"
+#include "common/truetime.h"
+#include "common/txnstore.h"
+#include "spanstore/spanclient.h"
+#include "spanstore/span-proto.pb.h"
+
+#include <condition_variable>
+#include <cstdint>
+#include <mutex>
+#include <set>
+#include <string>
+#include <thread>
+#include <vector>
+
+namespace spanstore {
+
+/*
+ * Transactional client that spreads keys over several shards and commits
+ * with two-phase commit. It owns one BufferClient per shard and a thread
+ * that runs the transport event loop.
+ */
+class Client : public ::Client
+{
+public:
+    /* configPath is a prefix: shard i is configured from
+     * "<configPath><i>.config". */
+    Client(Mode mode, std::string configPath, int nshards,
+            int closestReplica, TrueTime timeServer);
+    ~Client();
+
+    // Overriding functions from ::Client
+    void Begin();
+    int Get(const std::string &key, std::string &value);
+    int Put(const std::string &key, const std::string &value);
+    bool Commit();
+    void Abort();
+    std::vector<int> Stats();
+
+private:
+    /* Private helper functions. */
+    void run_client(); // Runs the transport event loop.
+
+    // timestamp server call back
+    void tssCallback(const std::string &request, const std::string &reply);
+
+    // local Prepare function
+    int Prepare(uint64_t &ts);
+
+    // Unique ID for this client.
+    uint64_t client_id;
+
+    // Ongoing transaction ID.
+    uint64_t t_id;
+
+    // Number of shards in SpanStore.
+    long nshards;
+
+    // List of participants in the ongoing transaction.
+    std::set<int> participants;
+
+    // Transport used by paxos client proxies.
+    UDPTransport transport;
+
+    // Thread running the transport event loop.
+    std::thread *clientTransport;
+
+    // Buffering client for each shard.
+    std::vector<BufferClient *> bclient;
+
+    // Mode in which spanstore runs.
+    Mode mode;
+
+    // Timestamp server shard.
+    specpaxos::Client *tss;
+
+    // TrueTime server.
+    TrueTime timeServer;
+
+    // Synchronization variables: cv/cv_m hand the timestamp-server reply
+    // (replica_reply) from tssCallback() to the thread waiting in Prepare().
+    std::condition_variable cv;
+    std::mutex cv_m;
+    std::string replica_reply;
+
+    // Time spent sleeping for commit.
+    int commit_sleep;
+};
+
+} // namespace spanstore
+
+#endif /* _SPAN_CLIENT_H_ */
diff --git a/store/backend/lockstore.cc b/store/txnstore/lib/lockstore.cc
similarity index 100%
rename from store/backend/lockstore.cc
rename to store/txnstore/lib/lockstore.cc
diff --git a/store/backend/lockstore.h b/store/txnstore/lib/lockstore.h
similarity index 100%
rename from store/backend/lockstore.h
rename to store/txnstore/lib/lockstore.h
diff --git a/store/backend/occstore.cc b/store/txnstore/lib/occstore.cc
similarity index 100%
rename from store/backend/occstore.cc
rename to store/txnstore/lib/occstore.cc
diff --git a/store/backend/occstore.h b/store/txnstore/lib/occstore.h
similarity index 100%
rename from store/backend/occstore.h
rename to store/txnstore/lib/occstore.h
diff --git a/store/txnstore/server.cc b/store/txnstore/server.cc
new file mode 100644
index 0000000..87f26b3
--- /dev/null
+++ b/store/txnstore/server.cc
@@ -0,0 +1,357 @@
+// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
+// vim: set ts=4 sw=4:
+/***********************************************************************
+ *
+ * spanstore/server.cc:
+ *   Implementation of a single SpanStore server.
+ *
+ **********************************************************************/
+
+#include "spanstore/server.h"
+
+#include <cerrno>
+#include <climits>
+#include <cstdio>
+#include <cstdlib>
+#include <fstream>
+#include <string>
+#include <strings.h>
+#include <unistd.h>
+
+using namespace std;
+
+namespace spanstore {
+
+using namespace proto;
+
+/*
+ * Serves a GET request against `store` and records value, timestamp and
+ * status in `reply`. Shared by LeaderUpcall() and UnloggedUpcall().
+ * Returns the status reported by the store (0 means the key was read).
+ */
+static int
+ExecuteGet(TxnStore *store, const Request &request, Reply &reply)
+{
+    pair<Timestamp, string> val;
+    int status;
+
+    if (request.get().has_timestamp()) {
+        // Read at the timestamp requested by the client.
+        status = store->Get(request.txnid(), request.get().key(),
+                            request.get().timestamp(), val);
+        if (status == 0) {
+            reply.set_value(val.second);
+        }
+    } else {
+        // No timestamp given: also report the timestamp of the value read.
+        status = store->Get(request.txnid(), request.get().key(), val);
+        if (status == 0) {
+            reply.set_value(val.second);
+            reply.set_timestamp(val.first.getTimestamp());
+        }
+    }
+    reply.set_status(status);
+    return status;
+}
+
+/*
+ * Creates a server backed by a LockStore (lock modes) or an OCCStore
+ * (OCC modes). skew and error parameterize the TrueTime source.
+ */
+Server::Server(Mode mode, uint64_t skew, uint64_t error)
+    : mode(mode), store(NULL), timeServer(skew, error)
+{
+    switch (mode) {
+    case MODE_LOCK:
+    case MODE_SPAN_LOCK:
+        store = new spanstore::LockStore();
+        break;
+    case MODE_OCC:
+    case MODE_SPAN_OCC:
+        store = new spanstore::OCCStore();
+        break;
+    default:
+        NOT_REACHABLE();
+    }
+}
+
+Server::~Server()
+{
+    delete store;
+}
+
+/*
+ * Runs at the leader before replication.
+ *   str1       serialized Request from the client.
+ *   replicate  out: whether the operation must go through replication.
+ *   str2       out: the serialized Request to replicate when replicate is
+ *              true, otherwise the serialized Reply for the client.
+ */
+void
+Server::LeaderUpcall(opnum_t opnum, const string &str1, bool &replicate, string &str2)
+{
+    Debug("Received LeaderUpcall: %lu %s", opnum, str1.c_str());
+
+    Request request;
+    Reply reply;
+    int status;
+
+    request.ParseFromString(str1);
+
+    switch (request.op()) {
+    case spanstore::proto::Request::GET:
+        // Reads are answered directly by the leader.
+        ExecuteGet(store, request, reply);
+        replicate = false;
+        reply.SerializeToString(&str2);
+        break;
+    case spanstore::proto::Request::PREPARE:
+        // Prepare is the only case that is conditionally run at the leader
+        status = store->Prepare(request.txnid(),
+                                Transaction(request.prepare().txn()));
+
+        // if prepared, then replicate result
+        if (status == 0) {
+            replicate = true;
+            // get a prepare timestamp and send along to replicas
+            if (mode == MODE_SPAN_LOCK || mode == MODE_SPAN_OCC) {
+                request.mutable_prepare()->set_timestamp(timeServer.GetTime());
+            }
+            request.SerializeToString(&str2);
+        } else {
+            // if abort, don't replicate
+            replicate = false;
+            reply.set_status(status);
+            reply.SerializeToString(&str2);
+        }
+        break;
+    case spanstore::proto::Request::COMMIT:
+        replicate = true;
+        str2 = str1;
+        break;
+    case spanstore::proto::Request::ABORT:
+        replicate = true;
+        str2 = str1;
+        break;
+    default:
+        Panic("Unrecognized operation.");
+    }
+}
+
+/* Gets called when a command is issued using client.Invoke(...) to this
+ * replica group.
+ * opnum is the operation number.
+ * str1 is the request string passed by the client.
+ * str2 is the reply which will be sent back to the client.
+ */
+void
+Server::ReplicaUpcall(opnum_t opnum,
+                      const string &str1,
+                      string &str2)
+{
+    Debug("Received Upcall: %lu %s", opnum, str1.c_str());
+    Request request;
+    Reply reply;
+    int status = 0;
+
+    request.ParseFromString(str1);
+
+    switch (request.op()) {
+    case spanstore::proto::Request::GET:
+        // LeaderUpcall() never asks for GETs to be replicated, so there is
+        // nothing to apply here; str2 is left untouched.
+        return;
+    case spanstore::proto::Request::PREPARE:
+        // get a prepare timestamp and return to client
+        store->Prepare(request.txnid(),
+                       Transaction(request.prepare().txn()));
+        if (mode == MODE_SPAN_LOCK || mode == MODE_SPAN_OCC) {
+            reply.set_timestamp(request.prepare().timestamp());
+        }
+        break;
+    case spanstore::proto::Request::COMMIT:
+        store->Commit(request.txnid(), request.commit().timestamp());
+        break;
+    case spanstore::proto::Request::ABORT:
+        store->Abort(request.txnid(), Transaction(request.abort().txn()));
+        break;
+    default:
+        Panic("Unrecognized operation.");
+    }
+    reply.set_status(status);
+    reply.SerializeToString(&str2);
+}
+
+/* Handles an unlogged request; only GET is accepted.
+ * str2 receives the serialized Reply. */
+void
+Server::UnloggedUpcall(const string &str1, string &str2)
+{
+    Request request;
+    Reply reply;
+
+    request.ParseFromString(str1);
+
+    ASSERT(request.op() == spanstore::proto::Request::GET);
+
+    ExecuteGet(store, request, reply);
+    reply.SerializeToString(&str2);
+}
+
+/* Inserts a key directly into the backing store. */
+void
+Server::Load(const string &key, const string &value, const Timestamp timestamp)
+{
+    store->Load(key, value, timestamp);
+}
+
+} // namespace spanstore
+
+/*
+ * Parses `arg` as a non-negative base-10 integer no larger than `max`.
+ * Returns false for empty input, a leading '-', trailing garbage or
+ * overflow; `out` is only written on success.
+ */
+static bool
+ParseUnsigned(const char *arg, unsigned long max, unsigned long &out)
+{
+    if (arg == NULL || *arg == '\0' || *arg == '-') {
+        return false;
+    }
+
+    char *end = NULL;
+    errno = 0;
+    unsigned long parsed = strtoul(arg, &end, 10);
+    if (errno != 0 || *end != '\0' || parsed > max) {
+        return false;
+    }
+    out = parsed;
+    return true;
+}
+
+/*
+ * Server entry point.
+ *
+ *   -c <path>  replica group configuration file (required)
+ *   -i <n>     index of this replica in the configuration (required)
+ *   -m <mode>  lock | occ | span-lock | span-occ (required)
+ *   -s <n>     TrueTime skew
+ *   -e <n>     TrueTime error
+ *   -f <path>  file with one key per line to preload
+ *   -k <n>     number of keys to read from the key file
+ *   -n <n>     shard number of this server
+ *   -N <n>     total number of shards (must be > 0)
+ *
+ * Invalid or missing required arguments terminate the process with a
+ * non-zero exit status.
+ */
+int
+main(int argc, char **argv)
+{
+    int index = -1;
+    unsigned int myShard=0, maxShard=1, nKeys=1;
+    const char *configPath = NULL;
+    const char *keyPath = NULL;
+    uint64_t skew = 0, error = 0;
+    // Start from MODE_UNKNOWN so a missing -m is detectable below.
+    spanstore::Mode mode = spanstore::MODE_UNKNOWN;
+    unsigned long parsed = 0;
+
+    // Parse arguments
+    int opt;
+    while ((opt = getopt(argc, argv, "c:i:m:e:s:f:n:N:k:")) != -1) {
+        switch (opt) {
+        case 'c':
+            configPath = optarg;
+            break;
+
+        case 'i':
+            if (!ParseUnsigned(optarg, INT_MAX, parsed)) {
+                fprintf(stderr, "option -i requires a numeric arg\n");
+                return EXIT_FAILURE;
+            }
+            index = (int)parsed;
+            break;
+
+        case 'm':
+            if (strcasecmp(optarg, "lock") == 0) {
+                mode = spanstore::MODE_LOCK;
+            } else if (strcasecmp(optarg, "occ") == 0) {
+                mode = spanstore::MODE_OCC;
+            } else if (strcasecmp(optarg, "span-lock") == 0) {
+                mode = spanstore::MODE_SPAN_LOCK;
+            } else if (strcasecmp(optarg, "span-occ") == 0) {
+                mode = spanstore::MODE_SPAN_OCC;
+            } else {
+                fprintf(stderr, "unknown mode '%s'\n", optarg);
+                return EXIT_FAILURE;
+            }
+            break;
+
+        case 's':
+            if (!ParseUnsigned(optarg, ULONG_MAX, parsed)) {
+                fprintf(stderr, "option -s requires a numeric arg\n");
+                return EXIT_FAILURE;
+            }
+            skew = parsed;
+            break;
+
+        case 'e':
+            if (!ParseUnsigned(optarg, ULONG_MAX, parsed)) {
+                fprintf(stderr, "option -e requires a numeric arg\n");
+                return EXIT_FAILURE;
+            }
+            error = parsed;
+            break;
+
+        case 'k':
+            if (!ParseUnsigned(optarg, UINT_MAX, parsed)) {
+                fprintf(stderr, "option -k requires a numeric arg\n");
+                return EXIT_FAILURE;
+            }
+            nKeys = (unsigned int)parsed;
+            break;
+
+        case 'n':
+            if (!ParseUnsigned(optarg, UINT_MAX, parsed)) {
+                fprintf(stderr, "option -n requires a numeric arg\n");
+                return EXIT_FAILURE;
+            }
+            myShard = (unsigned int)parsed;
+            break;
+
+        case 'N':
+            // Zero is rejected: maxShard is used as a modulus below.
+            if (!ParseUnsigned(optarg, UINT_MAX, parsed) || parsed == 0) {
+                fprintf(stderr, "option -N requires a numeric arg\n");
+                return EXIT_FAILURE;
+            }
+            maxShard = (unsigned int)parsed;
+            break;
+
+        case 'f':   // Load keys from file
+            keyPath = optarg;
+            break;
+
+        default:
+            // argv[optind] is NULL when the bad option was the last word.
+            fprintf(stderr, "Unknown argument %s\n",
+                    (optind < argc) ? argv[optind] : "(none)");
+        }
+    }
+
+    if (!configPath) {
+        fprintf(stderr, "option -c is required\n");
+        return EXIT_FAILURE;
+    }
+
+    if (index == -1) {
+        fprintf(stderr, "option -i is required\n");
+        return EXIT_FAILURE;
+    }
+
+    if (mode == spanstore::MODE_UNKNOWN) {
+        fprintf(stderr, "option -m is required\n");
+        return EXIT_FAILURE;
+    }
+
+    // Load configuration
+    std::ifstream configStream(configPath);
+    if (configStream.fail()) {
+        fprintf(stderr, "unable to read configuration file: %s\n", configPath);
+        return EXIT_FAILURE;
+    }
+    specpaxos::Configuration config(configStream);
+
+    if (index >= config.n) {
+        fprintf(stderr, "replica index %d is out of bounds; "
+                "only %d replicas defined\n", index, config.n);
+        return EXIT_FAILURE;
+    }
+
+    UDPTransport transport(0.0, 0.0, 0);
+
+    spanstore::Server server(mode, skew, error);
+    specpaxos::vr::VRReplica replica(config, index, &transport, 1, &server);
+
+    if (keyPath) {
+        string key;
+        ifstream in;
+        in.open(keyPath);
+        if (!in) {
+            fprintf(stderr, "Could not read keys from: %s\n", keyPath);
+            return EXIT_FAILURE;
+        }
+
+        for (unsigned int i = 0; i < nKeys; i++) {
+            // Stop at end of file instead of reloading the last key.
+            if (!getline(in, key)) {
+                break;
+            }
+
+            // NOTE(review): this hash presumably has to match the client's
+            // key_to_shard(); it is kept bit-for-bit, including the
+            // conversion from (possibly signed) char.
+            uint64_t hash = 5381;
+            const char* str = key.c_str();
+            for (unsigned int j = 0; j < key.length(); j++) {
+                hash = ((hash << 5) + hash) + (uint64_t)str[j];
+            }
+
+            // Only preload the keys that belong to this shard.
+            if (hash % maxShard == myShard) {
+                server.Load(key, "null", Timestamp());
+            }
+        }
+        in.close();
+    }
+
+    transport.Run();
+
+    return 0;
+}
diff --git a/store/txnstore/server.h b/store/txnstore/server.h
new file mode 100644
index 0000000..b72fa9f
--- /dev/null
+++ b/store/txnstore/server.h
@@ -0,0 +1,52 @@
+// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
+// vim: set ts=4 sw=4:
+/***********************************************************************
+ *
+ * spanstore/server.h:
+ *   A single SpanStore server replica.
+ *
+ **********************************************************************/
+
+#ifndef _SPAN_SERVER_H_
+#define _SPAN_SERVER_H_
+
+#include "paxos-lib/lib/configuration.h"
+#include "paxos-lib/common/replica.h"
+#include "paxos-lib/lib/udptransport.h"
+#include "paxos-lib/vr/replica.h"
+#include "common/truetime.h"
+#include "common/txnstore.h"
+#include "spanstore/occstore.h"
+#include "spanstore/lockstore.h"
+#include "spanstore/span-proto.pb.h"
+
+#include <string>
+
+namespace spanstore {
+
+// Protocol variants.
+// NOTE(review): spanstore/spanclient.h defines an identical enum; the two
+// must stay in sync and cannot both be included in one translation unit.
+enum Mode {
+    MODE_UNKNOWN,
+    MODE_OCC,
+    MODE_LOCK,
+    MODE_SPAN_OCC,
+    MODE_SPAN_LOCK
+};
+
+/*
+ * Application replica plugged into the replication library: the upcalls
+ * translate serialized Request messages into operations on a TxnStore.
+ */
+class Server : public specpaxos::AppReplica
+{
+public:
+    Server(Mode mode, uint64_t skew, uint64_t error);
+    virtual ~Server();
+
+    // str1 is the serialized request; str2 receives the serialized output.
+    virtual void LeaderUpcall(opnum_t opnum, const std::string &str1, bool &replicate, std::string &str2);
+    virtual void ReplicaUpcall(opnum_t opnum, const std::string &str1, std::string &str2);
+    virtual void UnloggedUpcall(const std::string &str1, std::string &str2);
+    // Inserts a key directly into the backing store.
+    void Load(const std::string &key, const std::string &value, const Timestamp timestamp);
+
+private:
+    Mode mode;              // Protocol variant this server runs.
+    TxnStore *store;        // Backing store; owned, freed in the destructor.
+    TrueTime timeServer;    // Source of prepare timestamps.
+};
+
+} // namespace spanstore
+
+#endif /* _SPAN_SERVER_H_ */
diff --git a/store/txnstore/span-proto.proto b/store/txnstore/span-proto.proto
new file mode 100644
index 0000000..a25c69f
--- /dev/null
+++ b/store/txnstore/span-proto.proto
@@ -0,0 +1,46 @@
+import "common/common-proto.proto";
+
+package spanstore.proto;
+
+// Read of a single key, optionally at a given timestamp.
+message GetMessage {
+    required string key = 1;
+    optional TimestampMessage timestamp = 2;
+}
+
+// First phase of two-phase commit. The leader fills in `timestamp` before
+// replicating when running in one of the span modes.
+message PrepareMessage {
+    required TransactionMessage txn = 1;
+    optional uint64 timestamp = 2;
+}
+
+// Second phase of two-phase commit: commit at `timestamp`.
+message CommitMessage {
+    required uint64 timestamp = 1;
+}
+
+// Abort of the given transaction.
+message AbortMessage {
+    required TransactionMessage txn = 1;
+}
+
+// Envelope for every client request; `op` selects which of the optional
+// sub-messages is meaningful.
+message Request {
+    enum Operation {
+        GET = 1;
+        PREPARE = 2;
+        COMMIT = 3;
+        ABORT = 4;
+    }
+    required Operation op = 1;
+    required uint64 txnid = 2;
+    optional GetMessage get = 3;
+    optional PrepareMessage prepare = 4;
+    optional CommitMessage commit = 5;
+    optional AbortMessage abort = 6;
+}
+
+message Reply {
+    // 0 = OK
+    // -1 = failed
+    // -2 = retry
+    // -3 = abstain/no reply
+    required int32 status = 1;
+    optional string value = 2;
+    optional uint64 timestamp = 3;
+}
diff --git a/store/txnstore/spanclient.cc b/store/txnstore/spanclient.cc
new file mode 100644
index 0000000..65f321f
--- /dev/null
+++ b/store/txnstore/spanclient.cc
@@ -0,0 +1,299 @@
+// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
+// vim: set ts=4 sw=4:
+/***********************************************************************
+ *
+ * spanstore/spanclient.cc:
+ *   Single shard SpanStore client implementation.
+ *
+ **********************************************************************/
+
+#include "spanstore/spanclient.h"
+#include "common/txnstore.h"
+
+namespace spanstore {
+
+using namespace std;
+using namespace proto;
+
+/*
+ * Creates a client for one shard.
+ *
+ *   configPath      configuration file of the shard's replica group.
+ *   transport       transport shared with the owning client (not owned).
+ *   shard           shard index, used in log messages.
+ *   closestReplica  replica to send unlogged reads to in the OCC modes;
+ *                   -1 selects client_id % <number of replicas>.
+ */
+SpanClient::SpanClient(Mode mode, const string &configPath,
+                       Transport *transport, uint64_t client_id, int
+                       shard, int closestReplica)
+    : transport(transport), client_id(client_id), shard(shard)
+{
+    ifstream configStream(configPath);
+    if (configStream.fail()) {
+        fprintf(stderr, "unable to read configuration file: %s\n",
+                configPath.c_str());
+    }
+    specpaxos::Configuration config(configStream);
+
+    client = new specpaxos::vr::VRClient(config, transport);
+
+    if (mode == MODE_OCC || mode == MODE_SPAN_OCC) {
+        if (closestReplica == -1) {
+            replica = client_id % config.n;
+        } else {
+            replica = closestReplica;
+        }
+        Debug("Sending unlogged to replica %i", replica);
+    } else {
+        replica = 0;
+    }
+
+    waiting = NULL;
+    blockingBegin = NULL;
+}
+
+SpanClient::~SpanClient()
+{
+    // Free the promise of the last COMMIT/ABORT if Begin() never drained it.
+    delete blockingBegin;
+    delete client;
+}
+
+/* Starts transaction `id` on this shard. Nothing is sent; the call only
+ * waits for a previously issued COMMIT/ABORT to complete. */
+void
+SpanClient::Begin(uint64_t id)
+{
+    Debug("[shard %i] BEGIN: %lu", shard, id);
+
+    // Wait for any previous pending requests.
+    if (blockingBegin != NULL) {
+        blockingBegin->GetReply();
+        delete blockingBegin;
+        blockingBegin = NULL;
+    }
+}
+
+/* Asynchronously reads `key`; the result is delivered through `promise`. */
+void
+SpanClient::Get(uint64_t id, const string &key, Promise *promise)
+{
+    // Send the GET operation to appropriate shard.
+    Debug("[shard %i] Sending GET [%s]", shard, key.c_str());
+
+    // create request
+    string request_str;
+    Request request;
+    request.set_op(Request::GET);
+    request.set_txnid(id);
+    request.mutable_get()->set_key(key);
+    request.SerializeToString(&request_str);
+
+    // set to 1 second by default
+    int timeout = (promise != NULL) ? promise->GetTimeout() : 1000;
+
+    // Issue the request from a zero-delay transport timer.
+    transport->Timer(0, [=]() {
+        waiting = promise;
+        client->InvokeUnlogged(replica,
+                               request_str,
+                               bind(&SpanClient::GetCallback,
+                                    this,
+                                    placeholders::_1,
+                                    placeholders::_2),
+                               bind(&SpanClient::GetTimeout,
+                                    this),
+                               timeout); // timeout in ms
+    });
+}
+
+/* Asynchronously reads `key` as of `timestamp`; the result is delivered
+ * through `promise`. */
+void
+SpanClient::Get(uint64_t id, const string &key,
+                const Timestamp &timestamp, Promise *promise)
+{
+    // Send the GET operation to appropriate shard.
+    Debug("[shard %i] Sending GET [%s]", shard, key.c_str());
+
+    // create request
+    string request_str;
+    Request request;
+    request.set_op(Request::GET);
+    request.set_txnid(id);
+    request.mutable_get()->set_key(key);
+    timestamp.serialize(request.mutable_get()->mutable_timestamp());
+    request.SerializeToString(&request_str);
+
+    // set to 1 second by default
+    int timeout = (promise != NULL) ? promise->GetTimeout() : 1000;
+
+    // Issue the request from a zero-delay transport timer.
+    transport->Timer(0, [=]() {
+        waiting = promise;
+        client->InvokeUnlogged(replica,
+                               request_str,
+                               bind(&SpanClient::GetCallback,
+                                    this,
+                                    placeholders::_1,
+                                    placeholders::_2),
+                               bind(&SpanClient::GetTimeout,
+                                    this),
+                               timeout); // timeout in ms
+    });
+}
+
+/* Sends PREPARE for transaction `id`; the vote is delivered through
+ * `promise`. `timestamp` is not put into the request. */
+void
+SpanClient::Prepare(uint64_t id, const Transaction &txn,
+                    const Timestamp &timestamp, Promise *promise)
+{
+    Debug("[shard %i] Sending PREPARE: %lu", shard, id);
+
+    // create prepare request
+    string request_str;
+    Request request;
+    request.set_op(Request::PREPARE);
+    request.set_txnid(id);
+    txn.serialize(request.mutable_prepare()->mutable_txn());
+    request.SerializeToString(&request_str);
+
+    transport->Timer(0, [=]() {
+        waiting = promise;
+        client->Invoke(request_str,
+                       bind(&SpanClient::PrepareCallback,
+                            this,
+                            placeholders::_1,
+                            placeholders::_2));
+    });
+}
+
+/* Sends COMMIT for transaction `id` at `timestamp`. Completion is also
+ * recorded in blockingBegin so that the next Begin() waits for it. */
+void
+SpanClient::Commit(uint64_t id, const Transaction &txn,
+                   uint64_t timestamp, Promise *promise)
+{
+
+    Debug("[shard %i] Sending COMMIT: %lu", shard, id);
+
+    // create commit request
+    string request_str;
+    Request request;
+    request.set_op(Request::COMMIT);
+    request.set_txnid(id);
+    request.mutable_commit()->set_timestamp(timestamp);
+    request.SerializeToString(&request_str);
+
+    // A COMMIT/ABORT issued without an intervening Begin() would otherwise
+    // overwrite, and leak, the previous promise: drain it first, exactly as
+    // Begin() does.
+    if (blockingBegin != NULL) {
+        blockingBegin->GetReply();
+        delete blockingBegin;
+    }
+    blockingBegin = new Promise(COMMIT_TIMEOUT);
+    transport->Timer(0, [=]() {
+        waiting = promise;
+
+        client->Invoke(request_str,
+                       bind(&SpanClient::CommitCallback,
+                            this,
+                            placeholders::_1,
+                            placeholders::_2));
+    });
+}
+
+/* Aborts the ongoing transaction. Completion is also recorded in
+ * blockingBegin so that the next Begin() waits for it. */
+void
+SpanClient::Abort(uint64_t id, const Transaction &txn, Promise *promise)
+{
+    Debug("[shard %i] Sending ABORT: %lu", shard, id);
+
+    // create abort request
+    string request_str;
+    Request request;
+    request.set_op(Request::ABORT);
+    request.set_txnid(id);
+    txn.serialize(request.mutable_abort()->mutable_txn());
+    request.SerializeToString(&request_str);
+
+    // See Commit(): never overwrite a promise that is still outstanding.
+    if (blockingBegin != NULL) {
+        blockingBegin->GetReply();
+        delete blockingBegin;
+    }
+    blockingBegin = new Promise(ABORT_TIMEOUT);
+    transport->Timer(0, [=]() {
+        waiting = promise;
+
+        client->Invoke(request_str,
+                       bind(&SpanClient::AbortCallback,
+                            this,
+                            placeholders::_1,
+                            placeholders::_2));
+    });
+}
+
+/* Timeout handler for GET: completes the waiting promise, if any, with
+ * REPLY_TIMEOUT. */
+void
+SpanClient::GetTimeout()
+{
+    if (waiting != NULL) {
+        Promise *w = waiting;
+        waiting = NULL;
+        w->Reply(REPLY_TIMEOUT);
+    }
+}
+
+/* Callback from a shard replica on get operation completion. */
+void
+SpanClient::GetCallback(const string &request_str, const string &reply_str)
+{
+    /* Replies back from a shard. */
+    Reply reply;
+    reply.ParseFromString(reply_str);
+
+    Debug("[shard %i] Received GET callback [%d]", shard, reply.status());
+    if (waiting != NULL) {
+        Promise *w = waiting;
+        waiting = NULL;
+        if (reply.has_timestamp()) {
+            w->Reply(reply.status(), Timestamp(reply.timestamp()), reply.value());
+        } else {
+            w->Reply(reply.status(), reply.value());
+        }
+    }
+}
+
+/* Callback from a shard replica on prepare operation completion.
+ */
+void
+SpanClient::PrepareCallback(const string &request_str, const string &reply_str)
+{
+    Reply reply;
+    reply.ParseFromString(reply_str);
+    Debug("[shard %i] Received PREPARE callback [%d]", shard, reply.status());
+
+    // Nobody is waiting for this vote: nothing to deliver.
+    if (waiting == NULL) {
+        return;
+    }
+
+    Promise *pending = waiting;
+    waiting = NULL;
+    // Pass on the prepare timestamp when the reply carries one, otherwise
+    // a default-constructed Timestamp.
+    if (!reply.has_timestamp()) {
+        pending->Reply(reply.status(), Timestamp());
+    } else {
+        pending->Reply(reply.status(), Timestamp(reply.timestamp(), 0));
+    }
+}
+
+/* Invoked when a shard replica has completed a COMMIT. Releases the
+ * promise Begin() blocks on, then the caller's promise if there is one. */
+void
+SpanClient::CommitCallback(const string &request_str, const string &reply_str)
+{
+    // COMMITs always succeed.
+    Reply reply;
+    reply.ParseFromString(reply_str);
+    ASSERT(reply.status() == REPLY_OK);
+
+    ASSERT(blockingBegin != NULL);
+    blockingBegin->Reply(0);
+
+    Promise *pending = waiting;
+    if (pending != NULL) {
+        waiting = NULL;
+        pending->Reply(reply.status());
+    }
+    Debug("[shard %i] Received COMMIT callback [%d]", shard, reply.status());
+}
+
+/* Invoked when a shard replica has completed an ABORT. Releases the
+ * promise Begin() blocks on, then the caller's promise if there is one. */
+void
+SpanClient::AbortCallback(const string &request_str, const string &reply_str)
+{
+    // ABORTs always succeed.
+    Reply reply;
+    reply.ParseFromString(reply_str);
+    ASSERT(reply.status() == REPLY_OK);
+
+    ASSERT(blockingBegin != NULL);
+    blockingBegin->Reply(0);
+
+    Promise *pending = waiting;
+    if (pending != NULL) {
+        waiting = NULL;
+        pending->Reply(reply.status());
+    }
+    Debug("[shard %i] Received ABORT callback [%d]", shard, reply.status());
+}
+
+
+} // namespace spanstore
diff --git a/store/txnstore/spanclient.h b/store/txnstore/spanclient.h
new file mode 100644
index 0000000..0bf12cd
--- /dev/null
+++ b/store/txnstore/spanclient.h
@@ -0,0 +1,100 @@
+// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
+// vim: set ts=4 sw=4:
+/***********************************************************************
+ *
+ * spanstore/spanclient.h:
+ *   Single shard SpanStore client interface.
+ *
+ **********************************************************************/
+
+#ifndef _SPAN_TXN_CLIENT_H_
+#define _SPAN_TXN_CLIENT_H_
+
+#include "paxos-lib/lib/assert.h"
+#include "paxos-lib/lib/message.h"
+#include "paxos-lib/lib/transport.h"
+#include "paxos-lib/common/client.h"
+#include "paxos-lib/vr/client.h"
+#include "common/txnclient.h"
+#include "common/timestamp.h"
+#include "common/transaction.h"
+#include "spanstore/span-proto.pb.h"
+
+#include <condition_variable>
+#include <cstdint>
+#include <mutex>
+#include <string>
+
+namespace spanstore {
+
+// Protocol variants.
+// NOTE(review): spanstore/server.h defines an identical enum; the two must
+// stay in sync and cannot both be included in one translation unit.
+enum Mode {
+    MODE_UNKNOWN,
+    MODE_OCC,
+    MODE_LOCK,
+    MODE_SPAN_OCC,
+    MODE_SPAN_LOCK
+};
+
+/*
+ * Client for a single shard. Every operation is asynchronous: results are
+ * delivered through the Promise passed in by the caller.
+ */
+class SpanClient : public TxnClient
+{
+public:
+    /* Constructor needs path to shard config. */
+    SpanClient(Mode mode,
+               const std::string &configPath,
+               Transport *transport,
+               uint64_t client_id,
+               int shard,
+               int closestReplica);
+    ~SpanClient();
+
+    // Overriding from TxnClient
+    void Begin(uint64_t id);
+    void Get(uint64_t id,
+             const std::string &key,
+             Promise *promise = NULL);
+    void Get(uint64_t id,
+             const std::string &key,
+             const Timestamp &timestamp,
+             Promise *promise = NULL);
+    void Prepare(uint64_t id,
+                 const Transaction &txn,
+                 const Timestamp &timestamp = Timestamp(),
+                 Promise *promise = NULL);
+    void Commit(uint64_t id,
+                const Transaction &txn,
+                uint64_t timestamp,
+                Promise *promise = NULL);
+    void Abort(uint64_t id,
+               const Transaction &txn,
+               Promise *promise = NULL);
+
+private:
+    Transport *transport; // Transport layer.
+    uint64_t client_id; // Unique ID for this client.
+    int shard; // which shard this client accesses
+    int replica; // which replica to use for reads
+
+    specpaxos::Client *client; // Client proxy.
+    Promise *waiting; // promise of the request currently in flight
+    Promise *blockingBegin; // completed by COMMIT/ABORT; Begin() waits on it
+
+    /* Timeout for Get requests, which only go to one replica. */
+    void GetTimeout();
+
+    /* Callbacks for hearing back from a shard for an operation. */
+    void GetCallback(const std::string &, const std::string &);
+    void PrepareCallback(const std::string &, const std::string &);
+    void CommitCallback(const std::string &, const std::string &);
+    void AbortCallback(const std::string &, const std::string &);
+
+    /* Helper Functions for starting and finishing requests */
+    // NOTE(review): no definitions for these five helpers appear in
+    // spanclient.cc in this patch -- confirm whether they are still needed.
+    void StartRequest();
+    void WaitForResponse();
+    void FinishRequest(const std::string &reply_str);
+    void FinishRequest();
+    int SendGet(const std::string &request_str);
+
+};
+
+} // namespace spanstore
+
+#endif /* _SPAN_TXN_CLIENT_H_ */
-- 
GitLab