Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • syslab/tapir
  • aaasz/tapir
  • ashmrtnz/tapir
3 results
Show changes
Showing
with 1458 additions and 617 deletions
......@@ -12,7 +12,7 @@ OBJS-ir-client := $(o)ir-proto.o $(o)client.o \
OBJS-ir-replica := $(o)record.o $(o)replica.o $(o)ir-proto.o \
$(OBJS-replica) $(LIB-message) \
$(LIB-configuration)
$(LIB-configuration) $(LIB-persistent_register)
include $(d)tests/Rules.mk
This diff is collapsed.
......@@ -38,7 +38,10 @@
#include "replication/ir/ir-proto.pb.h"
#include <functional>
#include <map>
#include <memory>
#include <set>
#include <unordered_map>
namespace replication {
namespace ir {
......@@ -46,69 +49,178 @@ namespace ir {
class IRClient : public Client
{
public:
typedef std::function<string (const std::set<string> &)> decide_t;
using result_set_t = std::map<string, std::size_t>;
using decide_t = std::function<string(const result_set_t &)>;
IRClient(const transport::Configuration &config,
Transport *transport,
uint64_t clientid = 0);
virtual ~IRClient();
virtual void Invoke(const string &request,
continuation_t continuation);
virtual void InvokeInconsistent(const string &request,
continuation_t continuation);
virtual void InvokeConsensus(const string &request,
decide_t decide,
continuation_t continuation);
virtual void InvokeUnlogged(int replicaIdx,
const string &request,
continuation_t continuation,
timeout_continuation_t timeoutContinuation = nullptr,
uint32_t timeout = DEFAULT_UNLOGGED_OP_TIMEOUT);
virtual void ReceiveMessage(const TransportAddress &remote,
const string &type, const string &data);
protected:
uint64_t view;
uint64_t lastReqId;
QuorumSet<viewstamp_t, proto::ReplyInconsistentMessage> inconsistentReplyQuorum;
QuorumSet<viewstamp_t, proto::ReplyConsensusMessage> consensusReplyQuorum;
QuorumSet<viewstamp_t, proto::ConfirmMessage> confirmQuorum;
virtual void Invoke(
const string &request,
continuation_t continuation,
error_continuation_t error_continuation = nullptr) override;
virtual void InvokeUnlogged(
int replicaIdx,
const string &request,
continuation_t continuation,
error_continuation_t error_continuation = nullptr,
uint32_t timeout = DEFAULT_UNLOGGED_OP_TIMEOUT) override;
virtual void ReceiveMessage(
const TransportAddress &remote,
const string &type,
const string &data) override;
virtual void InvokeInconsistent(
const string &request,
continuation_t continuation,
error_continuation_t error_continuation = nullptr);
virtual void InvokeConsensus(
const string &request,
decide_t decide,
continuation_t continuation,
error_continuation_t error_continuation = nullptr);
struct PendingRequest
{
protected:
struct PendingRequest {
string request;
string decideReq;
uint64_t clientReqId;
decide_t decide;
continuation_t continuation;
timeout_continuation_t timeoutContinuation;
bool continuationInvoked = false;
std::unique_ptr<Timeout> timer;
QuorumSet<viewstamp_t, proto::ConfirmMessage> confirmQuorum;
inline PendingRequest(string request, uint64_t clientReqId,
continuation_t continuation)
: request(request), clientReqId(clientReqId),
continuation(continuation) { }
continuation_t continuation,
std::unique_ptr<Timeout> timer, int quorumSize)
: request(request),
clientReqId(clientReqId),
continuation(continuation),
timer(std::move(timer)),
confirmQuorum(quorumSize){};
virtual ~PendingRequest(){};
};
PendingRequest *pendingInconsistentRequest;
PendingRequest *pendingConsensusRequest;
PendingRequest *pendingUnloggedRequest;
Timeout *inconsistentRequestTimeout;
Timeout *consensusRequestTimeout;
Timeout *confirmationTimeout;
Timeout *unloggedRequestTimeout;
void SendInconsistent();
void ResendInconsistent();
void ConsensusSlowPath();
void ResendConfirmation();
struct PendingUnloggedRequest : public PendingRequest {
error_continuation_t error_continuation;
inline PendingUnloggedRequest(
string request, uint64_t clientReqId, continuation_t continuation,
error_continuation_t error_continuation,
std::unique_ptr<Timeout> timer)
: PendingRequest(request, clientReqId, continuation,
std::move(timer), 1),
error_continuation(error_continuation){};
};
struct PendingInconsistentRequest : public PendingRequest {
QuorumSet<viewstamp_t, proto::ReplyInconsistentMessage>
inconsistentReplyQuorum;
inline PendingInconsistentRequest(string request, uint64_t clientReqId,
continuation_t continuation,
std::unique_ptr<Timeout> timer,
int quorumSize)
: PendingRequest(request, clientReqId, continuation,
std::move(timer), quorumSize),
inconsistentReplyQuorum(quorumSize){};
};
struct PendingConsensusRequest : public PendingRequest {
QuorumSet<opnum_t, proto::ReplyConsensusMessage> consensusReplyQuorum;
decide_t decide;
string decideResult;
const std::size_t quorumSize;
const std::size_t superQuorumSize;
bool on_slow_path;
error_continuation_t error_continuation;
// The timer to give up on the fast path and transition to the slow
// path. After this timer is run for the first time, it is nulled.
std::unique_ptr<Timeout> transition_to_slow_path_timer;
// The view for which a majority result (or finalized result) was
// found. The view of a majority of confirms must match this view.
uint64_t reply_consensus_view = 0;
// True when a consensus request has already received a quorum or super
// quorum of replies and has already transitioned into the confirm
// phase.
bool sent_confirms = false;
inline PendingConsensusRequest(
string request, uint64_t clientReqId, continuation_t continuation,
std::unique_ptr<Timeout> timer,
std::unique_ptr<Timeout> transition_to_slow_path_timer,
int quorumSize, int superQuorum, decide_t decide,
error_continuation_t error_continuation)
: PendingRequest(request, clientReqId, continuation,
std::move(timer), quorumSize),
consensusReplyQuorum(quorumSize),
decide(decide),
quorumSize(quorumSize),
superQuorumSize(superQuorum),
on_slow_path(false),
error_continuation(error_continuation),
transition_to_slow_path_timer(
std::move(transition_to_slow_path_timer)){};
};
uint64_t lastReqId;
std::unordered_map<uint64_t, PendingRequest *> pendingReqs;
void SendInconsistent(const PendingInconsistentRequest *req);
void ResendInconsistent(const uint64_t reqId);
void SendConsensus(const PendingConsensusRequest *req);
void ResendConsensus(const uint64_t reqId);
// `TransitionToConsensusSlowPath` is called after a timeout to end the
// possibility of taking the fast path and transition into taking the slow
// path.
void TransitionToConsensusSlowPath(const uint64_t reqId);
// HandleSlowPathConsensus is called in one of two scenarios:
//
// 1. A finalized ReplyConsensusMessage was received. In this case, we
// immediately enter the slow path and use the finalized result. If
// finalized is true, req has already been populated with the
// finalized result.
// 2. We're in the slow path and receive a majority of
// ReplyConsensusMessages in the same view. In this case, we call
// decide to determine the final result.
//
// In either case, HandleSlowPathConsensus intitiates the finalize phase of
// a consensus request.
void HandleSlowPathConsensus(
const uint64_t reqid,
const std::map<int, proto::ReplyConsensusMessage> &msgs,
const bool finalized_result_found,
PendingConsensusRequest *req);
// HandleFastPathConsensus is called when we're on the fast path and
// receive a super quorum of responses from the same view.
// HandleFastPathConsensus will check to see if there is a superquorum of
// matching responses. If there is, it will return to the user and
// asynchronously intitiate the finalize phase of a consensus request.
// Otherwise, it transitions into the slow path which will also initiate
// the finalize phase of a consensus request, but not yet return to the
// user.
void HandleFastPathConsensus(
const uint64_t reqid,
const std::map<int, proto::ReplyConsensusMessage> &msgs,
PendingConsensusRequest *req);
void ResendConfirmation(const uint64_t reqId, bool isConsensus);
void HandleInconsistentReply(const TransportAddress &remote,
const proto::ReplyInconsistentMessage &msg);
void HandleConsensusReply(const TransportAddress &remote,
const proto::ReplyConsensusMessage &msg);
const proto::ReplyConsensusMessage &msg);
void HandleConfirm(const TransportAddress &remote,
const proto::ConfirmMessage &msg);
void HandleUnloggedReply(const TransportAddress &remote,
const proto::UnloggedReplyMessage &msg);
void UnloggedRequestTimeoutCallback();
void UnloggedRequestTimeoutCallback(const uint64_t reqId);
};
} // namespace replication::ir
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
......@@ -349,16 +349,16 @@ void
VRReplica::ReceiveMessage(const TransportAddress &remote,
const string &type, const string &data)
{
static RequestMessage request;
static UnloggedRequestMessage unloggedRequest;
static PrepareMessage prepare;
static PrepareOKMessage prepareOK;
static CommitMessage commit;
static RequestStateTransferMessage requestStateTransfer;
static StateTransferMessage stateTransfer;
static StartViewChangeMessage startViewChange;
static DoViewChangeMessage doViewChange;
static StartViewMessage startView;
RequestMessage request;
UnloggedRequestMessage unloggedRequest;
PrepareMessage prepare;
PrepareOKMessage prepareOK;
CommitMessage commit;
RequestStateTransferMessage requestStateTransfer;
StateTransferMessage stateTransfer;
StartViewChangeMessage startViewChange;
DoViewChangeMessage doViewChange;
StartViewMessage startView;
if (type == request.GetTypeName()) {
request.ParseFromString(data);
......@@ -513,7 +513,8 @@ VRReplica::HandleUnloggedRequest(const TransportAddress &remote,
Debug("Received unlogged request %s", (char *)msg.req().op().c_str());
ExecuteUnlogged(msg.req(), reply);
reply.set_clientreqid(msg.req().clientreqid());
if (!(transport->SendMessage(this, remote, reply)))
Warning("Failed to send reply message");
}
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.