Skip to content
Snippets Groups Projects
Commit 4ecdee4e authored by Irene Y Zhang's avatar Irene Y Zhang
Browse files

reorganizing some storage files and renaming

parent f7ae438d
No related branches found
No related tags found
No related merge requests found
Showing
with 800 additions and 0 deletions
File moved
File moved
File moved
File moved
File moved
File moved
File moved
File moved
d := $(dir $(lastword $(MAKEFILE_LIST)))
SRCS += $(addprefix $(d), \
client.cc qwclient.cc \
qwstore.cc replica.cc server.cc )
PROTOS += $(addprefix $(d), qw-proto.proto)
OBJS-qw-store := $(LIB-store) $(o)qw-proto.o $(o)qwstore.o $(o)server.o
OBJS-qw-client := $(LIB-message) $(LIB-udptransport) $(LIB-request) $(LIB-common) \
$(LIB-client) $(o)qw-proto.o $(o)qwclient.o $(o)client.o
$(d)server: $(LIB-message) $(LIB-udptransport) $(LIB-request) \
$(LIB-common) $(OBJS-qw-store) $(o)replica.o
BINS += $(d)server
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
// vim: set ts=4 sw=4:
/***********************************************************************
*
* qwstore/client.cc:
* Single QWstore client. Implements the API functionalities.
*
**********************************************************************/
#include "qwstore/client.h"
namespace qwstore {
using namespace std;
using namespace proto;
Client::Client(string configPath, int nShards, int closestReplica)
: transport(0.0, 0.0, 0)
{
// Initialize all state here;
client_id = 0;
while (client_id == 0) {
random_device rd;
mt19937_64 gen(rd());
uniform_int_distribution<uint64_t> dis;
client_id = dis(gen);
}
nshards = nShards;
bclient.reserve(nShards);
Debug("Initializing Orstore client with id [%lu]", client_id);
/* Start a client for each shard. */
for (int i = 0; i < nShards; i++) {
string shardConfigPath = configPath + to_string(i) + ".config";
bclient[i] = new QWClient(shardConfigPath, &transport,
client_id, i, closestReplica);
}
/* Run the transport in a new thread. */
clientTransport = new thread(&Client::run_client, this);
Debug("QWstore client [%lu] created!", client_id);
}
Client::~Client()
{
transport.Stop();
for (auto b : bclient) {
delete b;
}
clientTransport->join();
}
/* Runs the transport event loop. */
void
Client::run_client()
{
transport.Run();
}
/* Returns the value corresponding to the supplied key. */
int
Client::Get(const string &key, string &value)
{
Debug("GET Operation [%s]", key.c_str());
// Contact the appropriate shard to get the value.
int i = key_to_shard(key, nshards);
// Send the GET operation to appropriate shard.
Promise promise;
bclient[i]->Get(client_id, key, &promise);
value = promise.GetValue();
return promise.GetReply();
}
/* Sets the value corresponding to the supplied key. */
int
Client::Put(const string &key,
const string &value)
{
Debug("PUT Operation [%s]", key.c_str());
// Contact the appropriate shard to set the value.
int i = key_to_shard(key, nshards);
// Send the GET operation to appropriate shard.
Promise promise;
bclient[i]->Put(client_id, key, value, &promise);
return promise.GetReply();
}
} // namespace qwstore
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
// vim: set ts=4 sw=4:
/***********************************************************************
*
* qwstore/client.h:
* QWstore client-side logic and APIs
*
**********************************************************************/
#ifndef _QW_CLIENT_H_
#define _QW_CLIENT_H_
#include "paxos-lib/lib/assert.h"
#include "paxos-lib/lib/message.h"
#include "paxos-lib/lib/configuration.h"
#include "paxos-lib/lib/udptransport.h"
#include "paxos-lib/common/client.h"
#include "paxos-lib/vr/client.h"
#include "common/client.h"
#include "common/bufferclient.h"
#include "common/truetime.h"
#include "qwstore/qwclient.h"
#include "qwstore/qw-proto.pb.h"
#include <string>
#include <thread>
#include <set>
namespace qwstore {
class Client : public ::Client
{
public:
Client(std::string configPath, int nshards, int closestReplica);
~Client();
// Overriding methods from ::Client
void Begin() {};
int Get(const std::string &key, std::string &value);
int Put(const std::string &key, const std::string &value);
bool Commit() { return true; };
void Abort() {};
private:
/* Private helper functions. */
void run_client(); // Runs the transport event loop.
// Unique ID for this client.
uint64_t client_id;
// Number of shards in this deployment
uint64_t nshards;
// Transport used by shard clients.
UDPTransport transport;
// Thread running the transport event loop.
std::thread *clientTransport;
// Client for each shard.
std::vector<QWClient *> bclient;
};
} // namespace qwstore
#endif /* _QW_CLIENT_H_ */
package qwstore.proto;
message ReplyMessage {
// 0 = OK
// -1 = failed
// -2 = retry
// -3 = abstain/no reply
// -4 = network timeout
required int32 status = 1;
}
message GetMessage {
required uint64 clientid = 1;
required string key = 2;
}
message GetReplyMessage {
required int32 status = 1;
required string value = 2;
}
message PutMessage {
required uint64 clientid = 1;
required string key = 2;
required string value = 3;
}
message PutReplyMessage {
required int32 status = 1;
}
\ No newline at end of file
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
// vim: set ts=4 sw=4:
/***********************************************************************
*
* qwstore/qwclient.cc
* Client for one QWStore shard.
*
**********************************************************************/
#include "qwstore/qwclient.h"
#include "common/txnstore.h"
using namespace std;
namespace qwstore {
using namespace proto;
QWClient::QWClient(string configPath, Transport *transport,
uint64_t client_id, int shard, int closestReplica) :
transport(transport), client_id(client_id), shard(shard)
{
ifstream configStream(configPath);
if (configStream.fail()) {
fprintf(stderr, "unable to read configuration file: %s\n",
configPath.c_str());
}
config = new specpaxos::Configuration(configStream);
transport->Register(this, *config, -1);
timeout = new Timeout(transport, 250, [this]() {
RequestTimedOut();
});
if (closestReplica == -1) {
replica = client_id % config->n;
} else {
replica = closestReplica;
}
waiting = NULL;
}
QWClient::~QWClient()
{
delete config;
delete timeout;
}
void
QWClient::Get(uint64_t id, const string &key, Promise *promise)
{
// Create get request
GetMessage msg;
msg.set_clientid(client_id);
msg.set_key(key);
ASSERT(waiting == NULL);
waiting = promise;
// Send message
transport->Timer(0, [=]() {
if (transport->SendMessageToReplica(this, replica, msg)) {
if (waiting != NULL) {
timeout->SetTimeout(promise->GetTimeout());
timeout->Start();
}
} else if (waiting != NULL) {
Promise *w = waiting;
waiting = NULL;
w->Reply(REPLY_NETWORK_FAILURE);
}
});
}
void
QWClient::Put(uint64_t id,
const string &key,
const string &value,
Promise *promise)
{
Debug("[shard %d] Sending PUT [%s %s]", shard, key.c_str(), value.c_str());
// Creating put request
PutMessage msg;
msg.set_clientid(client_id);
msg.set_key(key);
msg.set_value(value);
ASSERT(waiting == NULL);
waiting = promise;
// clear the reply counter
totalReplies = 0;
// Send messages
transport->Timer(0, [=]() {
// always send to leader for now
if (transport->SendMessageToAll(this, msg)) {
// set the timeout
if (waiting != NULL) {
timeout->SetTimeout(waiting->GetTimeout());
timeout->Start();
}
} else if (waiting != NULL) {
Promise *w = waiting;
waiting = NULL;
w->Reply(REPLY_NETWORK_FAILURE);
}
});
}
// Callbacks that happen in the transport thread
void
QWClient::RequestTimedOut()
{
Debug("[shard %d] Timeout", shard);
timeout->Stop();
if (waiting != NULL) {
Promise *w = waiting;
waiting = NULL;
w->Reply(REPLY_TIMEOUT);
}
}
void
QWClient::ReceiveMessage(const TransportAddress &remote,
const string &type,
const string &data)
{
static GetReplyMessage getReply;
static PutReplyMessage putReply;
Debug("Received reply type: %s", type.c_str());
if (type == getReply.GetTypeName()) {
getReply.ParseFromString(data);
if (waiting != NULL) {
timeout->Stop();
Promise *w = waiting;
waiting = NULL;
w->Reply(getReply.status(),getReply.value());
}
} else if (type == putReply.GetTypeName()) {
totalReplies++;
if (totalReplies >= config->n) {
if (waiting != NULL) {
timeout->Stop();
Promise *w = waiting;
waiting = NULL;
w->Reply(REPLY_OK);
}
}
} else {
NOT_REACHABLE();
}
}
} // namespace qwstore
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
// vim: set ts=4 sw=4:
/***********************************************************************
*
* qwstore/qwclient.h
* Client-side module for QWStore clients.
*
**********************************************************************/
#ifndef _QW_TXN_CLIENT_H_
#define _QW_TXN_CLIENT_H_
#include "paxos-lib/lib/assert.h"
#include "paxos-lib/lib/message.h"
#include "paxos-lib/lib/configuration.h"
#include "paxos-lib/lib/udptransport.h"
#include "common/timestamp.h"
#include "common/transaction.h"
#include "common/txnclient.h"
#include "qwstore/qw-proto.pb.h"
#include <set>
#include <thread>
#define COMMIT_RETRIES 5
namespace qwstore {
class QWClient : public TransportReceiver
{
public:
QWClient(std::string configPath,
Transport *transport,
uint64_t client_id,
int shard,
int closestReplica);
~QWClient();
void Get(uint64_t id, const std::string &key, Promise *promise);
void Put(uint64_t id, const std::string &key, const std::string &value, Promise *promise);
// Overriding from TransportReceiver
void ReceiveMessage(const TransportAddress &remote, const std::string &type, const std::string &data);
private:
specpaxos::Configuration *config;
Transport *transport; // Transport to replicas
uint64_t client_id;
int shard;
int replica;
Timeout *timeout; // Timeout for general requests
int totalReplies;
Promise *waiting;
void RequestTimedOut();
};
} // namespace qwstore
#endif /* _QW_TXN_CLIENT_H_ */
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
// vim: set ts=4 sw=4:
/***********************************************************************
*
* qwstore/qwstore.cc:
* Simple quorum write key-value store implementation
*
**********************************************************************/
#include "qwstore/qwstore.h"
using namespace std;
namespace qwstore {
QWStore::QWStore() { }
QWStore::~QWStore() { }
int
QWStore::Get(uint64_t id, const string &key, string &value)
{
Debug("[%lu] GET %s", id, key.c_str());
string val;
if (store.get(key, val)) {
value = val;
return REPLY_OK;
}
return REPLY_FAIL;
}
int
QWStore::Put(uint64_t id, const string &key, const string &value)
{
Debug("[%lu] PUT %s %s", id, key.c_str(), value.c_str());
store.put(key, value);
return REPLY_OK;
}
void
QWStore::Load(const string &key, const string &value)
{
store.put(key, value);
}
} // namespace qwstore
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
// vim: set ts=4 sw=4:
/***********************************************************************
*
* qwstore/qwstore.h:
* Simple quorum write key-value store
*
**********************************************************************/
#ifndef _QW_STORE_H_
#define _QW_STORE_H_
#include "paxos-lib/lib/assert.h"
#include "paxos-lib/lib/message.h"
#include "common/kvstore.h"
#include "common/timestamp.h"
#include "common/transaction.h"
#include "common/txnstore.h"
namespace qwstore {
class QWStore
{
private:
KVStore store;
public:
QWStore();
~QWStore();
// add key to read set
virtual int Get(uint64_t id, const std::string &key, std::string &value);
// add key to write set
virtual int Put(uint64_t id, const std::string &key, const std::string &value);
// load keys
virtual void Load(const std::string &key, const std::string &value);
};
} // namespace qwstore
#endif /* _QW_STORE_H_ */
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
// vim: set ts=4 sw=4:
/***********************************************************************
*
* qwstore/replica.cc
* Single QWStore storage server executable
*
**********************************************************************/
#include "qwstore/replica.h"
using namespace std;
using namespace qwstore;
static void Usage(const char *progName)
{
fprintf(stderr, "usage: %s -c conf-file -i replica-index\n",
progName);
exit(1);
}
int
main(int argc, char **argv)
{
int index = -1;
const char *configPath = NULL;
const char *keyPath = NULL;
// Parse arguments
int opt;
while ((opt = getopt(argc, argv, "c:i:f:")) != -1) {
switch (opt) {
case 'c':
configPath = optarg;
break;
case 'i':
{
char *strtolPtr;
index = strtoul(optarg, &strtolPtr, 10);
if ((*optarg == '\0') || (*strtolPtr != '\0') || (index < 0))
{
fprintf(stderr,
"option -i requires a numeric arg\n");
Usage(argv[0]);
}
break;
}
case 'f': // Load keys from file
keyPath = optarg;
break;
default:
fprintf(stderr, "Unknown argument %s\n", argv[optind]);
break;
}
}
if (!configPath) {
fprintf(stderr, "option -c is required\n");
Usage(argv[0]);
}
if (index == -1) {
fprintf(stderr, "option -i is required\n");
Usage(argv[0]);
}
// Load configuration
std::ifstream configStream(configPath);
if (configStream.fail()) {
fprintf(stderr, "unable to read configuration file: %s\n",
configPath);
Usage(argv[0]);
}
specpaxos::Configuration config(configStream);
if (index >= config.n) {
fprintf(stderr, "replica index %d is out of bounds; "
"only %d replicas defined\n", index, config.n);
Usage(argv[0]);
}
UDPTransport transport(0.0, 0.0, 0);
Server *server;
server = new Server(config, index, &transport, new QWStore());
if (keyPath) {
string key;
ifstream in;
in.open(keyPath);
if (!in) {
fprintf(stderr, "Could not read keys from: %s\n", keyPath);
exit(0);
}
for (int i = 0; i < 100000; i++) {
getline(in, key);
server->Load(key, key);
}
in.close();
}
transport.Run();
return 0;
}
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
// vim: set ts=4 sw=4:
/***********************************************************************
*
* qwstore/replica.h:
* Runs a single ORStore server replica
*
**********************************************************************/
#ifndef _QW_REPLICA_H_
#define _QW_REPLICA_H_
#include "paxos-lib/lib/configuration.h"
#include "paxos-lib/lib/udptransport.h"
#include "qwstore/server.h"
#include "qwstore/qwstore.h"
#endif /* _QW_REPLICA_H_ */
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* qwstore/server.cc:
* QWStore storage server. Mostly dispatch code
*
**********************************************************************/
#include "qwstore/server.h"
namespace qwstore {
using namespace proto;
Server::Server(const specpaxos::Configuration &configuration, int myIdx,
Transport *transport, QWStore *store)
: store(store), configuration(configuration), myIdx(myIdx), transport(transport)
{
transport->Register(this, configuration, myIdx);
}
Server::~Server() { }
void
Server::ReceiveMessage(const TransportAddress &remote,
const string &type, const string &data)
{
#if CLIENT_NETWORK_DELAY
TransportAddress *r = remote.clone();
transport->Timer(CLIENT_NETWORK_DELAY, [=]() {
HandleMessage(*r, type, data);
delete r;
});
#else
HandleMessage(remote, type, data);
#endif
}
void
Server::HandleMessage(const TransportAddress &remote,
const string &type, const string &data)
{
static GetMessage get;
static PutMessage put;
if (type == get.GetTypeName()) {
get.ParseFromString(data);
HandleGet(remote, get);
} else if (type == put.GetTypeName()) {
put.ParseFromString(data);
HandlePut(remote, put);
} else {
Panic("Received unexpected message type in OR proto: %s",
type.c_str());
}
}
void
Server::HandleGet(const TransportAddress &remote,
const GetMessage &msg)
{
int status;
string value;
status = store->Get(msg.clientid(), msg.key(), value);
GetReplyMessage reply;
reply.set_status(status);
reply.set_value(value);
transport->SendMessage(this, remote, reply);
}
void
Server::HandlePut(const TransportAddress &remote,
const PutMessage &msg)
{
int status = store->Put(msg.clientid(), msg.key(), msg.value());
PutReplyMessage reply;
reply.set_status(status);
transport->SendMessage(this, remote, reply);
}
void
Server::Load(const string &key, const string &value)
{
store->Load(key, value);
}
} // namespace qwstore
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
// vim: set ts=4 sw=4:
/***********************************************************************
*
* qwstore/server.h:
* QWStore storage server
*
**********************************************************************/
#ifndef _QW_SERVER_H_
#define _QW_SERVER_H_
#include "paxos-lib/lib/assert.h"
#include "paxos-lib/lib/message.h"
#include "paxos-lib/lib/udptransport.h"
#include "paxos-lib/lib/configuration.h"
#include "common/timestamp.h"
#include "common/transaction.h"
#include "common/txnstore.h"
#include "qwstore/qwstore.h"
#include "qwstore/qw-proto.pb.h"
namespace qwstore {
class Server : TransportReceiver
{
private:
// Underlying single node transactional key-value store.
QWStore *store;
// Configuration of replicas.
specpaxos::Configuration configuration;
// Index of 'this' replica, and handle to transport layer.
int myIdx;
Transport *transport;
public:
Server(const specpaxos::Configuration &configuration, int myIdx,
Transport *transport, QWStore *store);
~Server();
void ReceiveMessage(const TransportAddress &remote,
const std::string &type, const std::string &data);
void HandleMessage(const TransportAddress &remote,
const std::string &type, const std::string &data);
void HandleGet(const TransportAddress &remote,
const proto::GetMessage &msg);
void HandlePut(const TransportAddress &remote,
const proto::PutMessage &msg);
void Load(const std::string &key, const std::string &value);
};
} // namespace qwstore
#endif /* _QW_SERVER_H_ */
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment