Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • syslab/tapir
  • aaasz/tapir
  • ashmrtnz/tapir
3 results
Show changes
Showing
with 2345 additions and 191 deletions
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* log.h:
* a replica's log of pending and committed operations
*
* Copyright 2013 Dan R. K. Ports <drkp@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#ifndef _IR_RECORD_H_
#define _IR_RECORD_H_
#include <map>
#include <string>
#include <utility>
#include "lib/assert.h"
#include "lib/message.h"
#include "lib/transport.h"
#include "replication/common/request.pb.h"
#include "replication/common/viewstamp.h"
#include "replication/ir/ir-proto.pb.h"
namespace replication {
namespace ir {
// Identifies an operation. Replicas build it as (client id, client request
// id).
typedef std::pair<uint64_t, uint64_t> opid_t;

// A single operation held in a replica's Record, together with the view it
// was recorded in, its tentative/finalized state, its type and (for
// consensus operations) its result.
struct RecordEntry
{
    view_t view;
    opid_t opid;
    proto::RecordEntryState state;
    proto::RecordEntryType type;
    Request request;
    std::string result;

    // Value-initialize every member. Leaving `view`, `state` and `type`
    // indeterminate would make copying a default-constructed entry (which
    // reads all members) undefined behaviour.
    RecordEntry() : view(), opid(), state(), type(), request(), result() {}

    // Member-wise copy is exactly what is wanted.
    RecordEntry(const RecordEntry &x) = default;
    RecordEntry &operator=(const RecordEntry &x) = default;

    RecordEntry(view_t view, opid_t opid, proto::RecordEntryState state,
                proto::RecordEntryType type, const Request &request,
                const std::string &result)
        : view(view),
          opid(opid),
          state(state),
          type(type),
          request(request),
          result(result) {}

    virtual ~RecordEntry() {}
};
class Record
{
public:
// Use the copy-and-swap idiom to make Record moveable but not copyable
// [1]. We make it non-copyable to avoid unnecessary copies.
//
// [1]: https://stackoverflow.com/a/3279550/3187068
Record(){};
Record(const proto::RecordProto &record_proto);
Record(Record &&other) : Record() { swap(*this, other); }
Record(const Record &) = delete;
Record &operator=(const Record &) = delete;
Record &operator=(Record &&other) {
swap(*this, other);
return *this;
}
friend void swap(Record &x, Record &y) {
std::swap(x.entries, y.entries);
}
RecordEntry &Add(const RecordEntry& entry);
RecordEntry &Add(view_t view, opid_t opid, const Request &request,
proto::RecordEntryState state,
proto::RecordEntryType type);
RecordEntry &Add(view_t view, opid_t opid, const Request &request,
proto::RecordEntryState state, proto::RecordEntryType type,
const std::string &result);
RecordEntry *Find(opid_t opid);
bool SetStatus(opid_t opid, proto::RecordEntryState state);
bool SetResult(opid_t opid, const std::string &result);
bool SetRequest(opid_t opid, const Request &req);
void Remove(opid_t opid);
bool Empty() const;
void ToProto(proto::RecordProto *proto) const;
const std::map<opid_t, RecordEntry> &Entries() const;
private:
std::map<opid_t, RecordEntry> entries;
};
} // namespace ir
} // namespace replication
#endif /* _IR_RECORD_H_ */
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* replication/ir/replica.cc:
* IR Replica server
*
**********************************************************************/
#include "replication/ir/replica.h"
#include <cstdint>
#include <set>
namespace replication {
namespace ir {
using namespace std;
using namespace proto;
// Constructs a replica, registers it with the transport and starts the view
// change timer.
//
//   config    - replica configuration; moved into the `config` member.
//   myIdx     - this replica's index into the configuration.
//   transport - transport used to send and receive messages.
//   app       - application that receives the upcalls.
//
// If persisted view information already exists, the replica starts in
// recovery: it reloads its view, advances to a view it does not lead and
// broadcasts DO-VIEW-CHANGE messages.
IRReplica::IRReplica(transport::Configuration config, int myIdx,
                     Transport *transport, IRAppReplica *app)
    : config(std::move(config)), myIdx(myIdx), transport(transport), app(app),
      status(STATUS_NORMAL), view(0), latest_normal_view(0),
      // The parameter `config` shadows the member and has just been moved
      // from, so everything below reads the member through `this->config`.
      // That is safe because `config` is declared before the members
      // initialized from it.
      //
      // TODO: Take these filenames in via the command line?
      persistent_view_info(this->config.replica(myIdx).host + ":" +
                           this->config.replica(myIdx).port + "_" +
                           std::to_string(myIdx) + ".bin"),
      // Note that a leader waits for DO-VIEW-CHANGE messages from f other
      // replicas (as opposed to f + 1) for a total of f + 1 replicas.
      do_view_change_quorum(this->config.f)
{
    // Hand the transport the long-lived member, not the by-value parameter
    // that dies when this constructor returns.
    transport->Register(this, this->config, myIdx);

    // If our view info was previously initialized, then we are being started
    // in recovery mode. If our view info has never been initialized, then this
    // is the first time we are being run.
    if (persistent_view_info.Initialized()) {
        Debug("View information found in %s. Starting recovery.",
              persistent_view_info.Filename().c_str());
        status = STATUS_RECOVERING;
        RecoverViewInfo();
        Debug("Recovered view = %" PRIu64 " latest_normal_view = %" PRIu64 ".",
              view, latest_normal_view);
        ++view;
        if (myIdx == this->config.GetLeaderIndex(view)) {
            // A recovering replica should not be the leader.
            ++view;
        }
        PersistViewInfo();
        BroadcastDoViewChangeMessages();
    } else {
        PersistViewInfo();
    }

    // TODO: Figure out a good view change timeout.
    const uint64_t view_change_timeout_ms = 10 * 1000;
    view_change_timeout = std::unique_ptr<Timeout>(
        new Timeout(transport, view_change_timeout_ms,
                    [this]() { this->HandleViewChangeTimeout(); }));
    view_change_timeout->Start();
}
// Nothing to release by hand; members clean up after themselves.
IRReplica::~IRReplica() = default;
void
IRReplica::ReceiveMessage(const TransportAddress &remote,
const string &type, const string &data)
{
HandleMessage(remote, type, data);
}
void
IRReplica::HandleMessage(const TransportAddress &remote,
const string &type, const string &data)
{
ProposeInconsistentMessage proposeInconsistent;
FinalizeInconsistentMessage finalizeInconsistent;
ProposeConsensusMessage proposeConsensus;
FinalizeConsensusMessage finalizeConsensus;
UnloggedRequestMessage unloggedRequest;
DoViewChangeMessage doViewChange;
StartViewMessage startView;
if (type == proposeInconsistent.GetTypeName()) {
proposeInconsistent.ParseFromString(data);
HandleProposeInconsistent(remote, proposeInconsistent);
} else if (type == finalizeInconsistent.GetTypeName()) {
finalizeInconsistent.ParseFromString(data);
HandleFinalizeInconsistent(remote, finalizeInconsistent);
} else if (type == proposeConsensus.GetTypeName()) {
proposeConsensus.ParseFromString(data);
HandleProposeConsensus(remote, proposeConsensus);
} else if (type == finalizeConsensus.GetTypeName()) {
finalizeConsensus.ParseFromString(data);
HandleFinalizeConsensus(remote, finalizeConsensus);
} else if (type == doViewChange.GetTypeName()) {
doViewChange.ParseFromString(data);
HandleDoViewChange(remote, doViewChange);
} else if (type == startView.GetTypeName()) {
startView.ParseFromString(data);
HandleStartView(remote, startView);
} else if (type == unloggedRequest.GetTypeName()) {
unloggedRequest.ParseFromString(data);
HandleUnlogged(remote, unloggedRequest);
} else {
Panic("Received unexpected message type in IR proto: %s",
type.c_str());
}
}
void
IRReplica::HandleProposeInconsistent(const TransportAddress &remote,
const ProposeInconsistentMessage &msg)
{
uint64_t clientid = msg.req().clientid();
uint64_t clientreqid = msg.req().clientreqid();
Debug("%lu:%lu Received inconsistent op: %s", clientid, clientreqid, (char *)msg.req().op().c_str());
opid_t opid = make_pair(clientid, clientreqid);
// Check record if we've already handled this request
RecordEntry *entry = record.Find(opid);
ReplyInconsistentMessage reply;
if (entry != NULL) {
// If we already have this op in our record, then just return it
reply.set_view(entry->view);
reply.set_replicaidx(myIdx);
reply.mutable_opid()->set_clientid(clientid);
reply.mutable_opid()->set_clientreqid(clientreqid);
reply.set_finalized(entry->state == RECORD_STATE_FINALIZED);
} else {
// Otherwise, put it in our record as tentative
record.Add(view, opid, msg.req(), RECORD_STATE_TENTATIVE,
RECORD_TYPE_INCONSISTENT);
// 3. Return Reply
reply.set_view(view);
reply.set_replicaidx(myIdx);
reply.mutable_opid()->set_clientid(clientid);
reply.mutable_opid()->set_clientreqid(clientreqid);
reply.set_finalized(false);
}
// Send the reply
transport->SendMessage(this, remote, reply);
}
void
IRReplica::HandleFinalizeInconsistent(const TransportAddress &remote,
const FinalizeInconsistentMessage &msg)
{
uint64_t clientid = msg.opid().clientid();
uint64_t clientreqid = msg.opid().clientreqid();
Debug("%lu:%lu Received finalize inconsistent op", clientid, clientreqid);
opid_t opid = make_pair(clientid, clientreqid);
// Check record for the request
RecordEntry *entry = record.Find(opid);
if (entry != NULL && entry->state == RECORD_STATE_TENTATIVE) {
// Mark entry as finalized
record.SetStatus(opid, RECORD_STATE_FINALIZED);
// Execute the operation
app->ExecInconsistentUpcall(entry->request.op());
// Send the reply
ConfirmMessage reply;
reply.set_view(view);
reply.set_replicaidx(myIdx);
*reply.mutable_opid() = msg.opid();
transport->SendMessage(this, remote, reply);
} else {
// Ignore?
}
}
void
IRReplica::HandleProposeConsensus(const TransportAddress &remote,
const ProposeConsensusMessage &msg)
{
uint64_t clientid = msg.req().clientid();
uint64_t clientreqid = msg.req().clientreqid();
Debug("%lu:%lu Received consensus op: %s", clientid, clientreqid, (char *)msg.req().op().c_str());
opid_t opid = make_pair(clientid, clientreqid);
// Check record if we've already handled this request
RecordEntry *entry = record.Find(opid);
ReplyConsensusMessage reply;
if (entry != NULL) {
// If we already have this op in our record, then just return it
reply.set_view(entry->view);
reply.set_replicaidx(myIdx);
reply.mutable_opid()->set_clientid(clientid);
reply.mutable_opid()->set_clientreqid(clientreqid);
reply.set_result(entry->result);
reply.set_finalized(entry->state == RECORD_STATE_FINALIZED);
} else {
// Execute op
string result;
app->ExecConsensusUpcall(msg.req().op(), result);
// Put it in our record as tentative
record.Add(view, opid, msg.req(), RECORD_STATE_TENTATIVE,
RECORD_TYPE_CONSENSUS, result);
// 3. Return Reply
reply.set_view(view);
reply.set_replicaidx(myIdx);
reply.mutable_opid()->set_clientid(clientid);
reply.mutable_opid()->set_clientreqid(clientreqid);
reply.set_result(result);
reply.set_finalized(false);
}
// Send the reply
transport->SendMessage(this, remote, reply);
}
void
IRReplica::HandleFinalizeConsensus(const TransportAddress &remote,
const FinalizeConsensusMessage &msg)
{
uint64_t clientid = msg.opid().clientid();
uint64_t clientreqid = msg.opid().clientreqid();
Debug("%lu:%lu Received finalize consensus op", clientid, clientreqid);
opid_t opid = make_pair(clientid, clientreqid);
// Check record for the request
RecordEntry *entry = record.Find(opid);
if (entry != NULL) {
// Mark entry as finalized
record.SetStatus(opid, RECORD_STATE_FINALIZED);
if (msg.result() != entry->result) {
// Update the result
entry->result = msg.result();
}
// Send the reply
ConfirmMessage reply;
reply.set_view(view);
reply.set_replicaidx(myIdx);
*reply.mutable_opid() = msg.opid();
if (!transport->SendMessage(this, remote, reply)) {
Warning("Failed to send reply message");
}
} else {
// Ignore?
Warning("Finalize request for unknown consensus operation");
}
}
// Handles a DO-VIEW-CHANGE message from another replica.
//
// Messages for an older view are ignored. A message for a newer view moves
// this replica into that view (persisting it, broadcasting its own
// DO-VIEW-CHANGE messages and resetting the view change timer). A message
// for the current view is ignored when this replica is NORMAL (the view
// change already completed) or RECOVERING (a recovering replica should not
// act as leader).
//
// If this replica leads the view, the message is added to the quorum set.
// Once a quorum is reached, the records are merged, the replica returns to
// NORMAL and a START-VIEW message carrying the merged record is sent to all
// replicas.
void
IRReplica::HandleDoViewChange(const TransportAddress &remote,
                              const proto::DoViewChangeMessage &msg)
{
    Debug(
        "Received DoViewChangeMessage from replica %d with new_view = %" PRIu64
        ", latest_normal_view = %" PRIu64 ", has_record = %d.",
        msg.replicaidx(), msg.new_view(), msg.latest_normal_view(),
        msg.has_record());

    if (msg.new_view() < view) {
        Debug("Ignoring DO-VIEW-CHANGE for view %" PRIu64 " < %" PRIu64 ". ",
              msg.new_view(), view);
        return;
    } else if (msg.new_view() == view) {
        // If we're NORMAL, then we've already completed this view change.
        if (status == STATUS_NORMAL) {
            Debug("Ignoring DO-VIEW-CHANGE for view %" PRIu64
                  " because our status is NORMAL.",
                  view);
            return;
        }

        // If we're a recovering node, we don't want to be the leader. This
        // check used to repeat the NORMAL test above and so could never
        // fire.
        if (status == STATUS_RECOVERING) {
            Debug("Ignoring DO-VIEW-CHANGE for view %" PRIu64
                  " because our status is RECOVERING.",
                  view);
            return;
        }
    } else {
        ASSERT(msg.new_view() > view);

        // Update and persist our view.
        view = msg.new_view();
        PersistViewInfo();

        // Update our status. If we're NORMAL, then we transition into
        // VIEW_CHANGE. If we're VIEW_CHANGE or RECOVERING, we want to stay in
        // VIEW_CHANGE or RECOVERING. Note that it would be a bug to transition
        // from RECOVERING to VIEW_CHANGE before we finish recovering.
        if (status == STATUS_NORMAL) {
            status = STATUS_VIEW_CHANGE;
        }

        // We just began a new view change, so we need to broadcast
        // DO-VIEW-CHANGE messages to everyone.
        BroadcastDoViewChangeMessages();

        // Restart our view change timer. We don't want to perform a view
        // change right after we just performed a view change.
        view_change_timeout->Reset();
    }

    ASSERT(msg.new_view() == view);

    // If we're not the leader of this view change, then we have nothing to do.
    if (myIdx != config.GetLeaderIndex(view)) {
        return;
    }

    // Replicas should send their records to the leader.
    ASSERT(msg.has_record());
    const std::map<int, DoViewChangeMessage> *quorum =
        do_view_change_quorum.AddAndCheckForQuorum(msg.new_view(),
                                                   msg.replicaidx(), msg);
    if (quorum == nullptr) {
        // There is no quorum yet.
        return;
    }

    Debug("Received a quourum of DoViewChangeMessages. Initiating "
          "IR-MERGE-RECORDS.");

    // Update our record, status, and view.
    record = IrMergeRecords(*quorum);
    status = STATUS_NORMAL;
    view = msg.new_view();
    latest_normal_view = view;
    PersistViewInfo();

    // Notify all replicas of the new view.
    StartViewMessage start_view_msg;
    record.ToProto(start_view_msg.mutable_record());
    start_view_msg.set_new_view(view);
    // TODO: Don't send this message to myself. It's not incorrect, but it's
    // unnecessary.
    // TODO: Acknowledge StartViewMessage messages, and rebroadcast them after
    // a timeout.
    Debug("Sending StartViewMessages to all replicas.");
    bool success = transport->SendMessageToAll(this, start_view_msg);
    if (!success) {
        Warning("Could not send StartViewMessage.");
    }
}
// Handles a START-VIEW message from the leader of a view change.
//
// Messages for an older view, or for the current view when this replica is
// already NORMAL, are ignored. Otherwise the local record is replaced by the
// record carried in `msg`, the application is synced against it, and the
// replica enters the new view in NORMAL status and persists it.
void
IRReplica::HandleStartView(const TransportAddress &remote,
                           const proto::StartViewMessage &msg)
{
    Debug("Received StartViewMessage with new_view = %" PRIu64 ".",
          msg.new_view());

    // A leader should not be sending START-VIEW messages to themselves.
    ASSERT(myIdx != config.GetLeaderIndex(msg.new_view()));

    if (msg.new_view() < view) {
        Debug("Ignoring START-VIEW for view %" PRIu64 " < %" PRIu64 ". ",
              msg.new_view(), view);
        return;
    }

    // If new_view == view and we're NORMAL, then we've already completed this
    // view change, and we don't want to do it again.
    if (msg.new_view() == view && status == STATUS_NORMAL) {
        Debug("Ignoring START-VIEW for view %" PRIu64
              " because our status is NORMAL.",
              view);
        return;
    }

    // Either the view is strictly newer, or it is the current view and we
    // have not completed it yet. The first clause must be a strict `>`; with
    // `>=` the second clause could never be the one that holds.
    ASSERT((msg.new_view() > view) ||
           (msg.new_view() == view && status != STATUS_NORMAL));

    // Throw away our record for the new master record and call sync.
    record = Record(msg.record());
    app->Sync(record.Entries());

    status = STATUS_NORMAL;
    view = msg.new_view();
    latest_normal_view = view;
    PersistViewInfo();
}
void
IRReplica::HandleUnlogged(const TransportAddress &remote,
const UnloggedRequestMessage &msg)
{
UnloggedReplyMessage reply;
string res;
Debug("Received unlogged request %s", (char *)msg.req().op().c_str());
app->UnloggedUpcall(msg.req().op(), res);
reply.set_reply(res);
reply.set_clientreqid(msg.req().clientreqid());
if (!(transport->SendMessage(this, remote, reply)))
Warning("Failed to send reply message");
}
// Invoked when the view change timer fires: leaves NORMAL status if we were
// in it, advances to the next view, persists it and announces the view
// change to the other replicas.
void IRReplica::HandleViewChangeTimeout() {
    Debug("HandleViewChangeTimeout fired.");

    // VIEW_CHANGE and RECOVERING are left untouched.
    const bool was_normal = (status == STATUS_NORMAL);
    if (was_normal) {
        status = STATUS_VIEW_CHANGE;
    }

    view += 1;
    PersistViewInfo();
    BroadcastDoViewChangeMessages();
}
// Serializes `view` and `latest_normal_view` into a PersistedViewInfo
// message and writes it through `persistent_view_info`.
void IRReplica::PersistViewInfo() {
    PersistedViewInfo view_info;
    view_info.set_view(view);
    view_info.set_latest_normal_view(latest_normal_view);

    // Serialize outside the assertion so the side effect can never be lost
    // if assertions are compiled out.
    std::string output;
    const bool serialized = view_info.SerializeToString(&output);
    ASSERT(serialized);
    (void)serialized;

    persistent_view_info.Write(output);
}
void IRReplica::RecoverViewInfo() {
PersistedViewInfo view_info;
view_info.ParseFromString(persistent_view_info.Read());
view = view_info.view();
latest_normal_view = view_info.latest_normal_view();
}
// Announces the current view to the other replicas.
//
// Every replica other than ourselves and the leader of `view` receives a
// DoViewChangeMessage without a record. The leader (if it is not us)
// receives the same message with our record attached. Send failures are
// logged and otherwise ignored.
void IRReplica::BroadcastDoViewChangeMessages() {
    // Send a DoViewChangeMessage _without_ our record to all replicas except
    // ourselves and the leader.
    proto::DoViewChangeMessage msg;
    msg.set_replicaidx(myIdx);
    msg.clear_record();
    msg.set_new_view(view);
    msg.set_latest_normal_view(latest_normal_view);

    const int leader_idx = config.GetLeaderIndex(view);
    Debug(
        "Broadcasting DoViewChangeMessages to replicas with leader id = %d, "
        "view = %" PRIu64 ", latest_normal_view = %" PRIu64 ".",
        leader_idx, view, latest_normal_view);

    for (int i = 0; i < config.n; ++i) {
        if (i == myIdx || i == leader_idx) {
            continue;
        }

        bool success = transport->SendMessageToReplica(this, i, msg);
        if (!success) {
            Warning("Could not send DoViewChangeMessage to replica %d.", i);
        }
    }

    // Send a DoViewChangeMessage _with_ our record to the leader (unless we
    // are the leader). The record is only serialized when it will actually
    // be sent.
    if (leader_idx != myIdx) {
        record.ToProto(msg.mutable_record());
        bool success = transport->SendMessageToReplica(this, leader_idx, msg);
        if (!success) {
            Warning("Could not send DoViewChangeMessage to leader %d.",
                    leader_idx);
        }
    }
}
// Merges the records carried by a quorum of DO-VIEW-CHANGE messages (plus our
// own record, when it is from the latest normal view) into one master
// record.
//
//   records - the quorum of DoViewChangeMessages, keyed by replica index.
//
// Only records whose latest_normal_view equals the maximum seen are used.
// Inconsistent entries and finalized consensus entries go straight into the
// result. Tentative consensus entries are grouped by opid and split into `d`
// (some result appears at least ceil(f/2) + 1 times) and `u` (everything
// else). The application is synced on the entries accepted so far and then
// asked to Merge `d` and `u`; each result it returns is added as a finalized
// entry in the current view.
//
// Note: this moves from the `record` member; the caller is expected to assign
// the returned Record back to it.
Record
IRReplica::IrMergeRecords(const std::map<int, DoViewChangeMessage>& records) {
    // TODO: This implementation of IrMergeRecords is not the most efficient in
    // the world. It could be optimized a bit if it happens to be a bottleneck.
    // For example, Merge could take in pointers to the record entry vectors.

    // Create a type alias to save some typing.
    using RecordEntryVec = std::vector<RecordEntry>;

    // Find the largest latest_normal_view.
    view_t max_latest_normal_view = latest_normal_view;
    for (const std::pair<const int, DoViewChangeMessage>& p : records) {
        const DoViewChangeMessage& msg = p.second;
        max_latest_normal_view =
            std::max(max_latest_normal_view, msg.latest_normal_view());
    }

    // Collect the records with largest latest_normal_view.
    std::vector<Record> latest_records;
    for (const std::pair<const int, DoViewChangeMessage>& p : records) {
        const DoViewChangeMessage& msg = p.second;
        if (msg.latest_normal_view() == max_latest_normal_view) {
            ASSERT(msg.has_record());
            latest_records.push_back(Record(msg.record()));
        }
    }
    if (latest_normal_view == max_latest_normal_view) {
        latest_records.push_back(std::move(record));
    }

    // Group together all the entries from all the records in latest_records.
    // We'll use this to build d and u. Simultaneously populate R.
    // TODO: Avoid redundant copies.
    Record R;
    std::map<opid_t, RecordEntryVec> entries_by_opid;
    for (const Record &r : latest_records) {
        for (const std::pair<const opid_t, RecordEntry> &p : r.Entries()) {
            const opid_t &opid = p.first;
            const RecordEntry &entry = p.second;
            ASSERT(opid == entry.opid);

            if (entry.type == RECORD_TYPE_INCONSISTENT) {
                // TODO: Do we have to update the view here?
                if (R.Find(opid) == nullptr) {
                    R.Add(entry);
                }
            } else if (entry.state == RECORD_STATE_FINALIZED) {
                // TODO: Do we have to update the view here?
                if (R.Find(opid) == nullptr) {
                    R.Add(entry);
                }
                // A finalized copy supersedes any tentative copies grouped
                // earlier.
                entries_by_opid.erase(opid);
            } else {
                ASSERT(entry.type == RECORD_TYPE_CONSENSUS &&
                       entry.state == RECORD_STATE_TENTATIVE);
                // If R already contains this operation, then we don't group
                // it.
                if (R.Entries().count(entry.opid) == 0) {
                    entries_by_opid[entry.opid].push_back(entry);
                }
            }
        }
    }

    // A result must occur ceil(f/2) + 1 times or more for its operation to
    // land in d. For non-negative f, ceil(f/2) == (f + 1) / 2 in integer
    // arithmetic, so the threshold is computed once up front.
    const std::size_t d_threshold =
        static_cast<std::size_t>((config.f + 1) / 2 + 1);

    // Build d and u.
    std::map<opid_t, RecordEntryVec> d;
    std::map<opid_t, RecordEntryVec> u;
    std::map<opid_t, std::string> majority_results_in_d;
    for (const std::pair<const opid_t, RecordEntryVec> &p : entries_by_opid) {
        const opid_t &opid = p.first;
        const RecordEntryVec &entries = p.second;

        // Count the frequency of each response.
        std::map<std::string, std::size_t> result_counts;
        for (const RecordEntry& entry : entries) {
            result_counts[entry.result] += 1;
        }

        // Check if any response occurs ceil(f/2) + 1 times or more.
        bool in_d = false;
        std::string majority_result_in_d = "";
        for (const std::pair<const std::string, std::size_t> &c :
             result_counts) {
            if (c.second >= d_threshold) {
                majority_result_in_d = c.first;
                in_d = true;
                break;
            }
        }

        // TODO: Avoid redundant copies.
        if (in_d) {
            d[opid] = entries;
            majority_results_in_d[opid] = majority_result_in_d;
        } else {
            u[opid] = entries;
        }
    }

    // Sync.
    app->Sync(R.Entries());

    // Merge.
    std::map<opid_t, std::string> results_by_opid =
        app->Merge(d, u, majority_results_in_d);

    // Sanity check Merge results. Every opid should be present.
    ASSERT(results_by_opid.size() == d.size() + u.size());
    for (const std::pair<const opid_t, std::string> &p : results_by_opid) {
        const opid_t &opid = p.first;
        ASSERT(d.count(opid) + u.count(opid) == 1);
    }

    // Convert Merge results into a Record.
    Record merged;
    for (const std::pair<const opid_t, std::string> &p : results_by_opid) {
        const opid_t &opid = p.first;
        const std::string &result = p.second;

        // Look the group up without copying it and without operator[]'s
        // insert-on-miss. The vector we are about to index is the one that
        // must be non-empty.
        const std::map<opid_t, RecordEntryVec>::const_iterator group =
            entries_by_opid.find(opid);
        ASSERT(group != entries_by_opid.end());
        const RecordEntryVec &entries = group->second;
        ASSERT(!entries.empty());
        const RecordEntry &entry = entries[0];

        // TODO: Is this view correct?
        merged.Add(view, opid, entry.request, RECORD_STATE_FINALIZED,
                   entry.type, result);
    }

    // R = R cup merged.
    for (const std::pair<const opid_t, RecordEntry> &r : merged.Entries()) {
        // TODO: Avoid copy.
        R.Add(r.second);
    }
    return R;
}
} // namespace ir
} // namespace replication
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* replication/ir/replica.h:
* IR Replica server
*
**********************************************************************/
#ifndef _IR_REPLICA_H_
#define _IR_REPLICA_H_
#include <memory>
#include "lib/assert.h"
#include "lib/configuration.h"
#include "lib/message.h"
#include "lib/persistent_register.h"
#include "lib/udptransport.h"
#include "replication/common/quorumset.h"
#include "replication/common/replica.h"
#include "replication/ir/ir-proto.pb.h"
#include "replication/ir/record.h"
namespace replication {
namespace ir {
// Interface through which an IR replica calls into the application. Every
// upcall has a do-nothing default, so an application overrides only the ones
// it needs.
class IRAppReplica
{
public:
    IRAppReplica() = default;
    virtual ~IRAppReplica() = default;

