Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • syslab/tapir
  • aaasz/tapir
  • ashmrtnz/tapir
3 results
Show changes
Showing
with 2181 additions and 0 deletions
syntax = "proto2";
import "replication/common/request.proto";
package replication.vr.proto;
message RequestMessage {
required replication.Request req = 1;
}
message ReplyMessage {
required uint64 view = 1;
required uint64 opnum = 2;
required uint64 clientreqid = 3;
required bytes reply = 4;
}
message UnloggedRequestMessage {
required replication.UnloggedRequest req = 1;
}
message UnloggedReplyMessage {
required bytes reply = 1;
required uint64 clientreqid = 2;
}
message PrepareMessage {
required uint64 view = 1;
required uint64 opnum = 2;
required uint64 batchstart = 3;
repeated Request request = 4;
}
message PrepareOKMessage {
required uint64 view = 1;
required uint64 opnum = 2;
required uint32 replicaIdx = 3;
}
message CommitMessage {
required uint64 view = 1;
required uint64 opnum = 2;
}
message RequestStateTransferMessage {
required uint64 view = 1;
required uint64 opnum = 2;
}
message StateTransferMessage {
message LogEntry {
required uint64 view = 1;
required uint64 opnum = 2;
required replication.Request request = 3;
optional uint32 state = 4;
optional bytes hash = 5;
}
required uint64 view = 1;
required uint64 opnum = 2;
repeated LogEntry entries = 3;
}
message StartViewChangeMessage {
required uint64 view = 1;
required uint32 replicaIdx = 2;
required uint64 lastCommitted = 3;
}
message DoViewChangeMessage {
message LogEntry {
required uint64 view = 1;
required uint64 opnum = 2;
required replication.Request request = 3;
optional uint32 state = 4;
optional bytes hash = 5;
}
required uint64 view = 1;
required uint64 lastNormalView = 2;
required uint64 lastOp = 3;
required uint64 lastCommitted = 4;
repeated LogEntry entries = 5;
required uint32 replicaIdx = 6;
}
message StartViewMessage {
message LogEntry {
required uint64 view = 1;
required uint64 opnum = 2;
required replication.Request request = 3;
optional uint32 state = 4;
optional bytes hash = 5;
}
required uint64 view = 1;
required uint64 lastOp = 2;
required uint64 lastCommitted = 3;
repeated LogEntry entries = 4;
}
# How to Run
The clients and servers have to be provided a configuration file, one
for each shard and a timestamp server (for OCC). For example a 3 shard
configuration will have the following files:
shard0.config
```
f 1
replica <server-address-1>:<port>
replica <server-address-2>:<port>
replica <server-address-3>:<port>
```
shard1.config
```
f 1
replica <server-address-4>:<port>
replica <server-address-5>:<port>
replica <server-address-6>:<port>
```
shard2.config
```
f 1
replica <server-address-7>:<port>
replica <server-address-8>:<port>
replica <server-address-9>:<port>
```
shard.tss.config
```
f 1
replica <server-address-10>:<port>
replica <server-address-11>:<port>
replica <server-address-12>:<port>
```
## Running Servers
To start the replicas, run the following command with the `server`
binary for any of the stores,
`./server -c <shard-config-$n> -i <replica-number> -m <mode> -f <preload-keys>`
For each shard, you need to run `2f+1` instances of `server`
corresponding to the address:port pointed by `replica-number`.
Make sure you run all replicas for all shards.
## Running Clients
To run any of the clients in the benchmark directory,
`./client -c <shard-config-prefix> -N <n_shards> -m <mode>`
d := $(dir $(lastword $(MAKEFILE_LIST)))
SRCS += $(addprefix $(d), benchClient.cc retwisClient.cc terminalClient.cc)
OBJS-all-clients := $(OBJS-strong-client) $(OBJS-weak-client) $(OBJS-tapir-client)
$(d)benchClient: $(OBJS-all-clients) $(o)benchClient.o
$(d)retwisClient: $(OBJS-all-clients) $(o)retwisClient.o
$(d)terminalClient: $(OBJS-all-clients) $(o)terminalClient.o
BINS += $(d)benchClient $(d)retwisClient $(d)terminalClient
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* store/benchmark/benchClient.cc:
* Benchmarking client for a distributed transactional store.
*
**********************************************************************/
#include "store/common/truetime.h"
#include "store/common/frontend/client.h"
#include "store/strongstore/client.h"
#include "store/weakstore/client.h"
#include "store/tapirstore/client.h"
using namespace std;
// Function to pick a random key according to some distribution.
int rand_key();
bool ready = false;
double alpha = -1;
double *zipf;
vector<string> keys;
int nKeys = 100;
int
main(int argc, char **argv)
{
const char *configPath = NULL;
const char *keysPath = NULL;
int duration = 10;
int nShards = 1;
int tLen = 10;
int wPer = 50; // Out of 100
int closestReplica = -1; // Closest replica id.
int skew = 0; // difference between real clock and TrueTime
int error = 0; // error bars
Client *client;
enum {
MODE_UNKNOWN,
MODE_TAPIR,
MODE_WEAK,
MODE_STRONG
} mode = MODE_UNKNOWN;
// Mode for strongstore.
strongstore::Mode strongmode;
int opt;
while ((opt = getopt(argc, argv, "c:d:N:l:w:k:f:m:e:s:z:r:")) != -1) {
switch (opt) {
case 'c': // Configuration path
{
configPath = optarg;
break;
}
case 'f': // Generated keys path
{
keysPath = optarg;
break;
}
case 'N': // Number of shards.
{
char *strtolPtr;
nShards = strtoul(optarg, &strtolPtr, 10);
if ((*optarg == '\0') || (*strtolPtr != '\0') ||
(nShards <= 0)) {
fprintf(stderr, "option -n requires a numeric arg\n");
}
break;
}
case 'd': // Duration in seconds to run.
{
char *strtolPtr;
duration = strtoul(optarg, &strtolPtr, 10);
if ((*optarg == '\0') || (*strtolPtr != '\0') ||
(duration <= 0)) {
fprintf(stderr, "option -n requires a numeric arg\n");
}
break;
}
case 'l': // Length of each transaction (deterministic!)
{
char *strtolPtr;
tLen = strtoul(optarg, &strtolPtr, 10);
if ((*optarg == '\0') || (*strtolPtr != '\0') ||
(tLen <= 0)) {
fprintf(stderr, "option -l requires a numeric arg\n");
}
break;
}
case 'w': // Percentage of writes (out of 100)
{
char *strtolPtr;
wPer = strtoul(optarg, &strtolPtr, 10);
if ((*optarg == '\0') || (*strtolPtr != '\0') ||
(wPer < 0 || wPer > 100)) {
fprintf(stderr, "option -w requires a arg b/w 0-100\n");
}
break;
}
case 'k': // Number of keys to operate on.
{
char *strtolPtr;
nKeys = strtoul(optarg, &strtolPtr, 10);
if ((*optarg == '\0') || (*strtolPtr != '\0') ||
(nKeys <= 0)) {
fprintf(stderr, "option -k requires a numeric arg\n");
}
break;
}
case 's': // Simulated clock skew.
{
char *strtolPtr;
skew = strtoul(optarg, &strtolPtr, 10);
if ((*optarg == '\0') || (*strtolPtr != '\0') || (skew < 0))
{
fprintf(stderr,
"option -s requires a numeric arg\n");
}
break;
}
case 'e': // Simulated clock error.
{
char *strtolPtr;
error = strtoul(optarg, &strtolPtr, 10);
if ((*optarg == '\0') || (*strtolPtr != '\0') || (error < 0))
{
fprintf(stderr,
"option -e requires a numeric arg\n");
}
break;
}
case 'z': // Zipf coefficient for key selection.
{
char *strtolPtr;
alpha = strtod(optarg, &strtolPtr);
if ((*optarg == '\0') || (*strtolPtr != '\0'))
{
fprintf(stderr,
"option -z requires a numeric arg\n");
}
break;
}
case 'r': // Preferred closest replica.
{
char *strtolPtr;
closestReplica = strtod(optarg, &strtolPtr);
if ((*optarg == '\0') || (*strtolPtr != '\0'))
{
fprintf(stderr,
"option -r requires a numeric arg\n");
}
break;
}
case 'm': // Mode to run in [occ/lock/...]
{
if (strcasecmp(optarg, "txn-l") == 0) {
mode = MODE_TAPIR;
} else if (strcasecmp(optarg, "txn-s") == 0) {
mode = MODE_TAPIR;
} else if (strcasecmp(optarg, "qw") == 0) {
mode = MODE_WEAK;
} else if (strcasecmp(optarg, "occ") == 0) {
mode = MODE_STRONG;
strongmode = strongstore::MODE_OCC;
} else if (strcasecmp(optarg, "lock") == 0) {
mode = MODE_STRONG;
strongmode = strongstore::MODE_LOCK;
} else if (strcasecmp(optarg, "span-occ") == 0) {
mode = MODE_STRONG;
strongmode = strongstore::MODE_SPAN_OCC;
} else if (strcasecmp(optarg, "span-lock") == 0) {
mode = MODE_STRONG;
strongmode = strongstore::MODE_SPAN_LOCK;
} else {
fprintf(stderr, "unknown mode '%s'\n", optarg);
exit(0);
}
break;
}
default:
fprintf(stderr, "Unknown argument %s\n", argv[optind]);
break;
}
}
if (mode == MODE_TAPIR) {
client = new tapirstore::Client(configPath, nShards,
closestReplica, TrueTime(skew, error));
} else if (mode == MODE_WEAK) {
client = new weakstore::Client(configPath, nShards,
closestReplica);
} else if (mode == MODE_STRONG) {
client = new strongstore::Client(strongmode, configPath,
nShards, closestReplica, TrueTime(skew, error));
} else {
fprintf(stderr, "option -m is required\n");
exit(0);
}
// Read in the keys from a file.
string key, value;
ifstream in;
in.open(keysPath);
if (!in) {
fprintf(stderr, "Could not read keys from: %s\n", keysPath);
exit(0);
}
for (int i = 0; i < nKeys; i++) {
getline(in, key);
keys.push_back(key);
}
in.close();
struct timeval t0, t1, t2, t3, t4;
int nTransactions = 0;
int tCount = 0;
double tLatency = 0.0;
int getCount = 0;
double getLatency = 0.0;
int putCount = 0;
double putLatency = 0.0;
int beginCount = 0;
double beginLatency = 0.0;
int commitCount = 0;
double commitLatency = 0.0;
gettimeofday(&t0, NULL);
srand(t0.tv_sec + t0.tv_usec);
while (1) {
gettimeofday(&t4, NULL);
client->Begin();
gettimeofday(&t1, NULL);
beginCount++;
beginLatency += ((t1.tv_sec - t4.tv_sec)*1000000 + (t1.tv_usec - t4.tv_usec));
for (int j = 0; j < tLen; j++) {
key = keys[rand_key()];
if (rand() % 100 < wPer) {
gettimeofday(&t3, NULL);
client->Put(key, key);
gettimeofday(&t4, NULL);
putCount++;
putLatency += ((t4.tv_sec - t3.tv_sec)*1000000 + (t4.tv_usec - t3.tv_usec));
} else {
gettimeofday(&t3, NULL);
client->Get(key, value);
gettimeofday(&t4, NULL);
getCount++;
getLatency += ((t4.tv_sec - t3.tv_sec)*1000000 + (t4.tv_usec - t3.tv_usec));
}
}
gettimeofday(&t3, NULL);
bool status = client->Commit();
gettimeofday(&t2, NULL);
commitCount++;
commitLatency += ((t2.tv_sec - t3.tv_sec)*1000000 + (t2.tv_usec - t3.tv_usec));
long latency = (t2.tv_sec - t1.tv_sec)*1000000 + (t2.tv_usec - t1.tv_usec);
fprintf(stderr, "%d %ld.%06ld %ld.%06ld %ld %d\n", nTransactions+1, t1.tv_sec,
t1.tv_usec, t2.tv_sec, t2.tv_usec, latency, status?1:0);
if (status) {
tCount++;
tLatency += latency;
}
nTransactions++;
gettimeofday(&t1, NULL);
if ( ((t1.tv_sec-t0.tv_sec)*1000000 + (t1.tv_usec-t0.tv_usec)) > duration*1000000)
break;
}
fprintf(stderr, "# Commit_Ratio: %lf\n", (double)tCount/nTransactions);
fprintf(stderr, "# Overall_Latency: %lf\n", tLatency/tCount);
fprintf(stderr, "# Begin: %d, %lf\n", beginCount, beginLatency/beginCount);
fprintf(stderr, "# Get: %d, %lf\n", getCount, getLatency/getCount);
fprintf(stderr, "# Put: %d, %lf\n", putCount, putLatency/putCount);
fprintf(stderr, "# Commit: %d, %lf\n", commitCount, commitLatency/commitCount);
return 0;
}
int rand_key()
{
if (alpha < 0) {
// Uniform selection of keys.
return (rand() % nKeys);
} else {
// Zipf-like selection of keys.
if (!ready) {
zipf = new double[nKeys];
double c = 0.0;
for (int i = 1; i <= nKeys; i++) {
c = c + (1.0 / pow((double) i, alpha));
}
c = 1.0 / c;
double sum = 0.0;
for (int i = 1; i <= nKeys; i++) {
sum += (c / pow((double) i, alpha));
zipf[i-1] = sum;
}
ready = true;
}
double random = 0.0;
while (random == 0.0 || random == 1.0) {
random = (1.0 + rand())/RAND_MAX;
}
// binary search to find key;
int l = 0, r = nKeys, mid;
while (l < r) {
mid = (l + r) / 2;
if (random > zipf[mid]) {
l = mid + 1;
} else if (random < zipf[mid]) {
r = mid - 1;
} else {
break;
}
}
return mid;
}
}
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* store/benchmark/retwisClient.cc:
* Retwis benchmarking client for a distributed transactional store.
*
**********************************************************************/
#include "store/common/truetime.h"
#include "store/common/frontend/client.h"
#include "store/strongstore/client.h"
#include "store/weakstore/client.h"
#include "store/tapirstore/client.h"
#include <algorithm>
using namespace std;
// Function to pick a random key according to some distribution.
int rand_key();
bool ready = false;
double alpha = -1;
double *zipf;
vector<string> keys;
int nKeys = 100;
int
main(int argc, char **argv)
{
const char *configPath = NULL;
const char *keysPath = NULL;
int duration = 10;
int nShards = 1;
int closestReplica = -1; // Closest replica id.
int skew = 0; // difference between real clock and TrueTime
int error = 0; // error bars
Client *client;
enum {
MODE_UNKNOWN,
MODE_TAPIR,
MODE_WEAK,
MODE_STRONG
} mode = MODE_UNKNOWN;
// Mode for strongstore.
strongstore::Mode strongmode;
int opt;
while ((opt = getopt(argc, argv, "c:d:N:k:f:m:e:s:z:r:")) != -1) {
switch (opt) {
case 'c': // Configuration path
{
configPath = optarg;
break;
}
case 'f': // Generated keys path
{
keysPath = optarg;
break;
}
case 'N': // Number of shards.
{
char *strtolPtr;
nShards = strtoul(optarg, &strtolPtr, 10);
if ((*optarg == '\0') || (*strtolPtr != '\0') ||
(nShards <= 0)) {
fprintf(stderr, "option -N requires a numeric arg\n");
}
break;
}
case 'd': // Duration in seconds to run.
{
char *strtolPtr;
duration = strtoul(optarg, &strtolPtr, 10);
if ((*optarg == '\0') || (*strtolPtr != '\0') ||
(duration <= 0)) {
fprintf(stderr, "option -d requires a numeric arg\n");
}
break;
}
case 'k': // Number of keys to operate on.
{
char *strtolPtr;
nKeys = strtoul(optarg, &strtolPtr, 10);
if ((*optarg == '\0') || (*strtolPtr != '\0') ||
(nKeys <= 0)) {
fprintf(stderr, "option -k requires a numeric arg\n");
}
break;
}
case 's': // Simulated clock skew.
{
char *strtolPtr;
skew = strtoul(optarg, &strtolPtr, 10);
if ((*optarg == '\0') || (*strtolPtr != '\0') || (skew < 0))
{
fprintf(stderr,
"option -s requires a numeric arg\n");
}
break;
}
case 'e': // Simulated clock error.
{
char *strtolPtr;
error = strtoul(optarg, &strtolPtr, 10);
if ((*optarg == '\0') || (*strtolPtr != '\0') || (error < 0))
{
fprintf(stderr,
"option -e requires a numeric arg\n");
}
break;
}
case 'z': // Zipf coefficient for key selection.
{
char *strtolPtr;
alpha = strtod(optarg, &strtolPtr);
if ((*optarg == '\0') || (*strtolPtr != '\0'))
{
fprintf(stderr,
"option -z requires a numeric arg\n");
}
break;
}
case 'r': // Preferred closest replica.
{
char *strtolPtr;
closestReplica = strtod(optarg, &strtolPtr);
if ((*optarg == '\0') || (*strtolPtr != '\0'))
{
fprintf(stderr,
"option -r requires a numeric arg\n");
}
break;
}
case 'm': // Mode to run in [occ/lock/...]
{
if (strcasecmp(optarg, "txn-l") == 0) {
mode = MODE_TAPIR;
} else if (strcasecmp(optarg, "txn-s") == 0) {
mode = MODE_TAPIR;
} else if (strcasecmp(optarg, "qw") == 0) {
mode = MODE_WEAK;
} else if (strcasecmp(optarg, "occ") == 0) {
mode = MODE_STRONG;
strongmode = strongstore::MODE_OCC;
} else if (strcasecmp(optarg, "lock") == 0) {
mode = MODE_STRONG;
strongmode = strongstore::MODE_LOCK;
} else if (strcasecmp(optarg, "span-occ") == 0) {
mode = MODE_STRONG;
strongmode = strongstore::MODE_SPAN_OCC;
} else if (strcasecmp(optarg, "span-lock") == 0) {
mode = MODE_STRONG;
strongmode = strongstore::MODE_SPAN_LOCK;
} else {
fprintf(stderr, "unknown mode '%s'\n", optarg);
exit(0);
}
break;
}
default:
fprintf(stderr, "Unknown argument %s\n", argv[optind]);
break;
}
}
if (mode == MODE_TAPIR) {
client = new tapirstore::Client(configPath, nShards,
closestReplica, TrueTime(skew, error));
} else if (mode == MODE_WEAK) {
client = new weakstore::Client(configPath, nShards,
closestReplica);
} else if (mode == MODE_STRONG) {
client = new strongstore::Client(strongmode, configPath,
nShards, closestReplica, TrueTime(skew, error));
} else {
fprintf(stderr, "option -m is required\n");
exit(0);
}
// Read in the keys from a file.
string key, value;
ifstream in;
in.open(keysPath);
if (!in) {
fprintf(stderr, "Could not read keys from: %s\n", keysPath);
exit(0);
}
for (int i = 0; i < nKeys; i++) {
getline(in, key);
keys.push_back(key);
}
in.close();
struct timeval t0, t1, t2;
int nTransactions = 0; // Number of transactions attempted.
int ttype; // Transaction type.
int ret;
bool status;
vector<int> keyIdx;
gettimeofday(&t0, NULL);
srand(t0.tv_sec + t0.tv_usec);
while (1) {
keyIdx.clear();
// Begin a transaction.
client->Begin();
gettimeofday(&t1, NULL);
status = true;
// Decide which type of retwis transaction it is going to be.
ttype = rand() % 100;
if (ttype < 5) {
// 5% - Add user transaction. 1,3
keyIdx.push_back(rand_key());
keyIdx.push_back(rand_key());
keyIdx.push_back(rand_key());
sort(keyIdx.begin(), keyIdx.end());
if ((ret = client->Get(keys[keyIdx[0]], value))) {
Warning("Aborting due to %s %d", keys[keyIdx[0]].c_str(), ret);
status = false;
}
for (int i = 0; i < 3 && status; i++) {
client->Put(keys[keyIdx[i]], keys[keyIdx[i]]);
}
ttype = 1;
} else if (ttype < 20) {
// 15% - Follow/Unfollow transaction. 2,2
keyIdx.push_back(rand_key());
keyIdx.push_back(rand_key());
sort(keyIdx.begin(), keyIdx.end());
for (int i = 0; i < 2 && status; i++) {
if ((ret = client->Get(keys[keyIdx[i]], value))) {
Warning("Aborting due to %s %d", keys[keyIdx[i]].c_str(), ret);
status = false;
}
client->Put(keys[keyIdx[i]], keys[keyIdx[i]]);
}
ttype = 2;
} else if (ttype < 50) {
// 30% - Post tweet transaction. 3,5
keyIdx.push_back(rand_key());
keyIdx.push_back(rand_key());
keyIdx.push_back(rand_key());
keyIdx.push_back(rand_key());
keyIdx.push_back(rand_key());
sort(keyIdx.begin(), keyIdx.end());
for (int i = 0; i < 3 && status; i++) {
if ((ret = client->Get(keys[keyIdx[i]], value))) {
Warning("Aborting due to %s %d", keys[keyIdx[i]].c_str(), ret);
status = false;
}
client->Put(keys[keyIdx[i]], keys[keyIdx[i]]);
}
for (int i = 0; i < 2; i++) {
client->Put(keys[keyIdx[i+3]], keys[keyIdx[i+3]]);
}
ttype = 3;
} else {
// 50% - Get followers/timeline transaction. rand(1,10),0
int nGets = 1 + rand() % 10;
for (int i = 0; i < nGets; i++) {
keyIdx.push_back(rand_key());
}
sort(keyIdx.begin(), keyIdx.end());
for (int i = 0; i < nGets && status; i++) {
if ((ret = client->Get(keys[keyIdx[i]], value))) {
Warning("Aborting due to %s %d", keys[keyIdx[i]].c_str(), ret);
status = false;
}
}
ttype = 4;
}
if (status) {
status = client->Commit();
} else {
Debug("Aborting transaction due to failed Read");
}
gettimeofday(&t2, NULL);
long latency = (t2.tv_sec - t1.tv_sec) * 1000000 + (t2.tv_usec - t1.tv_usec);
int retries = 0;
if (!client->Stats().empty()) {
retries = client->Stats()[0];
}
fprintf(stderr, "%d %ld.%06ld %ld.%06ld %ld %d %d %d", ++nTransactions, t1.tv_sec,
t1.tv_usec, t2.tv_sec, t2.tv_usec, latency, status?1:0, ttype, retries);
fprintf(stderr, "\n");
if (((t2.tv_sec-t0.tv_sec)*1000000 + (t2.tv_usec-t0.tv_usec)) > duration*1000000)
break;
}
fprintf(stderr, "# Client exiting..\n");
return 0;
}
int rand_key()
{
if (alpha < 0) {
// Uniform selection of keys.
return (rand() % nKeys);
} else {
// Zipf-like selection of keys.
if (!ready) {
zipf = new double[nKeys];
double c = 0.0;
for (int i = 1; i <= nKeys; i++) {
c = c + (1.0 / pow((double) i, alpha));
}
c = 1.0 / c;
double sum = 0.0;
for (int i = 1; i <= nKeys; i++) {
sum += (c / pow((double) i, alpha));
zipf[i-1] = sum;
}
ready = true;
}
double random = 0.0;
while (random == 0.0 || random == 1.0) {
random = (1.0 + rand())/RAND_MAX;
}
// binary search to find key;
int l = 0, r = nKeys, mid;
while (l < r) {
mid = (l + r) / 2;
if (random > zipf[mid]) {
l = mid + 1;
} else if (random < zipf[mid]) {
r = mid - 1;
} else {
break;
}
}
return mid;
}
}
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* bench/terminal.cc:
* A terminal client for a distributed transactional store.
*
**********************************************************************/
#include "store/common/truetime.h"
#include "store/common/frontend/client.h"
#include "store/strongstore/client.h"
#include "store/weakstore/client.h"
#include "store/tapirstore/client.h"
using namespace std;
int
main(int argc, char **argv)
{
const char *configPath = NULL;
int nShards = 1;
int closestReplica = -1; // Closest replica id.
Client *client;
enum {
MODE_UNKNOWN,
MODE_TAPIR,
MODE_WEAK,
MODE_STRONG
} mode = MODE_UNKNOWN;
// Mode for strongstore.
strongstore::Mode strongmode;
int opt;
while ((opt = getopt(argc, argv, "c:N:m:r:")) != -1) {
switch (opt) {
case 'c': // Configuration path
{
configPath = optarg;
break;
}
case 'N': // Number of shards.
{
char *strtolPtr;
nShards = strtoul(optarg, &strtolPtr, 10);
if ((*optarg == '\0') || (*strtolPtr != '\0') ||
(nShards <= 0)) {
fprintf(stderr, "option -n requires a numeric arg\n");
}
break;
}
case 'm': // Mode to run in [occ/lock/...]
{
if (strcasecmp(optarg, "txn-l") == 0) {
mode = MODE_TAPIR;
} else if (strcasecmp(optarg, "txn-s") == 0) {
mode = MODE_TAPIR;
} else if (strcasecmp(optarg, "qw") == 0) {
mode = MODE_WEAK;
} else if (strcasecmp(optarg, "occ") == 0) {
mode = MODE_STRONG;
strongmode = strongstore::MODE_OCC;
} else if (strcasecmp(optarg, "lock") == 0) {
mode = MODE_STRONG;
strongmode = strongstore::MODE_LOCK;
} else if (strcasecmp(optarg, "span-occ") == 0) {
mode = MODE_STRONG;
strongmode = strongstore::MODE_SPAN_OCC;
} else if (strcasecmp(optarg, "span-lock") == 0) {
mode = MODE_STRONG;
strongmode = strongstore::MODE_SPAN_LOCK;
} else {
fprintf(stderr, "unknown mode '%s'\n", optarg);
exit(0);
}
break;
}
case 'r':
{
char *strtolPtr;
closestReplica = strtod(optarg, &strtolPtr);
if ((*optarg == '\0') || (*strtolPtr != '\0'))
{
fprintf(stderr,
"option -r requires a numeric arg\n");
}
break;
}
default:
fprintf(stderr, "Unknown argument %s\n", argv[optind]);
break;
}
}
if (mode == MODE_TAPIR) {
client = new tapirstore::Client(configPath, nShards,
closestReplica, TrueTime(0, 0));
} else if (mode == MODE_WEAK) {
client = new weakstore::Client(configPath, nShards,
closestReplica);
} else if (mode == MODE_STRONG) {
client = new strongstore::Client(strongmode, configPath,
nShards, closestReplica, TrueTime(0, 0));
} else {
fprintf(stderr, "option -m is required\n");
exit(0);
}
char c, cmd[2048], *tok;
int clen, status;
string key, value;
while (1) {
printf(">> ");
fflush(stdout);
clen = 0;
while ((c = getchar()) != '\n')
cmd[clen++] = c;
cmd[clen] = '\0';
if (clen == 0) continue;
tok = strtok(cmd, " ,.-");
if (strcasecmp(tok, "exit") == 0 || strcasecmp(tok, "q") == 0) {
printf("Exiting..\n");
break;
} else if (strcasecmp(tok, "get") == 0) {
tok = strtok(NULL, " ,.-");
key = string(tok);
status = client->Get(key, value);
if (status == 0) {
printf("%s -> %s\n", key.c_str(), value.c_str());
} else {
printf("Error in retrieving value\n");
}
} else if (strcasecmp(tok, "put") == 0) {
tok = strtok(NULL, " ,.-");
key = string(tok);
tok = strtok(NULL, " ,.-");
value = string(tok);
client->Put(key, value);
} else if (strcasecmp(tok, "begin") == 0) {
client->Begin();
} else if (strcasecmp(tok, "commit") == 0) {
bool status = client->Commit();
if (status) {
printf("Commit succeeded..\n");
} else {
printf("Commit failed..\n");
}
} else {
printf("Unknown command.. Try again!\n");
}
fflush(stdout);
}
exit(0);
return 0;
}
d := $(dir $(lastword $(MAKEFILE_LIST)))
SRCS += $(addprefix $(d), promise.cc timestamp.cc tracer.cc \
transaction.cc truetime.cc)
PROTOS += $(addprefix $(d), common-proto.proto)
LIB-store-common := $(o)common-proto.o $(o)promise.o $(o)timestamp.o \
$(o)tracer.o $(o)transaction.o $(o)truetime.o
include $(d)backend/Rules.mk $(d)frontend/Rules.mk
d := $(dir $(lastword $(MAKEFILE_LIST)))
SRCS += $(addprefix $(d), \
kvstore.cc lockserver.cc txnstore.cc versionstore.cc)
LIB-store-backend := $(o)kvstore.o $(o)lockserver.o $(o)txnstore.o $(o)versionstore.o
include $(d)tests/Rules.mk
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* common/kvstore.cc:
* Simple versioned key-value store
*
* Copyright 2015 Irene Zhang <iyzhang@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#include "store/common/backend/kvstore.h"
using namespace std;
KVStore::KVStore() { }
KVStore::~KVStore() { }
bool
KVStore::get(const string &key, string &value)
{
// check for existence of key in store
if (store.find(key) == store.end() || store[key].empty()) {
return false;
} else {
value = store[key].back();
return true;
}
}
bool
KVStore::put(const string &key, const string &value)
{
store[key].push_back(value);
return true;
}
/* Delete the latest version of this key. */
bool
KVStore::remove(const string &key, string &value)
{
if (store.find(key) == store.end() || store[key].empty()) {
return false;
}
store[key].pop_back();
return true;
}
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* store/common/backend/kvstore.h:
* Simple versioned key-value store
*
* Copyright 2015 Irene Zhang <iyzhang@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#ifndef _KV_STORE_H_
#define _KV_STORE_H_
#include "lib/assert.h"
#include "lib/message.h"
#include <string>
#include <unordered_map>
#include <list>
class KVStore
{
public:
KVStore();
~KVStore();
bool get(const std::string &key, std::string &value);
bool put(const std::string &key, const std::string &value);
bool remove(const std::string &key, std::string &value);
private:
/* Global store which keeps key -> (timestamp, value) list. */
std::unordered_map<std::string, std::list<std::string>> store;
};
#endif /* _KV_STORE_H_ */
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* spanstore/lockserver.cc:
* Simple multi-reader, single-writer lock server
*
**********************************************************************/
#include "store/common/backend/lockserver.h"
using namespace std;
LockServer::LockServer()
{
readers = 0;
writers = 0;
}
LockServer::~LockServer() { }
bool
LockServer::Waiter::checkTimeout(const struct timeval &now)
{
if (now.tv_sec > waitTime.tv_sec) {
return true;
} else {
ASSERT(now.tv_usec > waitTime.tv_usec && now.tv_sec == waitTime.tv_sec);
if (now.tv_usec - waitTime.tv_usec > LOCK_WAIT_TIMEOUT)
return true;
}
return false;
}
void
LockServer::Lock::waitForLock(uint64_t requester, bool write)
{
if (waiters.find(requester) != waiters.end()) {
// Already waiting
return;
}
Debug("[%lu] Adding me to the queue ...", requester);
// Otherwise
waiters[requester] = Waiter(write);
waitQ.push(requester);
}
bool
LockServer::Lock::tryAcquireLock(uint64_t requester, bool write)
{
if (waitQ.size() == 0) {
return true;
}
Debug("[%lu] Trying to get lock for %d", requester, (int)write);
struct timeval now;
uint64_t w = waitQ.front();
gettimeofday(&now, NULL);
// prune old requests out of the wait queue
while (waiters[w].checkTimeout(now)) {
waiters.erase(w);
waitQ.pop();
// if everyone else was old ...
if (waitQ.size() == 0) {
return true;
}
w = waitQ.front();
ASSERT(waiters.find(w) != waiters.end());
}
if (waitQ.front() == requester) {
// this lock is being reserved for the requester
waitQ.pop();
ASSERT(waiters.find(requester) != waiters.end());
ASSERT(waiters[requester].write == write);
waiters.erase(requester);
return true;
} else {
// otherwise, add me to the list
waitForLock(requester, write);
return false;
}
}
bool
LockServer::Lock::isWriteNext()
{
if (waitQ.size() == 0) return false;
struct timeval now;
uint64_t w = waitQ.front();
gettimeofday(&now, NULL);
// prune old requests out of the wait queue
while (waiters[w].checkTimeout(now)) {
waiters.erase(w);
waitQ.pop();
// if everyone else was old ...
if (waitQ.size() == 0) {
return false;
}
w = waitQ.front();
ASSERT(waiters.find(w) != waiters.end());
}
ASSERT(waiters.find(waitQ.front()) != waiters.end());
return waiters[waitQ.front()].write;
}
bool
LockServer::lockForRead(const string &lock, uint64_t requester)
{
Lock &l = locks[lock];
Debug("Lock for Read: %s [%lu %lu %lu %lu]", lock.c_str(),
readers, writers, l.holders.size(), l.waiters.size());
switch (l.state) {
case UNLOCKED:
// if you are next in the queue
if (l.tryAcquireLock(requester, false)) {
Debug("[%lu] I have acquired the read lock!", requester);
l.state = LOCKED_FOR_READ;
ASSERT(l.holders.size() == 0);
l.holders.insert(requester);
readers++;
return true;
}
return false;
case LOCKED_FOR_READ:
// if you already hold this lock
if (l.holders.find(requester) != l.holders.end()) {
return true;
}
// There is a write waiting, let's give up the lock
if (l.isWriteNext()) {
Debug("[%lu] Waiting on lock because there is a pending write request", requester);
l.waitForLock(requester, false);
return false;
}
l.holders.insert(requester);
readers++;
return true;
case LOCKED_FOR_WRITE:
case LOCKED_FOR_READ_WRITE:
if (l.holders.count(requester) > 0) {
l.state = LOCKED_FOR_READ_WRITE;
readers++;
return true;
}
ASSERT(l.holders.size() == 1);
Debug("Locked for write, held by %lu", *(l.holders.begin()));
l.waitForLock(requester, false);
return false;
}
NOT_REACHABLE();
return false;
}
bool
LockServer::lockForWrite(const string &lock, uint64_t requester)
{
Lock &l = locks[lock];
Debug("Lock for Write: %s [%lu %lu %lu %lu]", lock.c_str(),
readers, writers, l.holders.size(), l.waiters.size());
switch (l.state) {
case UNLOCKED:
// Got it!
if (l.tryAcquireLock(requester, true)) {
Debug("[%lu] I have acquired the write lock!", requester);
l.state = LOCKED_FOR_WRITE;
ASSERT(l.holders.size() == 0);
l.holders.insert(requester);
writers++;
return true;
}
return false;
case LOCKED_FOR_READ:
if (l.holders.size() == 1 && l.holders.count(requester) > 0) {
// if there is one holder of this read lock and it is the
// requester, then upgrade the lock
l.state = LOCKED_FOR_READ_WRITE;
writers++;
return true;
}
Debug("Locked for read by%s%lu other people", l.holders.count(requester) > 0 ? "you" : "", l.holders.size());
l.waitForLock(requester, true);
return false;
case LOCKED_FOR_WRITE:
case LOCKED_FOR_READ_WRITE:
ASSERT(l.holders.size() == 1);
if (l.holders.count(requester) > 0) {
return true;
}
Debug("Held by %lu for %s", *(l.holders.begin()), (l.state == LOCKED_FOR_WRITE) ? "write" : "read-write" );
l.waitForLock(requester, true);
return false;
}
NOT_REACHABLE();
return false;
}
void
LockServer::releaseForRead(const string &lock, uint64_t holder)
{
if (locks.find(lock) == locks.end()) {
return;
}
Lock &l = locks[lock];
if (l.holders.count(holder) == 0) {
Warning("[%ld] Releasing unheld read lock: %s", holder, lock.c_str());
return;
}
switch (l.state) {
case UNLOCKED:
case LOCKED_FOR_WRITE:
return;
case LOCKED_FOR_READ:
readers--;
if (l.holders.erase(holder) < 1) {
Warning("[%ld] Releasing unheld read lock: %s", holder, lock.c_str());
}
if (l.holders.empty()) {
l.state = UNLOCKED;
}
return;
case LOCKED_FOR_READ_WRITE:
readers--;
l.state = LOCKED_FOR_WRITE;
return;
}
}
void
LockServer::releaseForWrite(const string &lock, uint64_t holder)
{
if (locks.find(lock) == locks.end()) {
return;
}
Lock &l = locks[lock];
if (l.holders.count(holder) == 0) {
Warning("[%ld] Releasing unheld write lock: %s", holder, lock.c_str());
return;
}
switch (l.state) {
case UNLOCKED:
case LOCKED_FOR_READ:
return;
case LOCKED_FOR_WRITE:
writers--;
l.holders.erase(holder);
ASSERT(l.holders.size() == 0);
l.state = UNLOCKED;
return;
case LOCKED_FOR_READ_WRITE:
writers--;
l.state = LOCKED_FOR_READ;
ASSERT(l.holders.size() == 1);
return;
}
}
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* store/common/backend/lockserver.h:
* Simple multi-reader, single-writer lock server.
*
* Copyright 2015 Irene Zhang <iyzhang@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#ifndef _LOCK_SERVER_H_
#define _LOCK_SERVER_H_
#include "lib/assert.h"
#include "lib/message.h"
#include <sys/time.h>
#include <map>
#include <queue>
#include <string>
#include <unordered_map>
#include <unordered_set>
#define LOCK_WAIT_TIMEOUT 5000
class LockServer
{
public:
LockServer();
~LockServer();
bool lockForRead(const std::string &lock, uint64_t requester);
bool lockForWrite(const std::string &lock, uint64_t requester);
void releaseForRead(const std::string &lock, uint64_t holder);
void releaseForWrite(const std::string &lock, uint64_t holder);
private:
enum LockState {
UNLOCKED,
LOCKED_FOR_READ,
LOCKED_FOR_WRITE,
LOCKED_FOR_READ_WRITE
};
struct Waiter {
bool write;
struct timeval waitTime;
Waiter() {write = false;}
Waiter(bool w) {
gettimeofday(&waitTime, NULL);
write = w;
}
bool checkTimeout(const struct timeval &now);
};
struct Lock {
LockState state;
std::unordered_set<uint64_t> holders;
std::queue<uint64_t> waitQ;
std::map<uint64_t, Waiter> waiters;
Lock() {
state = UNLOCKED;
};
void waitForLock(uint64_t requester, bool write);
bool tryAcquireLock(uint64_t requester, bool write);
bool isWriteNext();
};
/* Global store which keep key -> (timestamp, value) list. */
std::unordered_map<std::string, Lock> locks;
uint64_t readers;
uint64_t writers;
};
#endif /* _LOCK_SERVER_H_ */
d := $(dir $(lastword $(MAKEFILE_LIST)))
#
# gtest-based tests
#
GTEST_SRCS += $(addprefix $(d), \
kvstore-test.cc \
versionstore-test.cc \
lockserver-test.cc)
$(d)kvstore-test: $(o)kvstore-test.o $(LIB-transport) $(LIB-store-common) $(LIB-store-backend) $(GTEST_MAIN)
TEST_BINS += $(d)kvstore-test
$(d)versionstore-test: $(o)versionstore-test.o $(LIB-transport) $(LIB-store-common) $(LIB-store-backend) $(GTEST_MAIN)
TEST_BINS += $(d)versionstore-test
$(d)lockserver-test: $(o)lockserver-test.o $(LIB-transport) $(LIB-store-common) $(LIB-store-backend) $(GTEST_MAIN)
TEST_BINS += $(d)lockserver-test
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* kvstore-test.cc:
* test cases for simple key-value store class
*
* Copyright 2015 Irene Zhang <iyzhang@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#include "store/common/backend/kvstore.h"
#include <gtest/gtest.h>
TEST(KVStore, Put)
{
KVStore store;
EXPECT_TRUE(store.put("test1", "abc"));
EXPECT_TRUE(store.put("test2", "def"));
EXPECT_TRUE(store.put("test1", "xyz"));
EXPECT_TRUE(store.put("test3", "abc"));
}
TEST(KVStore, Get)
{
KVStore store;
std::string val;
EXPECT_TRUE(store.put("test1", "abc"));
EXPECT_TRUE(store.get("test1", val));
EXPECT_EQ(val, "abc");
EXPECT_TRUE(store.put("test2", "def"));
EXPECT_TRUE(store.get("test2", val));
EXPECT_EQ(val, "def");
EXPECT_TRUE(store.put("test1", "xyz"));
EXPECT_TRUE(store.get("test1", val));
EXPECT_EQ(val, "xyz");
}
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* store/common/backend/tests/lockserver-test.cc:
* test cases for simple lock server class
*
* Copyright 2015 Irene Zhang <iyzhang@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#include "store/common/backend/lockserver.h"
#include <gtest/gtest.h>
TEST(LockServer, ReadLock)
{
LockServer s;
EXPECT_TRUE(s.lockForRead("x", 1));
EXPECT_TRUE(s.lockForRead("x", 2));
EXPECT_FALSE(s.lockForWrite("x", 3));
}
TEST(LockServer, WriteLock)
{
LockServer s;
EXPECT_TRUE(s.lockForWrite("x", 1));
EXPECT_FALSE(s.lockForRead("x", 2));
EXPECT_FALSE(s.lockForWrite("x", 3));
}
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* store/common/backend/tests/versionstore-test.cc
* test cases for simple versioned key-value store class
*
* Copyright 2015 Irene Zhang <iyzhang@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#include "store/common/backend/versionstore.h"
#include <gtest/gtest.h>
TEST(VersionedKVStore, Get)
{
VersionedKVStore store;
std::pair<Timestamp, std::string> val;
store.put("test1", "abc", Timestamp(10));
EXPECT_TRUE(store.get("test1", val));
EXPECT_EQ(val.second, "abc");
EXPECT_EQ(Timestamp(10), val.first);
store.put("test2", "def", Timestamp(10));
EXPECT_TRUE(store.get("test2", val));
EXPECT_EQ(val.second, "def");
EXPECT_EQ(Timestamp(10), val.first);
store.put("test1", "xyz", Timestamp(11));
EXPECT_TRUE(store.get("test1", val));
EXPECT_EQ(val.second, "xyz");
EXPECT_EQ(Timestamp(11), val.first);
EXPECT_TRUE(store.get("test1", Timestamp(10), val));
EXPECT_EQ(val.second, "abc");
}
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* store/txnstore/lib/txnstore.h:
* Interface for a single node transactional store serving as a
* server-side backend
*
* Copyright 2013-2015 Irene Zhang <iyzhang@cs.washington.edu>
* Naveen Kr. Sharma <naveenks@cs.washington.edu>
* Dan R. K. Ports <drkp@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#include "store/common/backend/txnstore.h"
using namespace std;
TxnStore::TxnStore() {}
TxnStore::~TxnStore() {}
int
TxnStore::Get(uint64_t id, const string &key, pair<Timestamp, string> &value)
{
Panic("Unimplemented GET");
return 0;
}
int
TxnStore::Get(uint64_t id, const string &key, const Timestamp &timestamp,
pair<Timestamp, string> &value)
{
Panic("Unimplemented GET");
return 0;
}
int
TxnStore::Put(uint64_t id, const string &key, const string &value)
{
Panic("Unimplemented PUT");
return 0;
}
int
TxnStore::Prepare(uint64_t id, const Transaction &txn)
{
Panic("Unimplemented PREPARE");
return 0;
}
int
TxnStore::Prepare(uint64_t id, const Transaction &txn,
const Timestamp &timestamp, Timestamp &proposed)
{
Panic("Unimplemented PREPARE");
return 0;
}
void
TxnStore::Commit(uint64_t id, uint64_t timestamp)
{
Panic("Unimplemented COMMIT");
}
void
TxnStore::Abort(uint64_t id, const Transaction &txn)
{
Panic("Unimplemented ABORT");
}
void
TxnStore::Load(const string &key, const string &value, const Timestamp &timestamp)
{
Panic("Unimplemented LOAD");
}
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* store/txnstore/lib/txnstore.h:
* Interface for a single node transactional store serving as a
* server-side backend
*
* Copyright 2013-2015 Irene Zhang <iyzhang@cs.washington.edu>
* Naveen Kr. Sharma <naveenks@cs.washington.edu>
* Dan R. K. Ports <drkp@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#ifndef _TXN_STORE_H_
#define _TXN_STORE_H_
#include "lib/assert.h"
#include "lib/message.h"
#include "store/common/timestamp.h"
#include "store/common/transaction.h"
class TxnStore
{
public:
TxnStore();
virtual ~TxnStore();
// add key to read set
virtual int Get(uint64_t id, const std::string &key,
std::pair<Timestamp, std::string> &value);
virtual int Get(uint64_t id, const std::string &key,
const Timestamp &timestamp, std::pair<Timestamp, std::string> &value);
// add key to write set
virtual int Put(uint64_t id, const std::string &key,
const std::string &value);
// check whether we can commit this transaction (and lock the read/write set)
virtual int Prepare(uint64_t id, const Transaction &txn);
virtual int Prepare(uint64_t id, const Transaction &txn,
const Timestamp &timestamp, Timestamp &proposed);
// commit the transaction
virtual void Commit(uint64_t id, uint64_t timestamp = 0);
// abort a running transaction
virtual void Abort(uint64_t id, const Transaction &txn = Transaction());
// load keys
virtual void Load(const std::string &key, const std::string &value,
const Timestamp &timestamp);
};
#endif /* _TXN_STORE_H_ */
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* store/common/backend/versionstore.cc:
* Timestamped version store
*
* Copyright 2015 Irene Zhang <iyzhang@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#include "store/common/backend/versionstore.h"
using namespace std;
VersionedKVStore::VersionedKVStore() { }
VersionedKVStore::~VersionedKVStore() { }
bool
VersionedKVStore::inStore(const string &key)
{
return store.find(key) != store.end() && store[key].size() > 0;
}
void
VersionedKVStore::getValue(const string &key, const Timestamp &t, set<VersionedKVStore::VersionedValue>::iterator &it)
{
VersionedValue v(t);
it = store[key].upper_bound(v);
// if there is no valid version at this timestamp
if (it == store[key].begin()) {
it = store[key].end();
} else {
it--;
}
}
/* Returns the most recent value and timestamp for given key.
* Error if key does not exist. */
bool
VersionedKVStore::get(const string &key, pair<Timestamp, string> &value)
{
// check for existence of key in store
if (inStore(key)) {
VersionedValue v = *(store[key].rbegin());
value = make_pair(v.write, v.value);
return true;
}
return false;
}
/* Returns the value valid at given timestamp.
* Error if key did not exist at the timestamp. */
bool
VersionedKVStore::get(const string &key, const Timestamp &t, pair<Timestamp, string> &value)
{
if (inStore(key)) {
set<VersionedValue>::iterator it;
getValue(key, t, it);
if (it != store[key].end()) {
value = make_pair((*it).write, (*it).value);
return true;
}
}
return false;
}
bool
VersionedKVStore::getRange(const string &key, const Timestamp &t,
pair<Timestamp, Timestamp> &range)
{
if (inStore(key)) {
set<VersionedValue>::iterator it;
getValue(key, t, it);
if (it != store[key].end()) {
range.first = (*it).write;
it++;
if (it != store[key].end()) {
range.second = (*it).write;
}
return true;
}
}
return false;
}
void
VersionedKVStore::put(const string &key, const string &value, const Timestamp &t)
{
// Key does not exist. Create a list and an entry.
store[key].insert(VersionedValue(t, value));
}
/*
* Commit a read by updating the timestamp of the latest read txn for
* the version of the key that the txn read.
*/
void
VersionedKVStore::commitGet(const string &key, const Timestamp &readTime, const Timestamp &commit)
{
// Hmm ... could read a key we don't have if we are behind ... do we commit this or wait for the log update?
if (inStore(key)) {
set<VersionedValue>::iterator it;
getValue(key, readTime, it);
if (it != store[key].end()) {
// figure out if anyone has read this version before
if (lastReads.find(key) != lastReads.end() &&
lastReads[key].find((*it).write) != lastReads[key].end()) {
if (lastReads[key][(*it).write] < commit) {
lastReads[key][(*it).write] = commit;
}
}
}
} // otherwise, ignore the read
}
bool
VersionedKVStore::getLastRead(const string &key, Timestamp &lastRead)
{
if (inStore(key)) {
VersionedValue v = *(store[key].rbegin());
if (lastReads.find(key) != lastReads.end() &&
lastReads[key].find(v.write) != lastReads[key].end()) {
lastRead = lastReads[key][v.write];
return true;
}
}
return false;
}
/*
* Get the latest read for the write valid at timestamp t
*/
bool
VersionedKVStore::getLastRead(const string &key, const Timestamp &t, Timestamp &lastRead)
{
if (inStore(key)) {
set<VersionedValue>::iterator it;
getValue(key, t, it);
ASSERT(it != store[key].end());
// figure out if anyone has read this version before
if (lastReads.find(key) != lastReads.end() &&
lastReads[key].find((*it).write) != lastReads[key].end()) {
lastRead = lastReads[key][(*it).write];
return true;
}
}
return false;
}
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* store/common/backend/versionstore.cc:
* Timestamped version store
*
* Copyright 2015 Irene Zhang <iyzhang@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#ifndef _VERSIONED_KV_STORE_H_
#define _VERSIONED_KV_STORE_H_
#include "lib/assert.h"
#include "lib/message.h"
#include "store/common/timestamp.h"
#include <set>
#include <map>
#include <unordered_map>
class VersionedKVStore
{
public:
VersionedKVStore();
~VersionedKVStore();
bool get(const std::string &key, std::pair<Timestamp, std::string> &value);
bool get(const std::string &key, const Timestamp &t, std::pair<Timestamp, std::string> &value);
bool getRange(const std::string &key, const Timestamp &t, std::pair<Timestamp, Timestamp> &range);
bool getLastRead(const std::string &key, Timestamp &readTime);
bool getLastRead(const std::string &key, const Timestamp &t, Timestamp &readTime);
void put(const std::string &key, const std::string &value, const Timestamp &t);
void commitGet(const std::string &key, const Timestamp &readTime, const Timestamp &commit);
private:
struct VersionedValue {
Timestamp write;
std::string value;
VersionedValue(Timestamp commit) : write(commit), value("tmp") { };
VersionedValue(Timestamp commit, std::string val) : write(commit), value(val) { };
friend bool operator> (const VersionedValue &v1, const VersionedValue &v2) {
return v1.write > v2.write;
};
friend bool operator< (const VersionedValue &v1, const VersionedValue &v2) {
return v1.write < v2.write;
};
};
/* Global store which keeps key -> (timestamp, value) list. */
std::unordered_map< std::string, std::set<VersionedValue> > store;
std::unordered_map< std::string, std::map< Timestamp, Timestamp > > lastReads;
bool inStore(const std::string &key);
void getValue(const std::string &key, const Timestamp &t, std::set<VersionedValue>::iterator &it);
};
#endif /* _VERSIONED_KV_STORE_H_ */