Skip to content
Snippets Groups Projects
Commit f7ae438d authored by Irene Y Zhang's avatar Irene Y Zhang
Browse files

moving and adding some more storage level files

parent c13504ca
No related branches found
No related tags found
No related merge requests found
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
// vim: set ts=4 sw=4:
/***********************************************************************
*
* spanstore/client.cc:
* SpanStore client implementation.
*
**********************************************************************/
#include "spanstore/client.h"
using namespace std;
namespace spanstore {
Client::Client(Mode mode, string configPath, int nShards,
int closestReplica, TrueTime timeServer)
: transport(0.0, 0.0, 0), mode(mode), timeServer(timeServer)
{
// Initialize all state here;
client_id = 0;
while (client_id == 0) {
random_device rd;
mt19937_64 gen(rd());
uniform_int_distribution<uint64_t> dis;
client_id = dis(gen);
}
t_id = (client_id/10000)*10000;
nshards = nShards;
bclient.reserve(nshards);
Debug("Initializing SpanStore client with id [%lu]", client_id);
/* Start a client for time stamp server. */
if (mode == MODE_OCC) {
string tssConfigPath = configPath + ".tss.config";
ifstream tssConfigStream(tssConfigPath);
if (tssConfigStream.fail()) {
fprintf(stderr, "unable to read configuration file: %s\n",
tssConfigPath.c_str());
}
specpaxos::Configuration tssConfig(tssConfigStream);
tss = new specpaxos::vr::VRClient(tssConfig, &transport);
}
/* Start a client for each shard. */
for (int i = 0; i < nShards; i++) {
string shardConfigPath = configPath + to_string(i) + ".config";
SpanClient *spanclient = new SpanClient(mode, shardConfigPath,
&transport, client_id, i, closestReplica);
bclient[i] = new BufferClient(spanclient);
}
/* Run the transport in a new thread. */
clientTransport = new thread(&Client::run_client, this);
Debug("SpanStore client [%lu] created!", client_id);
}
Client::~Client()
{
transport.Stop();
delete tss;
for (auto b : bclient) {
delete b;
}
clientTransport->join();
}
/* Runs the transport event loop. */
void
Client::run_client()
{
transport.Run();
}
/* Begins a transaction. All subsequent operations before a commit() or
* abort() are part of this transaction.
*
* Return a TID for the transaction.
*/
void
Client::Begin()
{
Debug("BEGIN Transaction");
t_id++;
participants.clear();
commit_sleep = -1;
for (int i = 0; i < nshards; i++) {
bclient[i]->Begin(t_id);
}
}
/* Returns the value corresponding to the supplied key. */
int
Client::Get(const string &key, string &value)
{
// Contact the appropriate shard to get the value.
int i = key_to_shard(key, nshards);
// If needed, add this shard to set of participants and send BEGIN.
if (participants.find(i) == participants.end()) {
participants.insert(i);
}
// Send the GET operation to appropriate shard.
Promise promise(GET_TIMEOUT);
bclient[i]->Get(key, &promise);
value = promise.GetValue();
return promise.GetReply();
}
/* Sets the value corresponding to the supplied key. */
int
Client::Put(const string &key, const string &value)
{
// Contact the appropriate shard to set the value.
int i = key_to_shard(key, nshards);
// If needed, add this shard to set of participants and send BEGIN.
if (participants.find(i) == participants.end()) {
participants.insert(i);
}
Promise promise(PUT_TIMEOUT);
// Buffering, so no need to wait.
bclient[i]->Put(key, value, &promise);
return promise.GetReply();
}
int
Client::Prepare(uint64_t &ts)
{
int status;
// 1. Send commit-prepare to all shards.
Debug("PREPARE Transaction");
list<Promise *> promises;
for (auto p : participants) {
Debug("Sending prepare to shard [%d]", p);
promises.push_back(new Promise(PREPARE_TIMEOUT));
bclient[p]->Prepare(Timestamp(),promises.back());
}
// In the meantime ... go get a timestamp for OCC
if (mode == MODE_OCC) {
// Have to go to timestamp server
unique_lock<mutex> lk(cv_m);
Debug("Sending request to TimeStampServer");
tss->Invoke("", bind(&Client::tssCallback, this,
placeholders::_1,
placeholders::_2));
Debug("Waiting for TSS reply");
cv.wait(lk);
ts = stol(replica_reply, NULL, 10);
Debug("TSS reply received: %lu", ts);
}
// 2. Wait for reply from all shards. (abort on timeout)
Debug("Waiting for PREPARE replies");
status = REPLY_OK;
for (auto p : promises) {
// If any shard returned false, abort the transaction.
if (p->GetReply() != REPLY_OK) {
if (status != REPLY_FAIL) {
status = p->GetReply();
}
}
// Also, find the max of all prepare timestamp returned.
if (p->GetTimestamp().getTimestamp() > ts) {
ts = p->GetTimestamp().getTimestamp();
}
delete p;
}
return status;
}
/* Attempts to commit the ongoing transaction. */
bool
Client::Commit()
{
// Implementing 2 Phase Commit
uint64_t ts = 0;
int status;
for (int i = 0; i < COMMIT_RETRIES; i++) {
status = Prepare(ts);
if (status == REPLY_OK || status == REPLY_FAIL) {
break;
}
}
if (status == REPLY_OK) {
// For Spanner like systems, calculate timestamp.
if (mode == MODE_SPAN_OCC || mode == MODE_SPAN_LOCK) {
uint64_t now, err;
struct timeval t1, t2;
gettimeofday(&t1, NULL);
timeServer.GetTimeAndError(now, err);
if (now > ts) {
ts = now;
} else {
uint64_t diff = ((ts >> 32) - (now >> 32))*1000000 +
((ts & 0xffffffff) - (now & 0xffffffff));
err += diff;
}
commit_sleep = (int)err;
// how good are we at waking up on time?
Debug("Commit wait sleep: %lu", err);
if (err > 1000000)
Warning("Sleeping for too long! %lu; now,ts: %lu,%lu", err, now, ts);
if (err > 150) {
usleep(err-150);
}
// fine grained busy-wait
while (1) {
gettimeofday(&t2, NULL);
if ((t2.tv_sec-t1.tv_sec)*1000000 +
(t2.tv_usec-t1.tv_usec) > (int64_t)err) {
break;
}
}
}
// Send commits
Debug("COMMIT Transaction at [%lu]", ts);
for (auto p : participants) {
Debug("Sending commit to shard [%d]", p);
bclient[p]->Commit(ts);
}
return true;
}
// 4. If not, send abort to all shards.
Abort();
return false;
}
/* Aborts the ongoing transaction. */
void
Client::Abort()
{
Debug("ABORT Transaction");
for (auto p : participants) {
bclient[p]->Abort();
}
}
/* Return statistics of most recent transaction. */
vector<int>
Client::Stats()
{
vector<int> v;
return v;
}
/* Callback from a tss replica upon any request. */
void
Client::tssCallback(const string &request, const string &reply)
{
lock_guard<mutex> lock(cv_m);
Debug("Received TSS callback [%s]", reply.c_str());
// Copy reply to "replica_reply".
replica_reply = reply;
// Wake up thread waiting for the reply.
cv.notify_all();
}
} // namespace spanstore
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
// vim: set ts=4 sw=4:
/***********************************************************************
*
* spanstore/client.h:
* SpanStore client interface.
*
**********************************************************************/
#ifndef _SPAN_CLIENT_H_
#define _SPAN_CLIENT_H_
#include "paxos-lib/lib/assert.h"
#include "paxos-lib/lib/message.h"
#include "paxos-lib/lib/configuration.h"
#include "paxos-lib/lib/udptransport.h"
#include "paxos-lib/common/client.h"
#include "paxos-lib/vr/client.h"
#include "common/client.h"
#include "common/bufferclient.h"
#include "common/truetime.h"
#include "common/txnstore.h"
#include "spanstore/spanclient.h"
#include "spanstore/span-proto.pb.h"
#include <condition_variable>
#include <mutex>
#include <string>
#include <set>
#include <thread>
namespace spanstore {
class Client : public ::Client
{
public:
Client(Mode mode, string configPath, int nshards,
int closestReplica, TrueTime timeServer);
~Client();
// Overriding functions from ::Client
void Begin();
int Get(const string &key, string &value);
int Put(const string &key, const string &value);
bool Commit();
void Abort();
std::vector<int> Stats();
private:
/* Private helper functions. */
void run_client(); // Runs the transport event loop.
// timestamp server call back
void tssCallback(const string &request, const string &reply);
// local Prepare function
int Prepare(uint64_t &ts);
// Unique ID for this client.
uint64_t client_id;
// Ongoing transaction ID.
uint64_t t_id;
// Number of shards in SpanStore.
long nshards;
// List of participants in the ongoing transaction.
std::set<int> participants;
// Transport used by paxos client proxies.
UDPTransport transport;
// Thread running the transport event loop.
std::thread *clientTransport;
// Buffering client for each shard.
std::vector<BufferClient *> bclient;
// Mode in which spanstore runs.
Mode mode;
// Timestamp server shard.
specpaxos::Client *tss;
// TrueTime server.
TrueTime timeServer;
// Synchronization variables.
std::condition_variable cv;
std::mutex cv_m;
string replica_reply;
// Time spend sleeping for commit.
int commit_sleep;
};
} // namespace spanstore
#endif /* _SPAN_CLIENT_H_ */
File moved
File moved
File moved
File moved
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
// vim: set ts=4 sw=4:
/***********************************************************************
*
* spanstore/server.cc:
* Implementation of a single SpanStore server.
*
**********************************************************************/
#include "spanstore/server.h"
using namespace std;
namespace spanstore {
using namespace proto;
Server::Server(Mode mode, uint64_t skew, uint64_t error) : mode(mode)
{
timeServer = TrueTime(skew, error);
switch (mode) {
case MODE_LOCK:
case MODE_SPAN_LOCK:
store = new spanstore::LockStore();
break;
case MODE_OCC:
case MODE_SPAN_OCC:
store = new spanstore::OCCStore();
break;
default:
NOT_REACHABLE();
}
}
Server::~Server()
{
delete store;
}
void
Server::LeaderUpcall(opnum_t opnum, const string &str1, bool &replicate, string &str2)
{
Debug("Received LeaderUpcall: %lu %s", opnum, str1.c_str());
Request request;
Reply reply;
int status;
request.ParseFromString(str1);
switch (request.op()) {
case spanstore::proto::Request::GET:
if (request.get().has_timestamp()) {
pair<Timestamp, string> val;
status = store->Get(request.txnid(), request.get().key(),
request.get().timestamp(), val);
if (status == 0) {
reply.set_value(val.second);
}
} else {
pair<Timestamp, string> val;
status = store->Get(request.txnid(), request.get().key(), val);
if (status == 0) {
reply.set_value(val.second);
reply.set_timestamp(val.first.getTimestamp());
}
}
replicate = false;
reply.set_status(status);
reply.SerializeToString(&str2);
break;
case spanstore::proto::Request::PREPARE:
// Prepare is the only case that is conditionally run at the leader
status = store->Prepare(request.txnid(),
Transaction(request.prepare().txn()));
// if prepared, then replicate result
if (status == 0) {
replicate = true;
// get a prepare timestamp and send along to replicas
if (mode == MODE_SPAN_LOCK || mode == MODE_SPAN_OCC) {
request.mutable_prepare()->set_timestamp(timeServer.GetTime());
}
request.SerializeToString(&str2);
} else {
// if abort, don't replicate
replicate = false;
reply.set_status(status);
reply.SerializeToString(&str2);
}
break;
case spanstore::proto::Request::COMMIT:
replicate = true;
str2 = str1;
break;
case spanstore::proto::Request::ABORT:
replicate = true;
str2 = str1;
break;
default:
Panic("Unrecognized operation.");
}
}
/* Gets called when a command is issued using client.Invoke(...) to this
* replica group.
* opnum is the operation number.
* str1 is the request string passed by the client.
* str2 is the reply which will be sent back to the client.
*/
void
Server::ReplicaUpcall(opnum_t opnum,
const string &str1,
string &str2)
{
Debug("Received Upcall: %lu %s", opnum, str1.c_str());
Request request;
Reply reply;
int status = 0;
request.ParseFromString(str1);
switch (request.op()) {
case spanstore::proto::Request::GET:
return;
case spanstore::proto::Request::PREPARE:
// get a prepare timestamp and return to client
store->Prepare(request.txnid(),
Transaction(request.prepare().txn()));
if (mode == MODE_SPAN_LOCK || mode == MODE_SPAN_OCC) {
reply.set_timestamp(request.prepare().timestamp());
}
break;
case spanstore::proto::Request::COMMIT:
store->Commit(request.txnid(), request.commit().timestamp());
break;
case spanstore::proto::Request::ABORT:
store->Abort(request.txnid(), Transaction(request.abort().txn()));
break;
default:
Panic("Unrecognized operation.");
}
reply.set_status(status);
reply.SerializeToString(&str2);
}
void
Server::UnloggedUpcall(const string &str1, string &str2)
{
Request request;
Reply reply;
int status;
request.ParseFromString(str1);
ASSERT(request.op() == spanstore::proto::Request::GET);
if (request.get().has_timestamp()) {
pair<Timestamp, string> val;
status = store->Get(request.txnid(), request.get().key(),
request.get().timestamp(), val);
if (status == 0) {
reply.set_value(val.second);
}
} else {
pair<Timestamp, string> val;
status = store->Get(request.txnid(), request.get().key(), val);
if (status == 0) {
reply.set_value(val.second);
reply.set_timestamp(val.first.getTimestamp());
}
}
reply.set_status(status);
reply.SerializeToString(&str2);
}
void
Server::Load(const string &key, const string &value, const Timestamp timestamp)
{
store->Load(key, value, timestamp);
}
} // namespace spanstore
int
main(int argc, char **argv)
{
int index = -1;
unsigned int myShard=0, maxShard=1, nKeys=1;
const char *configPath = NULL;
const char *keyPath = NULL;
uint64_t skew = 0, error = 0;
spanstore::Mode mode;
// Parse arguments
int opt;
while ((opt = getopt(argc, argv, "c:i:m:e:s:f:n:N:k:")) != -1) {
switch (opt) {
case 'c':
configPath = optarg;
break;
case 'i':
{
char *strtolPtr;
index = strtoul(optarg, &strtolPtr, 10);
if ((*optarg == '\0') || (*strtolPtr != '\0') || (index < 0))
{
fprintf(stderr, "option -i requires a numeric arg\n");
}
break;
}
case 'm':
{
if (strcasecmp(optarg, "lock") == 0) {
mode = spanstore::MODE_LOCK;
} else if (strcasecmp(optarg, "occ") == 0) {
mode = spanstore::MODE_OCC;
} else if (strcasecmp(optarg, "span-lock") == 0) {
mode = spanstore::MODE_SPAN_LOCK;
} else if (strcasecmp(optarg, "span-occ") == 0) {
mode = spanstore::MODE_SPAN_OCC;
} else {
fprintf(stderr, "unknown mode '%s'\n", optarg);
}
break;
}
case 's':
{
char *strtolPtr;
skew = strtoul(optarg, &strtolPtr, 10);
if ((*optarg == '\0') || (*strtolPtr != '\0') || (skew < 0))
{
fprintf(stderr, "option -s requires a numeric arg\n");
}
break;
}
case 'e':
{
char *strtolPtr;
error = strtoul(optarg, &strtolPtr, 10);
if ((*optarg == '\0') || (*strtolPtr != '\0') || (error < 0))
{
fprintf(stderr, "option -e requires a numeric arg\n");
}
break;
}
case 'k':
{
char *strtolPtr;
nKeys = strtoul(optarg, &strtolPtr, 10);
if ((*optarg == '\0') || (*strtolPtr != '\0') || (nKeys < 0))
{
fprintf(stderr, "option -e requires a numeric arg\n");
}
break;
}
case 'n':
{
char *strtolPtr;
myShard = strtoul(optarg, &strtolPtr, 10);
if ((*optarg == '\0') || (*strtolPtr != '\0') || (myShard < 0))
{
fprintf(stderr, "option -e requires a numeric arg\n");
}
break;
}
case 'N':
{
char *strtolPtr;
maxShard = strtoul(optarg, &strtolPtr, 10);
if ((*optarg == '\0') || (*strtolPtr != '\0') || (maxShard <= 0))
{
fprintf(stderr, "option -e requires a numeric arg\n");
}
break;
}
case 'f': // Load keys from file
{
keyPath = optarg;
break;
}
default:
fprintf(stderr, "Unknown argument %s\n", argv[optind]);
}
}
if (!configPath) {
fprintf(stderr, "option -c is required\n");
}
if (index == -1) {
fprintf(stderr, "option -i is required\n");
}
if (mode == spanstore::MODE_UNKNOWN) {
fprintf(stderr, "option -m is required\n");
}
// Load configuration
std::ifstream configStream(configPath);
if (configStream.fail()) {
fprintf(stderr, "unable to read configuration file: %s\n", configPath);
}
specpaxos::Configuration config(configStream);
if (index >= config.n) {
fprintf(stderr, "replica index %d is out of bounds; "
"only %d replicas defined\n", index, config.n);
}
UDPTransport transport(0.0, 0.0, 0);
spanstore::Server server(mode, skew, error);
specpaxos::vr::VRReplica replica(config, index, &transport, 1, &server);
if (keyPath) {
string key;
ifstream in;
in.open(keyPath);
if (!in) {
fprintf(stderr, "Could not read keys from: %s\n", keyPath);
exit(0);
}
for (unsigned int i = 0; i < nKeys; i++) {
getline(in, key);
uint64_t hash = 5381;
const char* str = key.c_str();
for (unsigned int j = 0; j < key.length(); j++) {
hash = ((hash << 5) + hash) + (uint64_t)str[j];
}
if (hash % maxShard == myShard) {
server.Load(key, "null", Timestamp());
}
}
in.close();
}
transport.Run();
return 0;
}
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
// vim: set ts=4 sw=4:
/***********************************************************************
*
* spanstore/replica.h:
* A single SpanStore server replica.
*
**********************************************************************/
#ifndef _SPAN_SERVER_H_
#define _SPAN_SERVER_H_
#include "paxos-lib/lib/configuration.h"
#include "paxos-lib/common/replica.h"
#include "paxos-lib/lib/udptransport.h"
#include "paxos-lib/vr/replica.h"
#include "common/truetime.h"
#include "common/txnstore.h"
#include "spanstore/occstore.h"
#include "spanstore/lockstore.h"
#include "spanstore/span-proto.pb.h"
namespace spanstore {
enum Mode {
MODE_UNKNOWN,
MODE_OCC,
MODE_LOCK,
MODE_SPAN_OCC,
MODE_SPAN_LOCK
};
class Server : public specpaxos::AppReplica
{
public:
Server(Mode mode, uint64_t skew, uint64_t error);
virtual ~Server();
virtual void LeaderUpcall(opnum_t opnum, const string &str1, bool &replicate, string &str2);
virtual void ReplicaUpcall(opnum_t opnum, const string &str1, string &str2);
virtual void UnloggedUpcall(const string &str1, string &str2);
void Load(const string &key, const string &value, const Timestamp timestamp);
private:
Mode mode;
TxnStore *store;
TrueTime timeServer;
};
} // namespace spanstore
#endif /* _SPAN_SERVER_H_ */
import "common/common-proto.proto";
package spanstore.proto;
message GetMessage {
required string key = 1;
optional TimestampMessage timestamp = 2;
}
message PrepareMessage {
required TransactionMessage txn = 1;
optional uint64 timestamp = 2;
}
message CommitMessage {
required uint64 timestamp = 1;
}
message AbortMessage {
required TransactionMessage txn = 1;
}
message Request {
enum Operation {
GET = 1;
PREPARE = 2;
COMMIT = 3;
ABORT = 4;
}
required Operation op = 1;
required uint64 txnid = 2;
optional GetMessage get = 3;
optional PrepareMessage prepare = 4;
optional CommitMessage commit = 5;
optional AbortMessage abort = 6;
}
message Reply {
// 0 = OK
// -1 = failed
// -2 = retry
// -3 = abstain/no reply
required int32 status = 1;
optional string value = 2;
optional uint64 timestamp = 3;
}
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
// vim: set ts=4 sw=4:
/***********************************************************************
*
* spanstore/spanclient.cc:
* Single shard SpanStore client implementation.
*
**********************************************************************/
#include "spanstore/spanclient.h"
#include "common/txnstore.h"
namespace spanstore {
using namespace std;
using namespace proto;
SpanClient::SpanClient(Mode mode, const string &configPath,
Transport *transport, uint64_t client_id, int
shard, int closestReplica)
: transport(transport), client_id(client_id), shard(shard)
{
ifstream configStream(configPath);
if (configStream.fail()) {
fprintf(stderr, "unable to read configuration file: %s\n",
configPath.c_str());
}
specpaxos::Configuration config(configStream);
client = new specpaxos::vr::VRClient(config, transport);
if (mode == MODE_OCC || mode == MODE_SPAN_OCC) {
if (closestReplica == -1) {
replica = client_id % config.n;
} else {
replica = closestReplica;
}
Debug("Sending unlogged to replica %i", replica);
} else {
replica = 0;
}
waiting = NULL;
blockingBegin = NULL;
}
SpanClient::~SpanClient()
{
delete client;
}
/* Sends BEGIN to a single shard indexed by i. */
void
SpanClient::Begin(uint64_t id)
{
Debug("[shard %i] BEGIN: %lu", shard, id);
// Wait for any previous pending requests.
if (blockingBegin != NULL) {
blockingBegin->GetReply();
delete blockingBegin;
blockingBegin = NULL;
}
}
/* Returns the value corresponding to the supplied key. */
void
SpanClient::Get(uint64_t id, const string &key, Promise *promise)
{
// Send the GET operation to appropriate shard.
Debug("[shard %i] Sending GET [%s]", shard, key.c_str());
// create request
string request_str;
Request request;
request.set_op(Request::GET);
request.set_txnid(id);
request.mutable_get()->set_key(key);
request.SerializeToString(&request_str);
// set to 1 second by default
int timeout = (promise != NULL) ? promise->GetTimeout() : 1000;
transport->Timer(0, [=]() {
waiting = promise;
client->InvokeUnlogged(replica,
request_str,
bind(&SpanClient::GetCallback,
this,
placeholders::_1,
placeholders::_2),
bind(&SpanClient::GetTimeout,
this),
timeout); // timeout in ms
});
}
void
SpanClient::Get(uint64_t id, const string &key,
const Timestamp &timestamp, Promise *promise)
{
// Send the GET operation to appropriate shard.
Debug("[shard %i] Sending GET [%s]", shard, key.c_str());
// create request
string request_str;
Request request;
request.set_op(Request::GET);
request.set_txnid(id);
request.mutable_get()->set_key(key);
timestamp.serialize(request.mutable_get()->mutable_timestamp());
request.SerializeToString(&request_str);
// set to 1 second by default
int timeout = (promise != NULL) ? promise->GetTimeout() : 1000;
transport->Timer(0, [=]() {
waiting = promise;
client->InvokeUnlogged(replica,
request_str,
bind(&SpanClient::GetCallback,
this,
placeholders::_1,
placeholders::_2),
bind(&SpanClient::GetTimeout,
this),
timeout); // timeout in ms
});
}
void
SpanClient::Prepare(uint64_t id, const Transaction &txn,
const Timestamp &timestamp, Promise *promise)
{
Debug("[shard %i] Sending PREPARE: %lu", shard, id);
// create prepare request
string request_str;
Request request;
request.set_op(Request::PREPARE);
request.set_txnid(id);
txn.serialize(request.mutable_prepare()->mutable_txn());
request.SerializeToString(&request_str);
transport->Timer(0, [=]() {
waiting = promise;
client->Invoke(request_str,
bind(&SpanClient::PrepareCallback,
this,
placeholders::_1,
placeholders::_2));
});
}
void
SpanClient::Commit(uint64_t id, const Transaction &txn,
uint64_t timestamp, Promise *promise)
{
Debug("[shard %i] Sending COMMIT: %lu", shard, id);
// create commit request
string request_str;
Request request;
request.set_op(Request::COMMIT);
request.set_txnid(id);
request.mutable_commit()->set_timestamp(timestamp);
request.SerializeToString(&request_str);
blockingBegin = new Promise(COMMIT_TIMEOUT);
transport->Timer(0, [=]() {
waiting = promise;
client->Invoke(request_str,
bind(&SpanClient::CommitCallback,
this,
placeholders::_1,
placeholders::_2));
});
}
/* Aborts the ongoing transaction. */
void
SpanClient::Abort(uint64_t id, const Transaction &txn, Promise *promise)
{
Debug("[shard %i] Sending ABORT: %lu", shard, id);
// create abort request
string request_str;
Request request;
request.set_op(Request::ABORT);
request.set_txnid(id);
txn.serialize(request.mutable_abort()->mutable_txn());
request.SerializeToString(&request_str);
blockingBegin = new Promise(ABORT_TIMEOUT);
transport->Timer(0, [=]() {
waiting = promise;
client->Invoke(request_str,
bind(&SpanClient::AbortCallback,
this,
placeholders::_1,
placeholders::_2));
});
}
void
SpanClient::GetTimeout()
{
if (waiting != NULL) {
Promise *w = waiting;
waiting = NULL;
w->Reply(REPLY_TIMEOUT);
}
}
/* Callback from a shard replica on get operation completion. */
void
SpanClient::GetCallback(const string &request_str, const string &reply_str)
{
/* Replies back from a shard. */
Reply reply;
reply.ParseFromString(reply_str);
Debug("[shard %i] Received GET callback [%d]", shard, reply.status());
if (waiting != NULL) {
Promise *w = waiting;
waiting = NULL;
if (reply.has_timestamp()) {
w->Reply(reply.status(), Timestamp(reply.timestamp()), reply.value());
} else {
w->Reply(reply.status(), reply.value());
}
}
}
/* Callback from a shard replica on prepare operation completion. */
void
SpanClient::PrepareCallback(const string &request_str, const string &reply_str)
{
Reply reply;
reply.ParseFromString(reply_str);
Debug("[shard %i] Received PREPARE callback [%d]", shard, reply.status());
if (waiting != NULL) {
Promise *w = waiting;
waiting = NULL;
if (reply.has_timestamp()) {
w->Reply(reply.status(), Timestamp(reply.timestamp(), 0));
} else {
w->Reply(reply.status(), Timestamp());
}
}
}
/* Callback from a shard replica on commit operation completion. */
void
SpanClient::CommitCallback(const string &request_str, const string &reply_str)
{
// COMMITs always succeed.
Reply reply;
reply.ParseFromString(reply_str);
ASSERT(reply.status() == REPLY_OK);
ASSERT(blockingBegin != NULL);
blockingBegin->Reply(0);
if (waiting != NULL) {
Promise *w = waiting;
waiting = NULL;
w->Reply(reply.status());
}
Debug("[shard %i] Received COMMIT callback [%d]", shard, reply.status());
}
/* Callback from a shard replica on abort operation completion. */
void
SpanClient::AbortCallback(const string &request_str, const string &reply_str)
{
// ABORTs always succeed.
Reply reply;
reply.ParseFromString(reply_str);
ASSERT(reply.status() == REPLY_OK);
ASSERT(blockingBegin != NULL);
blockingBegin->Reply(0);
if (waiting != NULL) {
Promise *w = waiting;
waiting = NULL;
w->Reply(reply.status());
}
Debug("[shard %i] Received ABORT callback [%d]", shard, reply.status());
}
} // namespace spanstore
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
// vim: set ts=4 sw=4:
/***********************************************************************
*
* spanstore/spanclient.h:
* Single shard SpanStore client interface.
*
**********************************************************************/
#ifndef _SPAN_TXN_CLIENT_H_
#define _SPAN_TXN_CLIENT_H_
#include "paxos-lib/lib/assert.h"
#include "paxos-lib/lib/message.h"
#include "paxos-lib/lib/transport.h"
#include "paxos-lib/common/client.h"
#include "paxos-lib/vr/client.h"
#include "common/txnclient.h"
#include "common/timestamp.h"
#include "common/transaction.h"
#include "spanstore/span-proto.pb.h"
#include <string>
#include <mutex>
#include <condition_variable>
namespace spanstore {
enum Mode {
MODE_UNKNOWN,
MODE_OCC,
MODE_LOCK,
MODE_SPAN_OCC,
MODE_SPAN_LOCK
};
class SpanClient : public TxnClient
{
public:
/* Constructor needs path to shard config. */
SpanClient(Mode mode,
const std::string &configPath,
Transport *transport,
uint64_t client_id,
int shard,
int closestReplica);
~SpanClient();
// Overriding from TxnClient
void Begin(uint64_t id);
void Get(uint64_t id,
const std::string &key,
Promise *promise = NULL);
void Get(uint64_t id,
const std::string &key,
const Timestamp &timestamp,
Promise *promise = NULL);
void Prepare(uint64_t id,
const Transaction &txn,
const Timestamp &timestamp = Timestamp(),
Promise *promise = NULL);
void Commit(uint64_t id,
const Transaction &txn,
uint64_t timestamp,
Promise *promise = NULL);
void Abort(uint64_t id,
const Transaction &txn,
Promise *promise = NULL);
private:
Transport *transport; // Transport layer.
uint64_t client_id; // Unique ID for this client.
int shard; // which shard this client accesses
int replica; // which replica to use for reads
specpaxos::Client *client; // Client proxy.
Promise *waiting; // waiting thread
Promise *blockingBegin; // block until finished
/* Timeout for Get requests, which only go to one replica. */
void GetTimeout();
/* Callbacks for hearing back from a shard for an operation. */
void GetCallback(const std::string &, const std::string &);
void PrepareCallback(const std::string &, const std::string &);
void CommitCallback(const std::string &, const std::string &);
void AbortCallback(const std::string &, const std::string &);
/* Helper Functions for starting and finishing requests */
void StartRequest();
void WaitForResponse();
void FinishRequest(const std::string &reply_str);
void FinishRequest();
int SendGet(const std::string &request_str);
};
} // namespace spanstore
#endif /* _SPAN_TXN_CLIENT_H_ */
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment