Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • syslab/tapir
  • aaasz/tapir
  • ashmrtnz/tapir
3 results
Show changes
Showing
with 994 additions and 339 deletions
......@@ -38,6 +38,8 @@
#include "replication/ir/ir-proto.pb.h"
#include <functional>
#include <map>
#include <memory>
#include <set>
#include <unordered_map>
......@@ -47,104 +49,173 @@ namespace ir {
class IRClient : public Client
{
public:
typedef std::function<string (const std::set<string> &)> decide_t;
using result_set_t = std::map<string, std::size_t>;
using decide_t = std::function<string(const result_set_t &)>;
IRClient(const transport::Configuration &config,
Transport *transport,
uint64_t clientid = 0);
virtual ~IRClient();
virtual void Invoke(const string &request,
continuation_t continuation);
virtual void InvokeInconsistent(const string &request,
continuation_t continuation);
virtual void InvokeConsensus(const string &request,
decide_t decide,
continuation_t continuation);
virtual void InvokeUnlogged(int replicaIdx,
const string &request,
continuation_t continuation,
timeout_continuation_t timeoutContinuation = nullptr,
uint32_t timeout = DEFAULT_UNLOGGED_OP_TIMEOUT);
virtual void ReceiveMessage(const TransportAddress &remote,
const string &type, const string &data);
virtual void Invoke(
const string &request,
continuation_t continuation,
error_continuation_t error_continuation = nullptr) override;
virtual void InvokeUnlogged(
int replicaIdx,
const string &request,
continuation_t continuation,
error_continuation_t error_continuation = nullptr,
uint32_t timeout = DEFAULT_UNLOGGED_OP_TIMEOUT) override;
virtual void ReceiveMessage(
const TransportAddress &remote,
const string &type,
const string &data) override;
virtual void InvokeInconsistent(
const string &request,
continuation_t continuation,
error_continuation_t error_continuation = nullptr);
virtual void InvokeConsensus(
const string &request,
decide_t decide,
continuation_t continuation,
error_continuation_t error_continuation = nullptr);
protected:
struct PendingRequest
{
struct PendingRequest {
string request;
uint64_t clientReqId;
continuation_t continuation;
bool continuationInvoked = false;
Timeout *timer;
QuorumSet<viewstamp_t, proto::ConfirmMessage> confirmQuorum;
inline PendingRequest(string request,
uint64_t clientReqId,
continuation_t continuation,
Timeout *timer,
int quorumSize) :
request(request), clientReqId(clientReqId),
continuation(continuation), timer(timer),
confirmQuorum(quorumSize) { };
inline ~PendingRequest() { delete timer; };
};
continuation_t continuation;
bool continuationInvoked = false;
std::unique_ptr<Timeout> timer;
QuorumSet<viewstamp_t, proto::ConfirmMessage> confirmQuorum;
struct PendingUnloggedRequest : public PendingRequest
{
timeout_continuation_t timeoutContinuation;
inline PendingUnloggedRequest(string request,
uint64_t clientReqId,
continuation_t continuation,
timeout_continuation_t timeoutContinuation,
Timeout *timer) :
PendingRequest(request, clientReqId,
continuation, timer, 1),
timeoutContinuation(timeoutContinuation) { };
inline PendingRequest(string request, uint64_t clientReqId,
continuation_t continuation,
std::unique_ptr<Timeout> timer, int quorumSize)
: request(request),
clientReqId(clientReqId),
continuation(continuation),
timer(std::move(timer)),
confirmQuorum(quorumSize){};
virtual ~PendingRequest(){};
};
struct PendingInconsistentRequest : public PendingRequest
{
QuorumSet<viewstamp_t, proto::ReplyInconsistentMessage> inconsistentReplyQuorum;
inline PendingInconsistentRequest(string request,
uint64_t clientReqId,
continuation_t continuation,
Timeout *timer,
int quorumSize) :
PendingRequest(request, clientReqId,
continuation, timer, quorumSize),
inconsistentReplyQuorum(quorumSize) { };
// Pending state for an unlogged request: everything PendingRequest holds plus
// the caller-supplied error continuation. The base class is always built with
// a quorum size of 1.
struct PendingUnloggedRequest : public PendingRequest {
    error_continuation_t error_continuation;

    PendingUnloggedRequest(string request,
                           uint64_t clientReqId,
                           continuation_t continuation,
                           error_continuation_t error_continuation,
                           std::unique_ptr<Timeout> timer)
        : PendingRequest(request, clientReqId, continuation,
                         std::move(timer), 1),
          error_continuation(error_continuation)
    {
    }
};
struct PendingConsensusRequest : public PendingRequest
{
QuorumSet<viewstamp_t, proto::ReplyConsensusMessage> consensusReplyQuorum;
decide_t decide;
string decideResult;
inline PendingConsensusRequest(string request,
uint64_t clientReqId,
continuation_t continuation,
Timeout *timer,
int quorumSize,
int superQuorum,
decide_t decide) :
PendingRequest(request, clientReqId,
continuation, timer, quorumSize),
consensusReplyQuorum(superQuorum),
decide(decide) { };
// Pending state for an inconsistent request. On top of PendingRequest it owns
// a QuorumSet of ReplyInconsistentMessage (keyed by viewstamp_t) that is
// constructed with the same quorumSize passed on to the base class.
struct PendingInconsistentRequest : public PendingRequest {
    QuorumSet<viewstamp_t, proto::ReplyInconsistentMessage>
        inconsistentReplyQuorum;

    PendingInconsistentRequest(string request,
                               uint64_t clientReqId,
                               continuation_t continuation,
                               std::unique_ptr<Timeout> timer,
                               int quorumSize)
        : PendingRequest(request, clientReqId, continuation,
                         std::move(timer), quorumSize),
          inconsistentReplyQuorum(quorumSize)
    {
    }
};
// Pending state for a consensus request, layered on top of PendingRequest.
struct PendingConsensusRequest : public PendingRequest {
    QuorumSet<opnum_t, proto::ReplyConsensusMessage> consensusReplyQuorum;
    decide_t decide;
    string decideResult;
    const std::size_t quorumSize;
    const std::size_t superQuorumSize;
    // Every request starts off the slow path.
    bool on_slow_path = false;
    error_continuation_t error_continuation;

    // Timer used to abandon the fast path and move to the slow path. It is
    // nulled once it has run for the first time.
    std::unique_ptr<Timeout> transition_to_slow_path_timer;

    // View in which a majority result (or a finalized result) was found. A
    // majority of confirms has to come from this same view.
    uint64_t reply_consensus_view = 0;

    // Set once a quorum or super quorum of replies has been received and the
    // request has moved on to the confirm phase.
    bool sent_confirms = false;

    PendingConsensusRequest(
        string request,
        uint64_t clientReqId,
        continuation_t continuation,
        std::unique_ptr<Timeout> timer,
        std::unique_ptr<Timeout> transition_to_slow_path_timer,
        int quorumSize,
        int superQuorum,
        decide_t decide,
        error_continuation_t error_continuation)
        : PendingRequest(request, clientReqId, continuation,
                         std::move(timer), quorumSize),
          // Note: the reply quorum set is sized with quorumSize, not
          // superQuorum.
          consensusReplyQuorum(quorumSize),
          decide(decide),
          quorumSize(quorumSize),
          superQuorumSize(superQuorum),
          error_continuation(error_continuation),
          transition_to_slow_path_timer(
              std::move(transition_to_slow_path_timer))
    {
    }
};
uint64_t view;
uint64_t lastReqId;
std::unordered_map<uint64_t, PendingRequest *> pendingReqs;
void SendInconsistent(const PendingInconsistentRequest *req);
void ResendInconsistent(const uint64_t reqId);
void ConsensusSlowPath(const uint64_t reqId);
void ResendInconsistent(const uint64_t reqId);
void SendConsensus(const PendingConsensusRequest *req);
void ResendConsensus(const uint64_t reqId);
// `TransitionToConsensusSlowPath` is called after a timeout to end the
// possibility of taking the fast path and transition into taking the slow
// path.
void TransitionToConsensusSlowPath(const uint64_t reqId);
// HandleSlowPathConsensus is called in one of two scenarios:
//
// 1. A finalized ReplyConsensusMessage was received. In this case, we
// immediately enter the slow path and use the finalized result. If
// finalized is true, req has already been populated with the
// finalized result.
// 2. We're in the slow path and receive a majority of
// ReplyConsensusMessages in the same view. In this case, we call
// decide to determine the final result.
//
// In either case, HandleSlowPathConsensus initiates the finalize phase of
// a consensus request.
void HandleSlowPathConsensus(
const uint64_t reqid,
const std::map<int, proto::ReplyConsensusMessage> &msgs,
const bool finalized_result_found,
PendingConsensusRequest *req);
// HandleFastPathConsensus is called when we're on the fast path and
// receive a super quorum of responses from the same view.
// HandleFastPathConsensus will check to see if there is a superquorum of
// matching responses. If there is, it will return to the user and
// asynchronously initiate the finalize phase of a consensus request.
// Otherwise, it transitions into the slow path which will also initiate
// the finalize phase of a consensus request, but not yet return to the
// user.
void HandleFastPathConsensus(
const uint64_t reqid,
const std::map<int, proto::ReplyConsensusMessage> &msgs,
PendingConsensusRequest *req);
void ResendConfirmation(const uint64_t reqId, bool isConsensus);
void HandleInconsistentReply(const TransportAddress &remote,
const proto::ReplyInconsistentMessage &msg);
void HandleConsensusReply(const TransportAddress &remote,
const proto::ReplyConsensusMessage &msg);
const proto::ReplyConsensusMessage &msg);
void HandleConfirm(const TransportAddress &remote,
const proto::ConfirmMessage &msg);
void HandleUnloggedReply(const TransportAddress &remote,
......
syntax = "proto2";
import "replication/common/request.proto";
package replication.ir.proto;
......@@ -7,6 +9,40 @@ message OpID {
required uint64 clientreqid = 2;
}
// For the view change and recovery protocol, a replica stores two things on
// disk: (1) its current view and (2) the latest view during which it was
// NORMAL. Replicas pack this information into this proto buf and serialize it
// to disk.
message PersistedViewInfo {
// The replica's current view.
required uint64 view = 1;
// The latest view during which the replica was NORMAL.
required uint64 latest_normal_view = 2;
}
// State of a record entry (tentative or finalized). Used as the `state` field
// of RecordEntryProto.
enum RecordEntryState {
RECORD_STATE_TENTATIVE = 0;
RECORD_STATE_FINALIZED = 1;
}
// Kind of operation a record entry holds (inconsistent or consensus). Used as
// the `type` field of RecordEntryProto.
enum RecordEntryType {
RECORD_TYPE_INCONSISTENT = 0;
RECORD_TYPE_CONSENSUS = 1;
}
// Serialized form of a single record entry.
message RecordEntryProto {
// View number stored with the entry.
required uint64 view = 1;
// Identifier of the operation (client id plus client request id).
required OpID opid = 2;
// Whether the entry is tentative or finalized.
required RecordEntryState state = 3;
// Whether the entry is an inconsistent or a consensus operation.
required RecordEntryType type = 4;
// The operation payload of the entry's request.
required bytes op = 5;
// The result stored with the entry.
required bytes result = 6;
}
// TODO: Currently, replicas send entire records to one another. Figure out if
// there is a more efficient way to exchange records.
// Serialized form of a whole record: a list of RecordEntryProto messages, one
// per record entry.
message RecordProto {
repeated RecordEntryProto entry = 1;
}
// Wraps a single replication.Request (declared in
// replication/common/request.proto).
message ProposeInconsistentMessage {
required replication.Request req = 1;
}
......@@ -15,6 +51,7 @@ message ReplyInconsistentMessage {
required uint64 view = 1;
required uint32 replicaIdx = 2;
required OpID opid = 3;
required bool finalized = 4;
}
message FinalizeInconsistentMessage {
......@@ -36,6 +73,7 @@ message ReplyConsensusMessage {
required uint32 replicaIdx = 2;
required OpID opid = 3;
required bytes result = 4;
required bool finalized = 5;
}
message FinalizeConsensusMessage {
......@@ -43,6 +81,19 @@ message FinalizeConsensusMessage {
required bytes result = 2;
}
// View-change message. It always carries the replica index, the new view and
// the latest normal view; the record is attached only in some cases (see the
// note on `record`).
message DoViewChangeMessage {
// Presumably the index of the sending replica -- TODO confirm against the
// replica code.
required uint32 replicaIdx = 1;
// record is optional because a replica only sends its record to the
// leader of the new view.
optional RecordProto record = 2;
// Presumably the view being changed to -- TODO confirm.
required uint64 new_view = 3;
// Presumably the latest view in which the sender was NORMAL (same naming as
// PersistedViewInfo.latest_normal_view) -- TODO confirm.
required uint64 latest_normal_view = 4;
}
// Carries a complete record together with a view number. Unlike
// DoViewChangeMessage, the record is required here.
message StartViewMessage {
required RecordProto record = 1;
// Presumably the view being started -- TODO confirm against the replica code.
required uint64 new_view = 2;
}
// Wraps a single replication.UnloggedRequest (declared in
// replication/common/request.proto).
message UnloggedRequestMessage {
required replication.UnloggedRequest req = 1;
}
......
......@@ -29,35 +29,52 @@
**********************************************************************/
#include "replication/ir/record.h"
#include <utility>
#include "lib/assert.h"
namespace replication {
namespace ir {
// Builds a record from its protobuf form: every RecordEntryProto in
// record_proto is turned into one entry via Add(), in iteration order.
Record::Record(const proto::RecordProto &record_proto)
{
    for (const auto &src : record_proto.entry()) {
        const auto &id = src.opid();
        const view_t entryView = src.view();
        const opid_t entryOpid =
            std::make_pair(id.clientid(), id.clientreqid());

        // The Request is reconstructed from the op bytes and the opid parts.
        Request rebuilt;
        rebuilt.set_op(src.op());
        rebuilt.set_clientid(id.clientid());
        rebuilt.set_clientreqid(id.clientreqid());

        Add(entryView, entryOpid, rebuilt, src.state(), src.type(),
            src.result());
    }
}
// Stores a copy of `entry` under entry.opid and returns a reference to the
// stored copy. The opid must not already be present in the record.
RecordEntry &
Record::Add(const RecordEntry &entry)
{
    // Make sure this isn't a duplicate
    ASSERT(entries.count(entry.opid) == 0);

    RecordEntry &stored = entries[entry.opid];
    stored = entry;
    return stored;
}
RecordEntry &
Record::Add(view_t view, opid_t opid, const Request &request,
RecordEntryState state)
proto::RecordEntryState state, proto::RecordEntryType type)
{
RecordEntry entry;
entry.view = view;
entry.opid = opid;
entry.request = request;
entry.state = state;
// Make sure this isn't a duplicate
ASSERT(entries.count(opid) == 0);
entries[opid] = entry;
return entries[opid];
return Add(RecordEntry(view, opid, state, type, request, ""));
}
RecordEntry &
Record::Add(view_t view, opid_t opid, const Request &request,
RecordEntryState state, const string &result)
proto::RecordEntryState state, proto::RecordEntryType type,
const string &result)
{
RecordEntry entry = Add(view, opid, request, state);
RecordEntry &entry = Add(view, opid, request, state, type);
entry.result = result;
return entries[opid];
}
......@@ -76,7 +93,7 @@ Record::Find(opid_t opid)
bool
Record::SetStatus(opid_t op, RecordEntryState state)
Record::SetStatus(opid_t op, proto::RecordEntryState state)
{
RecordEntry *entry = Find(op);
if (entry == NULL) {
......@@ -116,12 +133,33 @@ Record::Remove(opid_t opid)
{
entries.erase(opid);
}
// Returns true when the record holds no entries.
bool Record::Empty() const {
    return entries.empty();
}
// Appends one RecordEntryProto to `proto` for every entry in this record,
// copying view, opid, state, type, the request's op and the result.
void
Record::ToProto(proto::RecordProto *proto) const
{
    for (const auto &kv : entries) {
        const RecordEntry &rec = kv.second;
        auto *out = proto->add_entry();

        out->set_view(rec.view);

        // opid is a pair: (client id, client request id).
        auto *id = out->mutable_opid();
        id->set_clientid(rec.opid.first);
        id->set_clientreqid(rec.opid.second);

        out->set_state(rec.state);
        out->set_type(rec.type);
        out->set_op(rec.request.op());
        out->set_result(rec.result);
    }
}
// Read-only access to the underlying opid -> RecordEntry map.
const std::map<opid_t, RecordEntry> &
Record::Entries() const
{
    return entries;
}
} // namespace ir
} // namespace replication
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
......@@ -49,11 +49,12 @@ public:
uint64_t clientid = 0);
virtual ~VRClient();
virtual void Invoke(const string &request,
continuation_t continuation);
continuation_t continuation,
error_continuation_t error_continuation = nullptr);
virtual void InvokeUnlogged(int replicaIdx,
const string &request,
continuation_t continuation,
timeout_continuation_t timeoutContinuation = nullptr,
error_continuation_t error_continuation = nullptr,
uint32_t timeout = DEFAULT_UNLOGGED_OP_TIMEOUT);
virtual void ReceiveMessage(const TransportAddress &remote,
const string &type, const string &data);
......@@ -80,14 +81,14 @@ protected:
struct PendingUnloggedRequest : public PendingRequest
{
timeout_continuation_t timeoutContinuation;
error_continuation_t error_continuation;
inline PendingUnloggedRequest(string request,
uint64_t clientReqId,
continuation_t continuation,
Timeout *timer,
timeout_continuation_t timeoutContinuation)
error_continuation_t error_continuation)
: PendingRequest(request, clientReqId, continuation, timer),
timeoutContinuation(timeoutContinuation) { };
error_continuation(error_continuation) { };
};
std::unordered_map<uint64_t, PendingRequest *> pendingReqs;
......
......@@ -349,16 +349,16 @@ void
VRReplica::ReceiveMessage(const TransportAddress &remote,
const string &type, const string &data)
{
static RequestMessage request;
static UnloggedRequestMessage unloggedRequest;
static PrepareMessage prepare;
static PrepareOKMessage prepareOK;
static CommitMessage commit;
static RequestStateTransferMessage requestStateTransfer;
static StateTransferMessage stateTransfer;
static StartViewChangeMessage startViewChange;
static DoViewChangeMessage doViewChange;
static StartViewMessage startView;
RequestMessage request;
UnloggedRequestMessage unloggedRequest;
PrepareMessage prepare;
PrepareOKMessage prepareOK;
CommitMessage commit;
RequestStateTransferMessage requestStateTransfer;
StateTransferMessage stateTransfer;
StartViewChangeMessage startViewChange;
DoViewChangeMessage doViewChange;
StartViewMessage startView;
if (type == request.GetTypeName()) {
request.ParseFromString(data);
......@@ -513,7 +513,8 @@ VRReplica::HandleUnloggedRequest(const TransportAddress &remote,
Debug("Received unlogged request %s", (char *)msg.req().op().c_str());
ExecuteUnlogged(msg.req(), reply);
reply.set_clientreqid(msg.req().clientreqid());
if (!(transport->SendMessage(this, remote, reply)))
Warning("Failed to send reply message");
}
......
This diff is collapsed.
syntax = "proto2";
import "replication/common/request.proto";
package replication.vr.proto;
......
This diff is collapsed.
......@@ -33,7 +33,7 @@
#include "lib/assert.h"
#include "lib/message.h"
#include <string>
#include <unordered_map>
#include <list>
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.