    // Invoke inconsistent operation, no return value
    virtual void ExecInconsistentUpcall(const std::string &str1) {}

    // Invoke consensus operation
    virtual void ExecConsensusUpcall(const std::string &str1,
                                     std::string &str2) {}

    // Invoke unreplicated operation
    virtual void UnloggedUpcall(const std::string &str1,
                                std::string &str2) {}

    // Sync
    virtual void Sync(const std::map<opid_t, RecordEntry>& record) {}

    // Merge; the default returns an empty map.
    virtual std::map<opid_t, std::string> Merge(
        const std::map<opid_t, std::vector<RecordEntry>> &d,
        const std::map<opid_t, std::vector<RecordEntry>> &u,
        const std::map<opid_t, std::string> &majority_results_in_d)
    {
        return std::map<opid_t, std::string>();
    }
};
// An IR replica. It receives messages from the transport, dispatches them to
// the Handle* methods below, keeps the record of operations, and takes part
// in the view change and recovery protocol.
//
// Note that the base class is inherited privately (the default for `class`).
class IRReplica : TransportReceiver
{
public:
// Registers the replica with `transport` and starts the view change timer.
// Starts in recovery mode when persisted view information already exists.
IRReplica(transport::Configuration config, int myIdx,
Transport *transport, IRAppReplica *app);
~IRReplica();
// Message handlers.
// ReceiveMessage forwards to HandleMessage, which parses `data` according to
// `type` and calls the matching handler.
void ReceiveMessage(const TransportAddress &remote,
const std::string &type, const std::string &data);
void HandleMessage(const TransportAddress &remote,
const std::string &type, const std::string &data);
void HandleProposeInconsistent(const TransportAddress &remote,
const proto::ProposeInconsistentMessage &msg);
void HandleFinalizeInconsistent(const TransportAddress &remote,
const proto::FinalizeInconsistentMessage &msg);
void HandleProposeConsensus(const TransportAddress &remote,
const proto::ProposeConsensusMessage &msg);
void HandleFinalizeConsensus(const TransportAddress &remote,
const proto::FinalizeConsensusMessage &msg);
void HandleDoViewChange(const TransportAddress &remote,
const proto::DoViewChangeMessage &msg);
void HandleStartView(const TransportAddress &remote,
const proto::StartViewMessage &msg);
void HandleUnlogged(const TransportAddress &remote,
const proto::UnloggedRequestMessage &msg);
// Timeout handlers.
// Advances to the next view and broadcasts DO-VIEW-CHANGE messages.
void HandleViewChangeTimeout();
private:
// Persist `view` and `latest_normal_view` to disk using
// `persistent_view_info`.
void PersistViewInfo();
// Recover `view` and `latest_normal_view` from disk using
// `persistent_view_info`.
void RecoverViewInfo();
// Broadcast DO-VIEW-CHANGE messages to all other replicas with our record
// included only in the message to the leader.
void BroadcastDoViewChangeMessages();
// IrMergeRecords implements Figure 5 of the TAPIR paper.
Record IrMergeRecords(
const std::map<int, proto::DoViewChangeMessage> &records);
// Data members are initialized in the order they are declared here,
// regardless of the order used in the constructor's initializer list.
transport::Configuration config;
int myIdx; // Replica index into config.
Transport *transport;
// Application that receives the upcalls.
IRAppReplica *app;
ReplicaStatus status;
// For the view change and recovery protocol, a replica stores its view and
// latest normal view to disk. We store this info in view and
// latest_normal_view and use persistent_view_info to persist it to disk.
view_t view;
view_t latest_normal_view;
PersistentRegister persistent_view_info;
// The operations this replica has seen, keyed by operation id.
Record record;
// Fires HandleViewChangeTimeout.
std::unique_ptr<Timeout> view_change_timeout;
// The leader of a view-change waits to receive a quorum of DO-VIEW-CHANGE
// messages before merging and syncing and sending out START-VIEW messages.
// do_view_change_quorum is used to wait for this quorum.
//
// TODO: Garbage collect old view change quorums. Once we've entered view
// v, we should be able to garbage collect all quorums for views less than
// v.
QuorumSet<view_t, proto::DoViewChangeMessage> do_view_change_quorum;
};
} // namespace ir
} // namespace replication
#endif /* _IR_REPLICA_H_ */
# Build rules for the IR test binary.
# `d` is the directory of the makefile currently being read.
d := $(dir $(lastword $(MAKEFILE_LIST)))
# Add the test source to the list of gtest sources.
GTEST_SRCS += $(d)ir-test.cc
# The test binary depends on its own object, the IR replica and client
# objects, the simulated transport library and the gtest main.
$(d)ir-test: $(o)ir-test.o \
$(OBJS-ir-replica) $(OBJS-ir-client) \
$(LIB-simtransport) \
$(GTEST_MAIN)
# Add the binary to the list of test binaries.
TEST_BINS += $(d)ir-test
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* ir-test.cc:
* test cases for Inconsistent Replication Protocol
*
* Copyright 2013 Dan R. K. Ports <drkp@cs.washington.edu>
* Copyright 2015 Irene Zhang Ports <iyzhang@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#include "lib/configuration.h"
#include "lib/message.h"
#include "lib/transport.h"
#include "lib/simtransport.h"
#include "replication/common/client.h"
#include "replication/common/replica.h"
#include "replication/ir/client.h"
#include "replication/ir/replica.h"
#include <cstdio>
#include <stdlib.h>
#include <stdio.h>
#include <gtest/gtest.h>
#include <vector>
#include <set>
#include <sstream>
#include <memory>
using google::protobuf::Message;
using namespace replication;
using namespace replication::ir;
using namespace replication::ir::proto;
class IRApp : public IRAppReplica {
std::vector<string> *iOps;
std::vector<string> *cOps;
std::vector<string> *unloggedOps;
public:
IRApp(std::vector<string> *i, std::vector<string> *c,
std::vector<string> *u)
: iOps(i), cOps(c), unloggedOps(u) {}
void ExecInconsistentUpcall(const string &req) {
iOps->push_back(req);
}
void ExecConsensusUpcall(const string &req, string &reply) {
cOps->push_back(req);
reply = "1";
}
void UnloggedUpcall(const string &req, string &reply) {
unloggedOps->push_back(req);
reply = "unlreply: " + req;
}
};
class IRTest : public ::testing::Test
{
protected:
std::vector<transport::ReplicaAddress> replicaAddrs;
std::unique_ptr<transport::Configuration> config;
SimulatedTransport transport;
std::vector<std::unique_ptr<IRApp>> apps;
std::vector<std::unique_ptr<IRReplica>> replicas;
std::unique_ptr<IRClient> client;
std::vector<std::vector<string> > iOps;
std::vector<std::vector<string> > cOps;
std::vector<std::vector<string> > unloggedOps;
int requestNum;
IRTest() : requestNum(-1) {
replicaAddrs = {{"localhost", "12345"},
{"localhost", "12346"},
{"localhost", "12347"}};
config = std::unique_ptr<transport::Configuration>(
new transport::Configuration(3, 1, replicaAddrs));
iOps.resize(config->n);
cOps.resize(config->n);
unloggedOps.resize(config->n);
for (int i = 0; i < config->n; i++) {
auto ir_app = std::unique_ptr<IRApp>(
new IRApp(&iOps[i], &cOps[i], &unloggedOps[i]));
auto p = std::unique_ptr<IRReplica>(
new IRReplica(*config, i, &transport, ir_app.get()));
apps.push_back(std::move(ir_app));
replicas.push_back(std::move(p));
}
client = std::unique_ptr<IRClient>(new IRClient(*config, &transport));
}
virtual string RequestOp(int n) {
std::ostringstream stream;
stream << "test: " << n;
return stream.str();
}
virtual string LastRequestOp() {
return RequestOp(requestNum);
}
virtual void ClientSendNextInconsistent(Client::continuation_t upcall) {
requestNum++;
client->InvokeInconsistent(LastRequestOp(), upcall);
}
virtual void ClientSendNextConsensus(Client::continuation_t upcall,
IRClient::decide_t decide) {
requestNum++;
client->InvokeConsensus(LastRequestOp(), decide, upcall);
}
virtual void ClientSendNextUnlogged(
int idx, Client::continuation_t upcall,
Client::error_continuation_t error_continuation = nullptr,
uint32_t timeout = Client::DEFAULT_UNLOGGED_OP_TIMEOUT) {
requestNum++;
client->InvokeUnlogged(idx, LastRequestOp(), upcall,
error_continuation, timeout);
}
virtual void TearDown() {
// Replicas store their view information in the following files:
// - localhost:12345_0.bin
// - localhost:12346_1.bin
// - localhost:12347_2.bin
// We have to make sure to delete them after every test. Otherwise,
// replicas run in recovery mode.
for (std::size_t i = 0; i < replicaAddrs.size(); ++i) {
const transport::ReplicaAddress &addr = replicaAddrs[i];
const std::string filename =
addr.host + ":" + addr.port + "_" + std::to_string(i) + ".bin";
int success = std::remove(filename.c_str());
ASSERT(success == 0);
}
}
};
// Sends a single inconsistent operation and checks that the client gets an
// empty reply and that every replica executed exactly that operation.
TEST_F(IRTest, OneInconsistentOp)
{
    ClientSendNextInconsistent(
        [this](const string &req, const string &reply) {
            EXPECT_EQ(req, LastRequestOp());
            // Inconsistent ops do not return a value
            EXPECT_EQ(reply, "");
            transport.CancelAllTimers();
        });
    transport.Run();

    // By now, they all should have executed the last request.
    for (int replica = 0; replica < config->n; ++replica) {
        const std::vector<string> &executed = iOps[replica];
        EXPECT_EQ(executed.size(), 1);
        EXPECT_EQ(executed.back(), LastRequestOp());
    }
}
// Sends a single consensus operation and checks that the client gets the
// reply "1", that the decide callback is never invoked, and that every
// replica executed exactly that operation.
TEST_F(IRTest, OneConsensusOp)
{
    auto decide = [this](const std::map<string, std::size_t> &results) {
        // shouldn't ever get called
        EXPECT_FALSE(true);
        return "";
    };
    auto upcall = [this](const string &req, const string &reply) {
        EXPECT_EQ(req, LastRequestOp());
        EXPECT_EQ(reply, "1");
        transport.CancelAllTimers();
    };

    ClientSendNextConsensus(upcall, decide);
    transport.Run();

    // By now, they all should have executed the last request.
    for (int replica = 0; replica < config->n; ++replica) {
        const std::vector<string> &executed = cOps[replica];
        EXPECT_EQ(executed.size(), 1);
        EXPECT_EQ(executed.back(), LastRequestOp());
    }
}
// Sends one unlogged operation to replica 1 and checks that only that
// replica saw it, that nothing was recorded as an inconsistent op, and
// that the error callback never fired.
TEST_F(IRTest, Unlogged)
{
    // Reply handler: validate request, reply text and the op recorded by
    // replica 1, then cancel all outstanding timers on the transport.
    auto onReply = [this](const string &req, const string &reply) {
        EXPECT_EQ(req, LastRequestOp());
        EXPECT_EQ(reply, "unlreply: "+LastRequestOp());
        EXPECT_EQ(unloggedOps[1].back(), req);
        transport.CancelAllTimers();
    };

    // Error handler: only counts how many times it is invoked.
    int timeouts = 0;
    auto onError = [&timeouts](const string &, ErrorCode) {
        ++timeouts;
    };

    ClientSendNextUnlogged(1, onReply, onError);
    transport.Run();

    const auto replicaCount = iOps.size();
    for (unsigned int i = 0; i < replicaCount; ++i) {
        EXPECT_EQ(0, iOps[i].size());
        EXPECT_EQ((i == 1 ? 1 : 0), unloggedOps[i].size());
    }
    EXPECT_EQ(0, timeouts);
}
// Sends one unlogged operation to replica 1 while all traffic to and
// from that replica is dropped, and checks that the client reports
// exactly one error (timeout) and that no replica recorded the op.
TEST_F(IRTest, UnloggedTimeout)
{
    // The reply callback must never run in this test. ADD_FAILURE() is
    // used instead of FAIL(): FAIL() returns from the enclosing function,
    // which made the CancelAllTimers() call below unreachable.
    auto upcall = [this](const string &req, const string &reply) {
        ADD_FAILURE();
        transport.CancelAllTimers();
    };

    // Error callback: only counts how many times it is invoked.
    int timeouts = 0;
    auto timeout = [&](const string &req, ErrorCode) {
        timeouts++;
    };

    // Drop messages to or from replica 1
    transport.AddFilter(10, [](TransportReceiver *src, int srcIdx,
                               TransportReceiver *dst, int dstIdx,
                               Message &m, uint64_t &delay) {
        if ((srcIdx == 1) || (dstIdx == 1)) {
            return false;
        }
        return true;
    });

    // Run for 10 seconds
    transport.Timer(10000, [&]() {
        transport.CancelAllTimers();
    });

    ClientSendNextUnlogged(1, upcall, timeout);
    transport.Run();

    for (unsigned int i = 0; i < iOps.size(); i++) {
        EXPECT_EQ(0, iOps[i].size());
        EXPECT_EQ(0, unloggedOps[i].size());
    }
    EXPECT_EQ(1, timeouts);
}
// TEST_F(IRTest, ManyOps)
// {
// Client::continuation_t upcall = [&](const string &req, const string &reply) {
// EXPECT_EQ(req, LastRequestOp());
// EXPECT_EQ(reply, "reply: "+LastRequestOp());
// // Not guaranteed that any replicas except the leader have
// // executed this request.
// EXPECT_EQ(ops[0].back(), req);
// if (requestNum < 9) {
// ClientSendNext(upcall);
// } else {
// transport.CancelAllTimers();
// }
// };
// ClientSendNext(upcall);
// transport.Run();
// // By now, they all should have executed the last request.
// for (int i = 0; i < config->n; i++) {
// EXPECT_EQ(10, ops[i].size());
// for (int j = 0; j < 10; j++) {
// EXPECT_EQ(RequestOp(j), ops[i][j]);
// }
// }
// }
......@@ -44,104 +44,95 @@ VRClient::VRClient(const transport::Configuration &config,
uint64_t clientid)
: Client(config, transport, clientid)
{
pendingRequest = NULL;
pendingUnloggedRequest = NULL;
lastReqId = 0;
requestTimeout = new Timeout(transport, 500, [this]() {
ResendRequest();
});
unloggedRequestTimeout = new Timeout(transport, 500, [this]() {
UnloggedRequestTimeoutCallback();
});
}
VRClient::~VRClient()
{
if (pendingRequest) {
delete pendingRequest;
}
if (pendingUnloggedRequest) {
delete pendingUnloggedRequest;
for (auto kv : pendingReqs) {
delete kv.second;
}
delete requestTimeout;
delete unloggedRequestTimeout;
}
void
VRClient::Invoke(const string &request,
continuation_t continuation)
continuation_t continuation,
error_continuation_t error_continuation)
{
// XXX Can only handle one pending request for now
if (pendingRequest != NULL) {
Panic("Client only supports one pending request");
}
// TODO: Currently, invocations never timeout and error_continuation is
// never called. It may make sense to set a timeout on the invocation.
(void) error_continuation;
++lastReqId;
uint64_t reqId = lastReqId;
pendingRequest = new PendingRequest(request, reqId, continuation);
uint64_t reqId = ++lastReqId;
Timeout *timer = new Timeout(transport, 500, [this, reqId]() {
ResendRequest(reqId);
});
PendingRequest *req =
new PendingRequest(request, reqId, continuation, timer);
SendRequest();
pendingReqs[reqId] = req;
SendRequest(req);
}
void
VRClient::InvokeUnlogged(int replicaIdx,
const string &request,
continuation_t continuation,
timeout_continuation_t timeoutContinuation,
error_continuation_t error_continuation,
uint32_t timeout)
{
// XXX Can only handle one pending request for now
if (pendingUnloggedRequest != NULL) {
Panic("Client only supports one pending request");
}
++lastReqId;
uint64_t reqId = lastReqId;
pendingUnloggedRequest = new PendingRequest(request, reqId, continuation);
pendingUnloggedRequest->timeoutContinuation = timeoutContinuation;
uint64_t reqId = ++lastReqId;
proto::UnloggedRequestMessage reqMsg;
reqMsg.mutable_req()->set_op(pendingUnloggedRequest->request);
reqMsg.mutable_req()->set_op(request);
reqMsg.mutable_req()->set_clientid(clientid);
reqMsg.mutable_req()->set_clientreqid(pendingUnloggedRequest->clientReqId);
ASSERT(!unloggedRequestTimeout->Active());
unloggedRequestTimeout->SetTimeout(timeout);
unloggedRequestTimeout->Start();
transport->SendMessageToReplica(this, replicaIdx, reqMsg);
reqMsg.mutable_req()->set_clientreqid(reqId);
if (transport->SendMessageToReplica(this, replicaIdx, reqMsg)) {
Timeout *timer =
new Timeout(transport, timeout, [this, reqId]() {
UnloggedRequestTimeoutCallback(reqId);
});
PendingUnloggedRequest *req =
new PendingUnloggedRequest(request, reqId,
continuation, timer,
error_continuation);
pendingReqs[reqId] = req;
req->timer->Start();
} else {
Warning("Could not send unlogged request to replica %u.",
replicaIdx);
}
}
void
VRClient::SendRequest()
VRClient::SendRequest(const PendingRequest *req)
{
if (pendingRequest == NULL) {
return;
}
proto::RequestMessage reqMsg;
reqMsg.mutable_req()->set_op(pendingRequest->request);
reqMsg.mutable_req()->set_op(req->request);
reqMsg.mutable_req()->set_clientid(clientid);
reqMsg.mutable_req()->set_clientreqid(pendingRequest->clientReqId);
reqMsg.mutable_req()->set_clientreqid(req->clientReqId);
//Debug("SENDING REQUEST: %lu %lu", clientid, pendingRequest->clientReqId);
// XXX Try sending only to (what we think is) the leader first
transport->SendMessageToAll(this, reqMsg);
requestTimeout->Reset();
if (transport->SendMessageToAll(this, reqMsg)) {
req->timer->Reset();
} else {
Warning("Could not send request to replicas.");
pendingReqs.erase(req->clientReqId);
delete req;
}
}
void
VRClient::ResendRequest()
VRClient::ResendRequest(const uint64_t reqId)
{
if (pendingRequest == NULL) {
requestTimeout->Stop();
if (pendingReqs.find(reqId) == pendingReqs.end()) {
Debug("Received resend request when no request was pending");
return;
}
Warning("Client timeout; resending request: %lu", pendingRequest->clientReqId);
SendRequest();
Warning("Client timeout; resending request: %lu", reqId);
SendRequest(pendingReqs[reqId]);
}
......@@ -150,9 +141,9 @@ VRClient::ReceiveMessage(const TransportAddress &remote,
const string &type,
const string &data)
{
static proto::ReplyMessage reply;
static proto::UnloggedReplyMessage unloggedReply;
proto::ReplyMessage reply;
proto::UnloggedReplyMessage unloggedReply;
if (type == reply.GetTypeName()) {
reply.ParseFromString(data);
HandleReply(remote, reply);
......@@ -168,74 +159,58 @@ void
VRClient::HandleReply(const TransportAddress &remote,
const proto::ReplyMessage &msg)
{
if (pendingRequest == NULL) {
Warning("Received reply when no request was pending");
uint64_t reqId = msg.clientreqid();
auto it = pendingReqs.find(reqId);
if (it == pendingReqs.end()) {
Debug("Received reply when no request was pending");
return;
}
if (msg.clientreqid() != pendingRequest->clientReqId) {
Debug("Received reply for a different request");
return;
}
Debug("Client received reply: %lu", pendingRequest->clientReqId);
requestTimeout->Stop();
PendingRequest *req = pendingRequest;
pendingRequest = NULL;
#if CLIENT_NETWORK_DELAY
transport->Timer(CLIENT_NETWORK_DELAY, [=]() {
req->continuation(req->request, msg.reply());
delete req;
});
#else
PendingRequest *req = it->second;
Debug("Client received reply: %lu", reqId);
req->timer->Stop();
pendingReqs.erase(it);
req->continuation(req->request, msg.reply());
delete req;
#endif
}
void
VRClient::HandleUnloggedReply(const TransportAddress &remote,
const proto::UnloggedReplyMessage &msg)
{
if (pendingUnloggedRequest == NULL) {
Warning("Received unloggedReply when no request was pending");
uint64_t reqId = msg.clientreqid();
auto it = pendingReqs.find(reqId);
if (it == pendingReqs.end()) {
Debug("Received reply when no request was pending");
return;
}
Debug("Client received unloggedReply");
unloggedRequestTimeout->Stop();
PendingRequest *req = pendingUnloggedRequest;
pendingUnloggedRequest = NULL;
#if READ_AT_LEADER
transport->Timer(CLIENT_NETWORK_DELAY, [=]() {
req->continuation(req->request, msg.reply());
delete req;
});
#else
PendingRequest *req = it->second;
Debug("Client received unloggedReply %lu", reqId);
req->timer->Stop();
pendingReqs.erase(it);
req->continuation(req->request, msg.reply());
delete req;
#endif
}
void
VRClient::UnloggedRequestTimeoutCallback()
VRClient::UnloggedRequestTimeoutCallback(const uint64_t reqId)
{
PendingRequest *req = pendingUnloggedRequest;
pendingUnloggedRequest = NULL;
auto it = pendingReqs.find(reqId);
if (it == pendingReqs.end()) {
Debug("Received reply when no request was pending");
return;
}
Warning("Unlogged request timed out");
unloggedRequestTimeout->Stop();
req->timeoutContinuation(req->request);
PendingUnloggedRequest *req =
static_cast<PendingUnloggedRequest *>(it->second);
req->timer->Stop();
pendingReqs.erase(it);
if (req->error_continuation) {
req->error_continuation(req->request, ErrorCode::TIMEOUT);
}
delete req;
}
} // namespace vr
......
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* vr/client.h:
* replication/vr/client.h:
* dummy implementation of replication interface that just uses a
* single replica and passes commands directly to it
*
......@@ -36,6 +36,8 @@
#include "lib/configuration.h"
#include "replication/vr/vr-proto.pb.h"
#include <unordered_map>
namespace replication {
namespace vr {
......@@ -47,11 +49,12 @@ public:
uint64_t clientid = 0);
virtual ~VRClient();
virtual void Invoke(const string &request,
continuation_t continuation);
continuation_t continuation,
error_continuation_t error_continuation = nullptr);
virtual void InvokeUnlogged(int replicaIdx,
const string &request,
continuation_t continuation,
timeout_continuation_t timeoutContinuation = nullptr,
error_continuation_t error_continuation = nullptr,
uint32_t timeout = DEFAULT_UNLOGGED_OP_TIMEOUT);
virtual void ReceiveMessage(const TransportAddress &remote,
const string &type, const string &data);
......@@ -66,24 +69,37 @@ protected:
string request;
uint64_t clientReqId;
continuation_t continuation;
timeout_continuation_t timeoutContinuation;
inline PendingRequest(string request, uint64_t clientReqId,
continuation_t continuation)
Timeout *timer;
inline PendingRequest(string request,
uint64_t clientReqId,
continuation_t continuation,
Timeout *timer)
: request(request), clientReqId(clientReqId),
continuation(continuation) { }
continuation(continuation), timer(timer) { };
inline ~PendingRequest() { delete timer; }
};
PendingRequest *pendingRequest;
PendingRequest *pendingUnloggedRequest;
Timeout *requestTimeout;
Timeout *unloggedRequestTimeout;
void SendRequest();
void ResendRequest();
struct PendingUnloggedRequest : public PendingRequest
{
error_continuation_t error_continuation;
inline PendingUnloggedRequest(string request,
uint64_t clientReqId,
continuation_t continuation,
Timeout *timer,
error_continuation_t error_continuation)
: PendingRequest(request, clientReqId, continuation, timer),
error_continuation(error_continuation) { };
};
std::unordered_map<uint64_t, PendingRequest *> pendingReqs;
void SendRequest(const PendingRequest *req);
void ResendRequest(const uint64_t reqId);
void HandleReply(const TransportAddress &remote,
const proto::ReplyMessage &msg);
void HandleUnloggedReply(const TransportAddress &remote,
const proto::UnloggedReplyMessage &msg);
void UnloggedRequestTimeoutCallback();
void UnloggedRequestTimeoutCallback(const uint64_t reqId);
};
} // namespace replication::vr
......
......@@ -349,16 +349,16 @@ void
VRReplica::ReceiveMessage(const TransportAddress &remote,
const string &type, const string &data)
{
static RequestMessage request;
static UnloggedRequestMessage unloggedRequest;
static PrepareMessage prepare;
static PrepareOKMessage prepareOK;
static CommitMessage commit;
static RequestStateTransferMessage requestStateTransfer;
static StateTransferMessage stateTransfer;
static StartViewChangeMessage startViewChange;
static DoViewChangeMessage doViewChange;
static StartViewMessage startView;
RequestMessage request;
UnloggedRequestMessage unloggedRequest;
PrepareMessage prepare;
PrepareOKMessage prepareOK;
CommitMessage commit;
RequestStateTransferMessage requestStateTransfer;
StateTransferMessage stateTransfer;
StartViewChangeMessage startViewChange;
DoViewChangeMessage doViewChange;
StartViewMessage startView;
if (type == request.GetTypeName()) {
request.ParseFromString(data);
......@@ -513,7 +513,8 @@ VRReplica::HandleUnloggedRequest(const TransportAddress &remote,
Debug("Received unlogged request %s", (char *)msg.req().op().c_str());
ExecuteUnlogged(msg.req(), reply);
reply.set_clientreqid(msg.req().clientreqid());
if (!(transport->SendMessage(this, remote, reply)))
Warning("Failed to send reply message");
}
......
......@@ -60,7 +60,7 @@ class VRApp : public AppReplica {
public:
VRApp(std::vector<string> *o, std::vector<string> *u) : ops(o), unloggedOps(u) { }
void ReplicaUpcall(opnum_t opnum, const string &req, string &reply) {
ops->push_back(req);
reply = "reply: " + req;
......@@ -71,7 +71,7 @@ public:
reply = "unlreply: " + req;
}
};
class VRTest : public ::testing::TestWithParam<int>
{
protected:
......@@ -82,7 +82,7 @@ protected:
std::vector<std::vector<string> > ops;
std::vector<std::vector<string> > unloggedOps;
int requestNum;
virtual void SetUp() {
std::vector<transport::ReplicaAddress> replicaAddrs =
{ { "localhost", "12345" },
......@@ -91,7 +91,7 @@ protected:
config = new transport::Configuration(3, 1, replicaAddrs);
transport = new SimulatedTransport();
ops.resize(config->n);
unloggedOps.resize(config->n);
......@@ -118,24 +118,24 @@ protected:
virtual string LastRequestOp() {
return RequestOp(requestNum);
}
virtual void ClientSendNext(Client::continuation_t upcall) {
requestNum++;
client->Invoke(LastRequestOp(), upcall);
}
virtual void ClientSendNextUnlogged(int idx, Client::continuation_t upcall,
Client::timeout_continuation_t timeoutContinuation = nullptr,
Client::error_continuation_t error_continuation = nullptr,
uint32_t timeout = Client::DEFAULT_UNLOGGED_OP_TIMEOUT) {
requestNum++;
client->InvokeUnlogged(idx, LastRequestOp(), upcall, timeoutContinuation, timeout);
client->InvokeUnlogged(idx, LastRequestOp(), upcall, error_continuation, timeout);
}
virtual void TearDown() {
for (auto x : replicas) {
delete x;
}
replicas.clear();
ops.clear();
unloggedOps.clear();
......@@ -157,7 +157,7 @@ TEST_P(VRTest, OneOp)
EXPECT_EQ(ops[0].back(), req);
transport->CancelAllTimers();
};
ClientSendNext(upcall);
transport->Run();
......@@ -178,10 +178,10 @@ TEST_P(VRTest, Unlogged)
transport->CancelAllTimers();
};
int timeouts = 0;
auto timeout = [&](const string &req) {
auto timeout = [&](const string &req, ErrorCode) {
timeouts++;
};
ClientSendNextUnlogged(1, upcall, timeout);
transport->Run();
......@@ -199,7 +199,7 @@ TEST_P(VRTest, UnloggedTimeout)
transport->CancelAllTimers();
};
int timeouts = 0;
auto timeout = [&](const string &req) {
auto timeout = [&](const string &req, ErrorCode) {
timeouts++;
};
......@@ -245,7 +245,7 @@ TEST_P(VRTest, ManyOps)
transport->CancelAllTimers();
}
};
ClientSendNext(upcall);
transport->Run();
......@@ -253,7 +253,7 @@ TEST_P(VRTest, ManyOps)
for (int i = 0; i < config->n; i++) {
EXPECT_EQ(10, ops[i].size());
for (int j = 0; j < 10; j++) {
EXPECT_EQ(RequestOp(j), ops[i][j]);
EXPECT_EQ(RequestOp(j), ops[i][j]);
}
}
}
......@@ -274,7 +274,7 @@ TEST_P(VRTest, FailedReplica)
transport->CancelAllTimers();
}
};
ClientSendNext(upcall);
// Drop messages to or from replica 1
......@@ -286,7 +286,7 @@ TEST_P(VRTest, FailedReplica)
}
return true;
});
transport->Run();
// By now, they all should have executed the last request.
......@@ -296,7 +296,7 @@ TEST_P(VRTest, FailedReplica)
}
EXPECT_EQ(10, ops[i].size());
for (int j = 0; j < 10; j++) {
EXPECT_EQ(RequestOp(j), ops[i][j]);
EXPECT_EQ(RequestOp(j), ops[i][j]);
}
}
}
......@@ -322,7 +322,7 @@ TEST_P(VRTest, StateTransfer)
transport->CancelAllTimers();
}
};
ClientSendNext(upcall);
// Drop messages to or from replica 1
......@@ -334,14 +334,14 @@ TEST_P(VRTest, StateTransfer)
}
return true;
});
transport->Run();
// By now, they all should have executed the last request.
for (int i = 0; i < config->n; i++) {
EXPECT_EQ(10, ops[i].size());
for (int j = 0; j < 10; j++) {
EXPECT_EQ(RequestOp(j), ops[i][j]);
EXPECT_EQ(RequestOp(j), ops[i][j]);
}
}
}
......@@ -370,9 +370,9 @@ TEST_P(VRTest, FailedLeader)
transport->CancelAllTimers();
}
};
ClientSendNext(upcall);
transport->Run();
// By now, they all should have executed the last request.
......@@ -382,7 +382,7 @@ TEST_P(VRTest, FailedLeader)
}
EXPECT_EQ(10, ops[i].size());
for (int j = 0; j < 10; j++) {
EXPECT_EQ(RequestOp(j), ops[i][j]);
EXPECT_EQ(RequestOp(j), ops[i][j]);
}
}
}
......@@ -412,11 +412,11 @@ TEST_P(VRTest, DroppedReply)
return true;
});
ClientSendNext(upcall);
transport->Run();
EXPECT_TRUE(received);
// Each replica should have executed only one request
for (int i = 0; i < config->n; i++) {
EXPECT_EQ(1, ops[i].size());
......@@ -457,18 +457,18 @@ TEST_P(VRTest, DroppedReplyThenFailedLeader)
}
return true;
});
ClientSendNext(upcall);
transport->Run();
EXPECT_TRUE(received);
// Each replica should have executed only one request
// (and actually the faulty one should too, but don't check that)
for (int i = 0; i < config->n; i++) {
if (i != 0) {
EXPECT_EQ(1, ops[i].size());
EXPECT_EQ(1, ops[i].size());
}
}
}
......@@ -477,7 +477,7 @@ TEST_P(VRTest, ManyClients)
{
const int NUM_CLIENTS = 10;
const int MAX_REQS = 100;
std::vector<VRClient *> clients;
std::vector<int> lastReq;
std::vector<Client::continuation_t> upcalls;
......@@ -522,7 +522,7 @@ TEST_P(VRTest, Stress)
const int MAX_REQS = 100;
const int MAX_DELAY = 1;
const int DROP_PROBABILITY = 10; // 1/x
std::vector<VRClient *> clients;
std::vector<int> lastReq;
std::vector<Client::continuation_t> upcalls;
......@@ -540,7 +540,7 @@ TEST_P(VRTest, Stress)
}
srand(time(NULL));
// Delay messages from clients by a random amount, and drop some
// of them
transport->AddFilter(10, [=](TransportReceiver *src, int srcIdx,
......@@ -551,7 +551,7 @@ TEST_P(VRTest, Stress)
}
return ((rand() % DROP_PROBABILITY) != 0);
});
// This could take a while; simulate two hours
transport->Timer(7200000, [&]() {
transport->CancelAllTimers();
......
syntax = "proto2";
import "replication/common/request.proto";
package replication.vr.proto;
......@@ -19,6 +21,7 @@ message UnloggedRequestMessage {
message UnloggedReplyMessage {
required bytes reply = 1;
required uint64 clientreqid = 2;
}
message PrepareMessage {
......
# How to Run
Clients and servers must be given configuration files: one for each
shard, plus one for the timestamp server (for OCC). For example, a
3-shard configuration will have the following files:
shard0.config
```
f 1
replica <server-address-1>:<port>
replica <server-address-2>:<port>
replica <server-address-3>:<port>
```
shard1.config
```
f 1
replica <server-address-4>:<port>
replica <server-address-5>:<port>
replica <server-address-6>:<port>
```
shard2.config
```
f 1
replica <server-address-7>:<port>
replica <server-address-8>:<port>
replica <server-address-9>:<port>
```
shard.tss.config
```
f 1
replica <server-address-10>:<port>
replica <server-address-11>:<port>
replica <server-address-12>:<port>
```
## Running Servers
To start the replicas, run the following command with the `server`
binary for any of the stores:
`./server -c <shard-config-$n> -i <replica-number> -m <mode> -f <preload-keys>`
For each shard, you need to run `2f+1` instances of `server`, each
corresponding to the address:port pointed to by `replica-number`.
Make sure you run all replicas for all shards.
## Running Clients
To run any of the clients in the benchmark directory:
`./client -c <shard-config-prefix> -N <n_shards> -m <mode>`
# Build rules for the benchmark clients.
# d: directory of the makefile currently being read (last entry of
# MAKEFILE_LIST).
d := $(dir $(lastword $(MAKEFILE_LIST)))
# Append the three benchmark client sources, prefixed with this directory,
# to SRCS.
SRCS += $(addprefix $(d), benchClient.cc retwisClient.cc terminalClient.cc)
# Objects of the strong, weak and TAPIR store clients, combined so each
# benchmark binary can depend on all of them.
OBJS-all-clients := $(OBJS-strong-client) $(OBJS-weak-client) $(OBJS-tapir-client)
# Each binary depends on all client objects plus its own object file.
# NOTE(review): $(o) is defined outside this fragment, presumably the object
# output directory; confirm.
$(d)benchClient: $(OBJS-all-clients) $(o)benchClient.o
$(d)retwisClient: $(OBJS-all-clients) $(o)retwisClient.o
$(d)terminalClient: $(OBJS-all-clients) $(o)terminalClient.o
# Append the three benchmark binaries to BINS.
BINS += $(d)benchClient $(d)retwisClient $(d)terminalClient
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* store/benchmark/benchClient.cc:
* Benchmarking client for a distributed transactional store.
*
**********************************************************************/
#include "store/common/truetime.h"
#include "store/common/frontend/client.h"
#include "store/strongstore/client.h"
#include "store/weakstore/client.h"
#include "store/tapirstore/client.h"
using namespace std;
// Function to pick a random key according to some distribution.
int rand_key();
bool ready = false;
double alpha = -1;
double *zipf;
vector<string> keys;
int nKeys = 100;
/*
 * Benchmark driver: parses the command line, creates the store client
 * selected by -m, loads keys from the file given by -f, then runs
 * transactions of tLen random Get/Put operations until `duration`
 * seconds have elapsed. One line per transaction and a final summary
 * are written to stderr.
 *
 * Options:
 *   -c <path>  configuration path        -f <path>  keys file
 *   -N <n>     number of shards          -d <sec>   duration in seconds
 *   -l <n>     operations per txn        -w <pct>   write percentage (0-100)
 *   -k <n>     number of keys            -s <n>     simulated clock skew
 *   -e <n>     simulated clock error     -z <a>     Zipf coefficient
 *   -r <n>     closest replica id        -m <mode>  txn-l, txn-s, qw, occ,
 *                                                   lock, span-occ, span-lock
 *
 * Returns 0 after the run; argument and file errors call exit(0) as in
 * the rest of this file.
 */
int
main(int argc, char **argv)
{
    const char *configPath = NULL;
    const char *keysPath = NULL;
    int duration = 10;
    int nShards = 1;
    int tLen = 10;
    int wPer = 50; // Out of 100
    int closestReplica = -1; // Closest replica id.
    int skew = 0; // difference between real clock and TrueTime
    int error = 0; // error bars

    Client *client;
    enum {
        MODE_UNKNOWN,
        MODE_TAPIR,
        MODE_WEAK,
        MODE_STRONG
    } mode = MODE_UNKNOWN;

    // Mode for strongstore.
    strongstore::Mode strongmode;

    int opt;
    while ((opt = getopt(argc, argv, "c:d:N:l:w:k:f:m:e:s:z:r:")) != -1) {
        switch (opt) {
        case 'c': // Configuration path
        {
            configPath = optarg;
            break;
        }

        case 'f': // Generated keys path
        {
            keysPath = optarg;
            break;
        }

        case 'N': // Number of shards.
        {
            char *strtolPtr;
            nShards = strtoul(optarg, &strtolPtr, 10);
            if ((*optarg == '\0') || (*strtolPtr != '\0') ||
                (nShards <= 0)) {
                fprintf(stderr, "option -N requires a numeric arg\n");
            }
            break;
        }

        case 'd': // Duration in seconds to run.
        {
            char *strtolPtr;
            duration = strtoul(optarg, &strtolPtr, 10);
            if ((*optarg == '\0') || (*strtolPtr != '\0') ||
                (duration <= 0)) {
                fprintf(stderr, "option -d requires a numeric arg\n");
            }
            break;
        }

        case 'l': // Length of each transaction (deterministic!)
        {
            char *strtolPtr;
            tLen = strtoul(optarg, &strtolPtr, 10);
            if ((*optarg == '\0') || (*strtolPtr != '\0') ||
                (tLen <= 0)) {
                fprintf(stderr, "option -l requires a numeric arg\n");
            }
            break;
        }

        case 'w': // Percentage of writes (out of 100)
        {
            char *strtolPtr;
            wPer = strtoul(optarg, &strtolPtr, 10);
            if ((*optarg == '\0') || (*strtolPtr != '\0') ||
                (wPer < 0 || wPer > 100)) {
                fprintf(stderr, "option -w requires a arg b/w 0-100\n");
            }
            break;
        }

        case 'k': // Number of keys to operate on.
        {
            char *strtolPtr;
            nKeys = strtoul(optarg, &strtolPtr, 10);
            if ((*optarg == '\0') || (*strtolPtr != '\0') ||
                (nKeys <= 0)) {
                fprintf(stderr, "option -k requires a numeric arg\n");
            }
            break;
        }

        case 's': // Simulated clock skew.
        {
            char *strtolPtr;
            skew = strtoul(optarg, &strtolPtr, 10);
            if ((*optarg == '\0') || (*strtolPtr != '\0') || (skew < 0))
            {
                fprintf(stderr,
                        "option -s requires a numeric arg\n");
            }
            break;
        }

        case 'e': // Simulated clock error.
        {
            char *strtolPtr;
            error = strtoul(optarg, &strtolPtr, 10);
            if ((*optarg == '\0') || (*strtolPtr != '\0') || (error < 0))
            {
                fprintf(stderr,
                        "option -e requires a numeric arg\n");
            }
            break;
        }

        case 'z': // Zipf coefficient for key selection.
        {
            char *strtolPtr;
            alpha = strtod(optarg, &strtolPtr);
            if ((*optarg == '\0') || (*strtolPtr != '\0'))
            {
                fprintf(stderr,
                        "option -z requires a numeric arg\n");
            }
            break;
        }

        case 'r': // Preferred closest replica.
        {
            // The replica id is an integer, so parse it as one rather
            // than going through a floating-point conversion.
            char *strtolPtr;
            closestReplica = strtol(optarg, &strtolPtr, 10);
            if ((*optarg == '\0') || (*strtolPtr != '\0'))
            {
                fprintf(stderr,
                        "option -r requires a numeric arg\n");
            }
            break;
        }

        case 'm': // Mode to run in [occ/lock/...]
        {
            if (strcasecmp(optarg, "txn-l") == 0) {
                mode = MODE_TAPIR;
            } else if (strcasecmp(optarg, "txn-s") == 0) {
                mode = MODE_TAPIR;
            } else if (strcasecmp(optarg, "qw") == 0) {
                mode = MODE_WEAK;
            } else if (strcasecmp(optarg, "occ") == 0) {
                mode = MODE_STRONG;
                strongmode = strongstore::MODE_OCC;
            } else if (strcasecmp(optarg, "lock") == 0) {
                mode = MODE_STRONG;
                strongmode = strongstore::MODE_LOCK;
            } else if (strcasecmp(optarg, "span-occ") == 0) {
                mode = MODE_STRONG;
                strongmode = strongstore::MODE_SPAN_OCC;
            } else if (strcasecmp(optarg, "span-lock") == 0) {
                mode = MODE_STRONG;
                strongmode = strongstore::MODE_SPAN_LOCK;
            } else {
                fprintf(stderr, "unknown mode '%s'\n", optarg);
                exit(0);
            }
            break;
        }

        default:
            // argv[optind] is NULL when the offending option was the last
            // argument; never hand a null pointer to %s.
            fprintf(stderr, "Unknown argument %s\n",
                    (optind < argc) ? argv[optind] : "(none)");
            break;
        }
    }

    if (mode == MODE_TAPIR) {
        client = new tapirstore::Client(configPath, nShards,
                    closestReplica, TrueTime(skew, error));
    } else if (mode == MODE_WEAK) {
        client = new weakstore::Client(configPath, nShards,
                    closestReplica);
    } else if (mode == MODE_STRONG) {
        client = new strongstore::Client(strongmode, configPath,
                    nShards, closestReplica, TrueTime(skew, error));
    } else {
        fprintf(stderr, "option -m is required\n");
        exit(0);
    }

    // Read in the keys from a file.
    if (keysPath == NULL) {
        // Without -f there is no path to open or to print.
        fprintf(stderr, "option -f is required\n");
        exit(0);
    }

    string key, value;
    ifstream in;
    in.open(keysPath);
    if (!in) {
        fprintf(stderr, "Could not read keys from: %s\n", keysPath);
        exit(0);
    }
    for (int i = 0; i < nKeys; i++) {
        // Stop at end of file instead of pushing a stale copy of the
        // previous key for every missing line.
        if (!getline(in, key)) {
            break;
        }
        keys.push_back(key);
    }
    in.close();

    if (keys.empty()) {
        fprintf(stderr, "No keys could be read from: %s\n", keysPath);
        exit(0);
    }
    if (static_cast<int>(keys.size()) < nKeys) {
        // rand_key() draws indices in [0, nKeys); keep that range inside
        // the keys that were actually loaded.
        fprintf(stderr, "Only %zu of %d keys found in: %s\n",
                keys.size(), nKeys, keysPath);
        nKeys = static_cast<int>(keys.size());
    }

    // t0: start of run; t1/t2: transaction start/end; t3/t4: scratch
    // timestamps around individual operations.
    struct timeval t0, t1, t2, t3, t4;

    int nTransactions = 0;
    int tCount = 0;
    double tLatency = 0.0;
    int getCount = 0;
    double getLatency = 0.0;
    int putCount = 0;
    double putLatency = 0.0;
    int beginCount = 0;
    double beginLatency = 0.0;
    int commitCount = 0;
    double commitLatency = 0.0;

    gettimeofday(&t0, NULL);
    srand(t0.tv_sec + t0.tv_usec);

    while (1) {
        gettimeofday(&t4, NULL);
        client->Begin();
        gettimeofday(&t1, NULL);

        beginCount++;
        beginLatency += ((t1.tv_sec - t4.tv_sec)*1000000 + (t1.tv_usec - t4.tv_usec));

        for (int j = 0; j < tLen; j++) {
            key = keys[rand_key()];

            if (rand() % 100 < wPer) {
                gettimeofday(&t3, NULL);
                client->Put(key, key);
                gettimeofday(&t4, NULL);

                putCount++;
                putLatency += ((t4.tv_sec - t3.tv_sec)*1000000 + (t4.tv_usec - t3.tv_usec));
            } else {
                gettimeofday(&t3, NULL);
                client->Get(key, value);
                gettimeofday(&t4, NULL);

                getCount++;
                getLatency += ((t4.tv_sec - t3.tv_sec)*1000000 + (t4.tv_usec - t3.tv_usec));
            }
        }

        gettimeofday(&t3, NULL);
        bool status = client->Commit();
        gettimeofday(&t2, NULL);

        commitCount++;
        commitLatency += ((t2.tv_sec - t3.tv_sec)*1000000 + (t2.tv_usec - t3.tv_usec));

        long latency = (t2.tv_sec - t1.tv_sec)*1000000 + (t2.tv_usec - t1.tv_usec);

        fprintf(stderr, "%d %ld.%06ld %ld.%06ld %ld %d\n", nTransactions+1, t1.tv_sec,
                t1.tv_usec, t2.tv_sec, t2.tv_usec, latency, status?1:0);

        if (status) {
            tCount++;
            tLatency += latency;
        }
        nTransactions++;

        gettimeofday(&t1, NULL);
        // Widen before multiplying: duration*1000000 overflows a 32-bit
        // int for runs longer than about 2147 seconds.
        if ( ((t1.tv_sec-t0.tv_sec)*1000000 + (t1.tv_usec-t0.tv_usec)) >
             static_cast<long long>(duration)*1000000)
            break;
    }

    fprintf(stderr, "# Commit_Ratio: %lf\n", (double)tCount/nTransactions);
    fprintf(stderr, "# Overall_Latency: %lf\n", tLatency/tCount);
    fprintf(stderr, "# Begin: %d, %lf\n", beginCount, beginLatency/beginCount);
    fprintf(stderr, "# Get: %d, %lf\n", getCount, getLatency/getCount);
    fprintf(stderr, "# Put: %d, %lf\n", putCount, putLatency/putCount);
    fprintf(stderr, "# Commit: %d, %lf\n", commitCount, commitLatency/commitCount);

    return 0;
}
// Returns the index of the first entry in the non-decreasing table
// cdf[0..n) whose value is >= u, clamped to n - 1 so that rounding in
// the last cumulative value can never yield an out-of-range index.
// Requires n >= 1.
static int zipf_bucket(const double *cdf, int n, double u)
{
    int lo = 0;
    int hi = n; // the answer always lies in [lo, hi]
    while (lo < hi) {
        const int mid = lo + (hi - lo) / 2;
        if (cdf[mid] < u) {
            lo = mid + 1;
        } else {
            hi = mid;
        }
    }
    return (lo < n) ? lo : n - 1;
}

// Picks a key index in [0, nKeys).
//
// With alpha < 0 the index is drawn uniformly. Otherwise it is drawn from
// a Zipf-like distribution with exponent alpha, whose cumulative table is
// built on the first call and cached in `zipf`.
//
// Assumes nKeys > 0.
int rand_key()
{
    if (alpha < 0) {
        // Uniform selection of keys.
        return (rand() % nKeys);
    }

    // Zipf-like selection of keys.
    if (!ready) {
        zipf = new double[nKeys];

        // Normalisation constant: 1 / sum_{i=1..nKeys} i^-alpha.
        double c = 0.0;
        for (int i = 1; i <= nKeys; i++) {
            c = c + (1.0 / pow((double) i, alpha));
        }
        c = 1.0 / c;

        // zipf[i-1] holds the cumulative probability of ranks 1..i.
        double sum = 0.0;
        for (int i = 1; i <= nKeys; i++) {
            sum += (c / pow((double) i, alpha));
            zipf[i-1] = sum;
        }
        ready = true;
    }

    // Draw strictly inside (0, 1). The previous form,
    // (1.0 + rand())/RAND_MAX, could produce a value above 1.
    const double u = (1.0 + rand()) / (2.0 + RAND_MAX);

    // Find the bucket whose cumulative range contains u. The previous
    // search mixed an exclusive upper bound with `r = mid - 1` and
    // returned the last probed index, so some buckets (e.g. index 0 of a
    // three-entry table) could never be selected.
    return zipf_bucket(zipf, nKeys, u);
}
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* store/benchmark/retwisClient.cc:
* Retwis benchmarking client for a distributed transactional store.
*
**********************************************************************/
#include "store/common/truetime.h"
#include "store/common/frontend/client.h"
#include "store/strongstore/client.h"
#include "store/weakstore/client.h"
#include "store/tapirstore/client.h"
#include <algorithm>
using namespace std;
// Function to pick a random key according to some distribution.
int rand_key();
bool ready = false;
double alpha = -1;
double *zipf;
vector<string> keys;
int nKeys = 100;
int
main(int argc, char **argv)
{
const char *configPath = NULL;
const char *keysPath = NULL;
int duration = 10;
int nShards = 1;
int closestReplica = -1; // Closest replica id.
int skew = 0; // difference between real clock and TrueTime
int error = 0; // error bars
Client *client;
enum {
MODE_UNKNOWN,
MODE_TAPIR,
MODE_WEAK,
MODE_STRONG
} mode = MODE_UNKNOWN;
// Mode for strongstore.
strongstore::Mode strongmode;
int opt;
while ((opt = getopt(argc, argv, "c:d:N:k:f:m:e:s:z:r:")) != -1) {
switch (opt) {
case 'c': // Configuration path
{
configPath = optarg;
break;
}
case 'f': // Generated keys path
{
keysPath = optarg;
break;
}
case 'N': // Number of shards.
{
char *strtolPtr;
nShards = strtoul(optarg, &strtolPtr, 10);
if ((*optarg == '\0') || (*strtolPtr != '\0') ||
(nShards <= 0)) {
fprintf(stderr, "option -N requires a numeric arg\n");
}
break;
}
case 'd': // Duration in seconds to run.
{
char *strtolPtr;
duration = strtoul(optarg, &strtolPtr, 10);
if ((*optarg == '\0') || (*strtolPtr != '\0') ||
(duration <= 0)) {
fprintf(stderr, "option -d requires a numeric arg\n");
}
break;
}
case 'k': // Number of keys to operate on.
{
char *strtolPtr;
nKeys = strtoul(optarg, &strtolPtr, 10);
if ((*optarg == '\0') || (*strtolPtr != '\0') ||
(nKeys <= 0)) {
fprintf(stderr, "option -k requires a numeric arg\n");
}
break;
}
case 's': // Simulated clock skew.
{
char *strtolPtr;
skew = strtoul(optarg, &strtolPtr, 10);
if ((*optarg == '\0') || (*strtolPtr != '\0') || (skew < 0))
{
fprintf(stderr,
"option -s requires a numeric arg\n");
}
break;
}
case 'e': // Simulated clock error.
{
char *strtolPtr;
error = strtoul(optarg, &strtolPtr, 10);
if ((*optarg == '\0') || (*strtolPtr != '\0') || (error < 0))
{
fprintf(stderr,
"option -e requires a numeric arg\n");
}
break;
}
case 'z': // Zipf coefficient for key selection.
{
char *strtolPtr;
alpha = strtod(optarg, &strtolPtr);
if ((*optarg == '\0') || (*strtolPtr != '\0'))
{
fprintf(stderr,
"option -z requires a numeric arg\n");
}
break;
}
case 'r': // Preferred closest replica.
{
char *strtolPtr;
closestReplica = strtod(optarg, &strtolPtr);
if ((*optarg == '\0') || (*strtolPtr != '\0'))
{
fprintf(stderr,
"option -r requires a numeric arg\n");
}
break;
}
case 'm': // Mode to run in [occ/lock/...]
{
if (strcasecmp(optarg, "txn-l") == 0) {
mode = MODE_TAPIR;
} else if (strcasecmp(optarg, "txn-s") == 0) {
mode = MODE_TAPIR;
} else if (strcasecmp(optarg, "qw") == 0) {
mode = MODE_WEAK;
} else if (strcasecmp(optarg, "occ") == 0) {
mode = MODE_STRONG;
strongmode = strongstore::MODE_OCC;
} else if (strcasecmp(optarg, "lock") == 0) {
mode = MODE_STRONG;
strongmode = strongstore::MODE_LOCK;
} else if (strcasecmp(optarg, "span-occ") == 0) {
mode = MODE_STRONG;
strongmode = strongstore::MODE_SPAN_OCC;
} else if (strcasecmp(optarg, "span-lock") == 0) {
mode = MODE_STRONG;
strongmode = strongstore::MODE_SPAN_LOCK;
} else {
fprintf(stderr, "unknown mode '%s'\n", optarg);
exit(0);
}
break;
}
default:
fprintf(stderr, "Unknown argument %s\n", argv[optind]);
break;
}
}
if (mode == MODE_TAPIR) {
client = new tapirstore::Client(configPath, nShards,
closestReplica, TrueTime(skew, error));
} else if (mode == MODE_WEAK) {
client = new weakstore::Client(configPath, nShards,
closestReplica);
} else if (mode == MODE_STRONG) {
client = new strongstore::Client(strongmode, configPath,
nShards, closestReplica, TrueTime(skew, error));
} else {
fprintf(stderr, "option -m is required\n");
exit(0);
}
// Read in the keys from a file.
string key, value;
ifstream in;
in.open(keysPath);
if (!in) {
fprintf(stderr, "Could not read keys from: %s\n", keysPath);
exit(0);
}
for (int i = 0; i < nKeys; i++) {
getline(in, key);
keys.push_back(key);
}
in.close();
struct timeval t0, t1, t2;
int nTransactions = 0; // Number of transactions attempted.
int ttype; // Transaction type.
int ret;
bool status;
vector<int> keyIdx;
gettimeofday(&t0, NULL);
srand(t0.tv_sec + t0.tv_usec);
while (1) {
keyIdx.clear();
// Begin a transaction.
client->Begin();
gettimeofday(&t1, NULL);
status = true;
// Decide which type of retwis transaction it is going to be.
ttype = rand() % 100;
if (ttype < 5) {
// 5% - Add user transaction. 1,3
keyIdx.push_back(rand_key());
keyIdx.push_back(rand_key());
keyIdx.push_back(rand_key());
sort(keyIdx.begin(), keyIdx.end());
if ((ret = client->Get(keys[keyIdx[0]], value))) {
Warning("Aborting due to %s %d", keys[keyIdx[0]].c_str(), ret);
status = false;
}
for (int i = 0; i < 3 && status; i++) {
client->Put(keys[keyIdx[i]], keys[keyIdx[i]]);
}
ttype = 1;
} else if (ttype < 20) {
// 15% - Follow/Unfollow transaction. 2,2
keyIdx.push_back(rand_key());
keyIdx.push_back(rand_key());
sort(keyIdx.begin(), keyIdx.end());
for (int i = 0; i < 2 && status; i++) {
if ((ret = client->Get(keys[keyIdx[i]], value))) {
Warning("Aborting due to %s %d", keys[keyIdx[i]].c_str(), ret);
status = false;
}
client->Put(keys[keyIdx[i]], keys[keyIdx[i]]);
}
ttype = 2;
} else if (ttype < 50) {
// 30% - Post tweet transaction. 3,5
keyIdx.push_back(rand_key());
keyIdx.push_back(rand_key());
keyIdx.push_back(rand_key());
keyIdx.push_back(rand_key());
keyIdx.push_back(rand_key());
sort(keyIdx.begin(), keyIdx.end());
for (int i = 0; i < 3 && status; i++) {
if ((ret = client->Get(keys[keyIdx[i]], value))) {
Warning("Aborting due to %s %d", keys[keyIdx[i]].c_str(), ret);
status = false;
}
client->Put(keys[keyIdx[i]], keys[keyIdx[i]]);
}
for (int i = 0; i < 2; i++) {
client->Put(keys[keyIdx[i+3]], keys[keyIdx[i+3]]);
}
ttype = 3;
} else {
// 50% - Get followers/timeline transaction. rand(1,10),0
int nGets = 1 + rand() % 10;
for (int i = 0; i < nGets; i++) {
keyIdx.push_back(rand_key());
}
sort(keyIdx.begin(), keyIdx.end());
for (int i = 0; i < nGets && status; i++) {
if ((ret = client->Get(keys[keyIdx[i]], value))) {
Warning("Aborting due to %s %d", keys[keyIdx[i]].c_str(), ret);
status = false;
}
}
ttype = 4;
}
if (status) {
status = client->Commit();
} else {
Debug("Aborting transaction due to failed Read");
}
gettimeofday(&t2, NULL);
long latency = (t2.tv_sec - t1.tv_sec) * 1000000 + (t2.tv_usec - t1.tv_usec);
int retries = 0;
if (!client->Stats().empty()) {
retries = client->Stats()[0];
}
fprintf(stderr, "%d %ld.%06ld %ld.%06ld %ld %d %d %d", ++nTransactions, t1.tv_sec,
t1.tv_usec, t2.tv_sec, t2.tv_usec, latency, status?1:0, ttype, retries);
fprintf(stderr, "\n");
if (((t2.tv_sec-t0.tv_sec)*1000000 + (t2.tv_usec-t0.tv_usec)) > duration*1000000)
break;
}
fprintf(stderr, "# Client exiting..\n");
return 0;
}
/**
 * Pick the index of the next key to access.
 *
 * If the file-level `alpha` is negative, the index is drawn uniformly
 * from [0, nKeys). Otherwise it is drawn from a Zipf-like distribution
 * with coefficient `alpha`; the cumulative distribution is built once,
 * on the first call, and cached in the file-level `zipf` array (guarded
 * by `ready`).
 *
 * Precondition: nKeys > 0.
 *
 * @return an index in [0, nKeys).
 */
int rand_key()
{
    if (alpha < 0) {
        // Uniform selection of keys.
        return (rand() % nKeys);
    }

    // Zipf-like selection of keys.
    if (!ready) {
        // Build the cumulative distribution: zipf[k] is the probability
        // of drawing an index <= k.
        zipf = new double[nKeys];

        // Normalisation constant so that the probabilities sum to 1.
        double norm = 0.0;
        for (int i = 1; i <= nKeys; i++) {
            norm += 1.0 / pow((double) i, alpha);
        }
        norm = 1.0 / norm;

        double cumulative = 0.0;
        for (int i = 1; i <= nKeys; i++) {
            cumulative += norm / pow((double) i, alpha);
            zipf[i - 1] = cumulative;
        }
        ready = true;
    }

    // Draw strictly inside (0, 1). Dividing by RAND_MAX + 2 keeps both
    // end points out of reach, so no rejection loop is needed.
    const double draw = (1.0 + rand()) / (2.0 + RAND_MAX);

    // Lower-bound binary search: find the first index whose cumulative
    // probability is >= draw. [lo, hi) is the half-open range that may
    // still contain the answer.
    int lo = 0;
    int hi = nKeys;
    while (lo < hi) {
        const int mid = lo + (hi - lo) / 2;
        if (zipf[mid] < draw) {
            lo = mid + 1;
        } else {
            hi = mid;
        }
    }

    // Floating-point rounding can leave zipf[nKeys - 1] a hair below 1.0,
    // in which case the search runs off the end; fall back to the last key.
    if (lo >= nKeys) {
        lo = nKeys - 1;
    }
    return lo;
}
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* bench/terminal.cc:
* A terminal client for a distributed transactional store.
*
**********************************************************************/
#include "store/common/truetime.h"
#include "store/common/frontend/client.h"
#include "store/strongstore/client.h"
#include "store/weakstore/client.h"
#include "store/tapirstore/client.h"
using namespace std;
int
main(int argc, char **argv)
{
const char *configPath = NULL;
int nShards = 1;
int closestReplica = -1; // Closest replica id.
Client *client;
enum {
MODE_UNKNOWN,
MODE_TAPIR,
MODE_WEAK,
MODE_STRONG
} mode = MODE_UNKNOWN;
// Mode for strongstore.
strongstore::Mode strongmode;
int opt;
while ((opt = getopt(argc, argv, "c:N:m:r:")) != -1) {
switch (opt) {
case 'c': // Configuration path
{
configPath = optarg;
break;
}
case 'N': // Number of shards.
{
char *strtolPtr;
nShards = strtoul(optarg, &strtolPtr, 10);
if ((*optarg == '\0') || (*strtolPtr != '\0') ||
(nShards <= 0)) {
fprintf(stderr, "option -n requires a numeric arg\n");
}
break;
}
case 'm': // Mode to run in [occ/lock/...]
{
if (strcasecmp(optarg, "txn-l") == 0) {
mode = MODE_TAPIR;
} else if (strcasecmp(optarg, "txn-s") == 0) {
mode = MODE_TAPIR;
} else if (strcasecmp(optarg, "qw") == 0) {
mode = MODE_WEAK;
} else if (strcasecmp(optarg, "occ") == 0) {
mode = MODE_STRONG;
strongmode = strongstore::MODE_OCC;
} else if (strcasecmp(optarg, "lock") == 0) {
mode = MODE_STRONG;
strongmode = strongstore::MODE_LOCK;
} else if (strcasecmp(optarg, "span-occ") == 0) {
mode = MODE_STRONG;
strongmode = strongstore::MODE_SPAN_OCC;
} else if (strcasecmp(optarg, "span-lock") == 0) {
mode = MODE_STRONG;
strongmode = strongstore::MODE_SPAN_LOCK;
} else {
fprintf(stderr, "unknown mode '%s'\n", optarg);
exit(0);
}
break;
}
case 'r':
{
char *strtolPtr;
closestReplica = strtod(optarg, &strtolPtr);
if ((*optarg == '\0') || (*strtolPtr != '\0'))
{
fprintf(stderr,
"option -r requires a numeric arg\n");
}
break;
}
default:
fprintf(stderr, "Unknown argument %s\n", argv[optind]);
break;
}
}
if (mode == MODE_TAPIR) {
client = new tapirstore::Client(configPath, nShards,
closestReplica, TrueTime(0, 0));
} else if (mode == MODE_WEAK) {
client = new weakstore::Client(configPath, nShards,
closestReplica);
} else if (mode == MODE_STRONG) {
client = new strongstore::Client(strongmode, configPath,
nShards, closestReplica, TrueTime(0, 0));
} else {
fprintf(stderr, "option -m is required\n");
exit(0);
}
char c, cmd[2048], *tok;
int clen, status;
string key, value;
while (1) {
printf(">> ");
fflush(stdout);
clen = 0;
while ((c = getchar()) != '\n')
cmd[clen++] = c;
cmd[clen] = '\0';
if (clen == 0) continue;
tok = strtok(cmd, " ,.-");
if (strcasecmp(tok, "exit") == 0 || strcasecmp(tok, "q") == 0) {
printf("Exiting..\n");
break;
} else if (strcasecmp(tok, "get") == 0) {
tok = strtok(NULL, " ,.-");
key = string(tok);
status = client->Get(key, value);
if (status == 0) {
printf("%s -> %s\n", key.c_str(), value.c_str());
} else {
printf("Error in retrieving value\n");
}
} else if (strcasecmp(tok, "put") == 0) {
tok = strtok(NULL, " ,.-");
key = string(tok);
tok = strtok(NULL, " ,.-");
value = string(tok);
client->Put(key, value);
} else if (strcasecmp(tok, "begin") == 0) {
client->Begin();
} else if (strcasecmp(tok, "commit") == 0) {
bool status = client->Commit();
if (status) {
printf("Commit succeeded..\n");
} else {
printf("Commit failed..\n");
}
} else {
printf("Unknown command.. Try again!\n");
}
fflush(stdout);
}
exit(0);
return 0;
}
# $(d): directory part of the last entry in MAKEFILE_LIST; used below to
# prefix file names with a path.
d := $(dir $(lastword $(MAKEFILE_LIST)))
# NOTE(review): SRCS is appended twice with the same five .cc files (only
# the order differs) -- this looks like the old and new side of a diff
# shown together; confirm which one is intended.
SRCS += $(addprefix $(d), \
timestamp.cc transaction.cc promise.cc tracer.cc truetime.cc)
SRCS += $(addprefix $(d), promise.cc timestamp.cc tracer.cc \
transaction.cc truetime.cc)
PROTOS += $(addprefix $(d), common-proto.proto)
# NOTE(review): LIB-common and LIB-store-common list the same six objects
# under two names; presumably one is a rename of the other -- confirm.
LIB-common := $(o)timestamp.o $(o)transaction.o $(o)promise.o $(o)truetime.o $(o)common-proto.o $(o)tracer.o
LIB-store-common := $(o)common-proto.o $(o)promise.o $(o)timestamp.o \
$(o)tracer.o $(o)transaction.o $(o)truetime.o
# NOTE(review): the frontend/backend Rules.mk files are included both by a
# fixed path and via $(d); likely old vs. new form of the same includes.
include store/common/frontend/Rules.mk
include store/common/backend/Rules.mk
include $(d)backend/Rules.mk $(d)frontend/Rules.mk
# $(d): directory part of the last entry in MAKEFILE_LIST; used below to
# prefix file names with a path.
d := $(dir $(lastword $(MAKEFILE_LIST)))
# NOTE(review): the continuation of the SRCS assignment appears twice, once
# without and once with txnstore.cc, and the library is defined under two
# names (LIB-backend / LIB-store-backend). This looks like the old and new
# side of a diff shown together; confirm which variant is intended.
SRCS += $(addprefix $(d), \
kvstore.cc lockserver.cc versionstore.cc)
kvstore.cc lockserver.cc txnstore.cc versionstore.cc)
LIB-backend := $(o)kvstore.o $(o)versionstore.o $(o)lockserver.o
LIB-store-backend := $(o)kvstore.o $(o)lockserver.o $(o)txnstore.o $(o)versionstore.o
include $(d)tests/Rules.mk
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
// vim: set ts=4 sw=4:
/***********************************************************************
*
* common/kvstore.cc:
* Simple versioned key-value store
*
* Copyright 2015 Irene Zhang <iyzhang@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#include "kvstore.h"
#include "store/common/backend/kvstore.h"
using namespace std;
......
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
// vim: set ts=4 sw=4:
/***********************************************************************
*
* common/kvstore.h:
* store/common/backend/kvstore.h:
* Simple versioned key-value store
*
* Copyright 2015 Irene Zhang <iyzhang@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#ifndef _KV_STORE_H_
......@@ -12,11 +33,8 @@
#include "lib/assert.h"
#include "lib/message.h"
#include <string>
#include <unordered_map>
#include <vector>
#include <fstream>
#include <iostream>
#include <list>
class KVStore
......
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
// vim: set ts=4 sw=4:
/***********************************************************************
*
* spanstore/lockserver.cc:
......@@ -7,12 +6,10 @@
*
**********************************************************************/
#include "lockserver.h"
#include "store/common/backend/lockserver.h"
using namespace std;
namespace spanstore {
LockServer::LockServer()
{
readers = 0;
......@@ -280,4 +277,3 @@ LockServer::releaseForWrite(const string &lock, uint64_t holder)
}
}
} // namespace spanstore