Skip to content
Snippets Groups Projects
Commit 1c800461 authored by Irene Y Zhang's avatar Irene Y Zhang
Browse files

adding basic IR code

parent 140c4b92
No related branches found
No related tags found
No related merge requests found
d := $(dir $(lastword $(MAKEFILE_LIST))) d := $(dir $(lastword $(MAKEFILE_LIST)))
SRCS += $(addprefix $(d), \ SRCS += $(addprefix $(d), \
record.cc) record.cc client.cc replica.cc)
PROTOS += $(addprefix $(d), \ PROTOS += $(addprefix $(d), \
ir-proto.proto) ir-proto.proto)
OBJS-ir-client := $(o)ir-proto.o \ OBJS-ir-client := $(o)ir-proto.o $(o)client.o \
$(OBJS-client) $(LIB-message) \ $(OBJS-client) $(LIB-message) \
$(LIB-configuration) $(LIB-configuration)
OBJS-ir-replica := $(o)record.o $(o)ir-proto.o \ OBJS-ir-replica := $(o)record.o $(o)replica.o $(o)ir-proto.o \
$(OBJS-replica) $(LIB-message) \ $(OBJS-replica) $(LIB-message) \
$(LIB-configuration) $(LIB-configuration)
#include $(d)tests/Rules.mk include $(d)tests/Rules.mk
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* ir/client.cc:
* Inconsistent replication client
*
* Copyright 2013-2015 Dan R. K. Ports <drkp@cs.washington.edu>
* Irene Zhang Ports <iyzhang@cs.washington.edu>
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#include "replication/common/client.h"
#include "replication/common/request.pb.h"
#include "lib/assert.h"
#include "lib/message.h"
#include "lib/transport.h"
#include "replication/ir/client.h"
#include "replication/ir/ir-proto.pb.h"
#include <math.h>
namespace replication {
namespace ir {
using namespace std;
IRClient::IRClient(const transport::Configuration &config,
Transport *transport,
uint64_t clientid)
: Client(config, transport, clientid),
view(0),
lastReqId(0),
inconsistentReplyQuorum(config.QuorumSize()-1),
consensusReplyQuorum(config.QuorumSize() + ceil(0.5 * config.QuorumSize()) -1),
confirmQuorum(config.QuorumSize()-1)
{
pendingInconsistentRequest = NULL;
pendingConsensusRequest = NULL;
pendingUnloggedRequest = NULL;
inconsistentRequestTimeout = new Timeout(transport, 500, [this]() {
ResendInconsistent();
});
consensusRequestTimeout = new Timeout(transport, 500, [this]() {
ConsensusSlowPath();
});
confirmationTimeout = new Timeout(transport, 500, [this]() {
ResendConfirmation();
});
unloggedRequestTimeout = new Timeout(transport, 500, [this]() {
UnloggedRequestTimeoutCallback();
});
}
IRClient::~IRClient()
{
if (pendingInconsistentRequest) {
delete pendingInconsistentRequest;
}
if (pendingInconsistentRequest) {
delete pendingConsensusRequest;
}
if (pendingUnloggedRequest) {
delete pendingUnloggedRequest;
}
delete inconsistentRequestTimeout;
delete consensusRequestTimeout;
delete confirmationTimeout;
delete unloggedRequestTimeout;
}
void
IRClient::Invoke(const string &request,
continuation_t continuation)
{
InvokeInconsistent(request, continuation);
}
void
IRClient::InvokeInconsistent(const string &request,
continuation_t continuation)
{
// XXX Can only handle one pending request for now
if (pendingInconsistentRequest != NULL) {
Panic("Client only supports one pending request");
}
++lastReqId;
uint64_t reqId = lastReqId;
pendingInconsistentRequest = new PendingRequest(request, reqId, continuation);
SendInconsistent();
}
void
IRClient::SendInconsistent()
{
ASSERT(pendingInconsistentRequest != NULL);
proto::ProposeInconsistentMessage reqMsg;
reqMsg.mutable_req()->set_op(pendingInconsistentRequest->request);
reqMsg.mutable_req()->set_clientid(clientid);
reqMsg.mutable_req()->set_clientreqid(pendingInconsistentRequest->clientReqId);
if (transport->SendMessageToAll(this, reqMsg)) {
inconsistentRequestTimeout->Reset();
} else {
Warning("Could not send inconsistent request to replicas");
}
}
void
IRClient::InvokeConsensus(const string &request,
decide_t decide,
continuation_t continuation)
{
// XXX Can only handle one pending request for now
if (pendingConsensusRequest != NULL) {
Panic("Client only supports one pending request");
}
++lastReqId;
uint64_t reqId = lastReqId;
pendingConsensusRequest = new PendingRequest(request, reqId, continuation);
pendingConsensusRequest->decide = decide;
proto::ProposeConsensusMessage reqMsg;
reqMsg.mutable_req()->set_op(pendingConsensusRequest->request);
reqMsg.mutable_req()->set_clientid(clientid);
reqMsg.mutable_req()->set_clientreqid(pendingConsensusRequest->clientReqId);
if (transport->SendMessageToAll(this, reqMsg)) {
consensusRequestTimeout->Reset();
} else {
Warning("Could not send consensus request to replicas");
}
}
void
IRClient::InvokeUnlogged(int replicaIdx,
const string &request,
continuation_t continuation,
timeout_continuation_t timeoutContinuation,
uint32_t timeout)
{
// XXX Can only handle one pending request for now
if (pendingUnloggedRequest != NULL) {
Panic("Client only supports one pending unlogged request");
}
++lastReqId;
uint64_t reqId = lastReqId;
pendingUnloggedRequest = new PendingRequest(request, reqId, continuation);
pendingUnloggedRequest->timeoutContinuation = timeoutContinuation;
proto::UnloggedRequestMessage reqMsg;
reqMsg.mutable_req()->set_op(pendingUnloggedRequest->request);
reqMsg.mutable_req()->set_clientid(clientid);
reqMsg.mutable_req()->set_clientreqid(pendingUnloggedRequest->clientReqId);
ASSERT(!unloggedRequestTimeout->Active());
if (transport->SendMessageToReplica(this, replicaIdx, reqMsg)) {
unloggedRequestTimeout->SetTimeout(timeout);
unloggedRequestTimeout->Start();
} else {
Warning("Could not send unlogged request to replica");
}
}
void
IRClient::ResendInconsistent()
{
if (pendingInconsistentRequest == NULL) {
inconsistentRequestTimeout->Stop();
return;
}
Warning("Client timeout; resending inconsistent request: %lu", pendingInconsistentRequest->clientReqId);
SendInconsistent();
}
void
IRClient::ConsensusSlowPath()
{
// Give up on the fast path
consensusRequestTimeout->Stop();
if (pendingConsensusRequest == NULL) {
Warning("No consensus operation pending");
return;
}
Notice("Client timeout; taking consensus slow path: %lu", pendingConsensusRequest->clientReqId);
// get results so far
viewstamp_t vs = { view, pendingConsensusRequest->clientReqId };
auto msgs = consensusReplyQuorum.GetMessages(vs);
// construct result set
set<string> results;
for (auto &msg : msgs) {
results.insert(msg.second.result());
}
// Upcall into the application
ASSERT(pendingConsensusRequest->decide != NULL);
string result = pendingConsensusRequest->decide(results);
// Put the result in the request to store for later retries
pendingConsensusRequest->request = result;
// Send finalize message
proto::FinalizeConsensusMessage response;
response.mutable_opid()->set_clientid(clientid);
response.mutable_opid()->set_clientreqid(pendingConsensusRequest->clientReqId);
response.set_result(result);
if(transport->SendMessageToAll(this, response)) {
confirmationTimeout->Reset();
} else {
Warning("Could not send finalize message to replicas");
}
}
void
IRClient::ResendConfirmation()
{
if (pendingConsensusRequest == NULL) {
// Unless we are waiting for a confirm to finish up a
// consensus slow path, just ignore
confirmationTimeout->Stop();
} else {
proto::FinalizeConsensusMessage response;
response.mutable_opid()->set_clientid(clientid);
response.mutable_opid()->set_clientreqid(pendingConsensusRequest->clientReqId);
response.set_result(pendingConsensusRequest->request);
if(transport->SendMessageToAll(this, response)) {
confirmationTimeout->Reset();
} else {
Warning("Could not send finalize message to replicas");
}
}
}
void
IRClient::ReceiveMessage(const TransportAddress &remote,
const string &type,
const string &data)
{
static proto::ReplyInconsistentMessage replyInconsistent;
static proto::ReplyConsensusMessage replyConsensus;
static proto::ConfirmMessage confirm;
static proto::UnloggedReplyMessage unloggedReply;
if (type == replyInconsistent.GetTypeName()) {
replyInconsistent.ParseFromString(data);
HandleInconsistentReply(remote, replyInconsistent);
} else if (type == replyConsensus.GetTypeName()) {
replyConsensus.ParseFromString(data);
HandleConsensusReply(remote, replyConsensus);
} else if (type == confirm.GetTypeName()) {
confirm.ParseFromString(data);
HandleConfirm(remote, confirm);
} else if (type == unloggedReply.GetTypeName()) {
unloggedReply.ParseFromString(data);
HandleUnloggedReply(remote, unloggedReply);
} else {
Client::ReceiveMessage(remote, type, data);
}
}
void
IRClient::HandleInconsistentReply(const TransportAddress &remote,
const proto::ReplyInconsistentMessage &msg)
{
if (pendingInconsistentRequest == NULL) {
Warning("Received reply when no request was pending");
return;
}
if (msg.opid().clientreqid() != pendingInconsistentRequest->clientReqId) {
Debug("Received reply for a different request");
return;
}
Debug("Client received reply: %lu", pendingInconsistentRequest->clientReqId);
// Record replies
viewstamp_t vs = { msg.view(), msg.opid().clientreqid() };
if (inconsistentReplyQuorum.AddAndCheckForQuorum(vs, msg.replicaidx(), msg)) {
// If all quorum received, then send finalize and return to client
inconsistentRequestTimeout->Stop();
PendingRequest *req = pendingInconsistentRequest;
pendingInconsistentRequest = NULL;
// asynchronously send the finalize message
proto::FinalizeInconsistentMessage response;
*(response.mutable_opid()) = msg.opid();
if (!transport->SendMessageToAll(this, response)) {
Warning("Could not send finalize message to replicas");
} // don't use the confirmation timeout for async replies
// Return to client
req->continuation(req->request, "");
delete req;
}
}
void
IRClient::HandleConsensusReply(const TransportAddress &remote,
const proto::ReplyConsensusMessage &msg)
{
if (pendingConsensusRequest == NULL) {
Warning("Received reply when no request was pending");
return;
}
if (msg.opid().clientreqid() != pendingConsensusRequest->clientReqId) {
Debug("Received reply for a different request");
return;
}
Debug("Client received reply: %lu", pendingConsensusRequest->clientReqId);
// Record replies
viewstamp_t vs = { msg.view(), msg.opid().clientreqid() };
if (auto msgs =
(consensusReplyQuorum.AddAndCheckForQuorum(vs, msg.replicaidx(), msg))) {
// If all quorum received, then check return values
map<string, int> results;
// count matching results
for (auto &m : *msgs) {
results[m.second.result()] = results.count(m.second.result()) + 1;
}
// Check that there are a quorum of *matching* results
for (auto result : results) {
if (result.second >= consensusReplyQuorum.NumRequired()) {
consensusRequestTimeout->Stop();
PendingRequest *req = pendingConsensusRequest;
pendingConsensusRequest = NULL;
// asynchronously send the finalize message
proto::FinalizeConsensusMessage response;
*response.mutable_opid() = msg.opid();
response.set_result(result.first);
if(!transport->SendMessageToAll(this, response)) {
Warning("Could not send finalize message to replicas");
} // don't reset the confirm timeout on fast path
// Return to client
req->continuation(req->request, result.first);
delete req;
break;
}
}
}
}
void
IRClient::HandleConfirm(const TransportAddress &remote,
const proto::ConfirmMessage &msg)
{
if (pendingConsensusRequest == NULL) {
// if no pending request, then we were waiting for a synchronous confirmation
return;
}
if (msg.opid().clientreqid() != pendingConsensusRequest->clientReqId) {
Debug("Received reply for a different request");
return;
}
// otherwise, we are waiting on a finalized consensus result
// Record replies
viewstamp_t vs = { msg.view(), msg.opid().clientreqid() };
if (confirmQuorum.AddAndCheckForQuorum(vs, msg.replicaidx(), msg)) {
confirmationTimeout->Stop();
PendingRequest *req = pendingConsensusRequest;
pendingConsensusRequest = NULL;
// Return to client
req->continuation(req->request, pendingConsensusRequest->request);
delete req;
}
}
void
IRClient::HandleUnloggedReply(const TransportAddress &remote,
const proto::UnloggedReplyMessage &msg)
{
if (pendingUnloggedRequest == NULL) {
Warning("Received unloggedReply when no request was pending");
return;
}
Debug("Client received unloggedReply");
unloggedRequestTimeout->Stop();
PendingRequest *req = pendingUnloggedRequest;
pendingUnloggedRequest = NULL;
req->continuation(req->request, msg.reply());
delete req;
}
void
IRClient::UnloggedRequestTimeoutCallback()
{
PendingRequest *req = pendingUnloggedRequest;
pendingUnloggedRequest = NULL;
Warning("Unlogged request timed out");
unloggedRequestTimeout->Stop();
req->timeoutContinuation(req->request);
}
} // namespace ir
} // namespace replication
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* replication/ir/client.h:
* Inconsistent replication client
*
* Copyright 2013-2015 Dan R. K. Ports <drkp@cs.washington.edu>
* Irene Zhang Ports <iyzhang@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#ifndef _IR_CLIENT_H_
#define _IR_CLIENT_H_
#include "replication/common/client.h"
#include "replication/common/quorumset.h"
#include "lib/configuration.h"
#include "replication/ir/ir-proto.pb.h"
#include <functional>
#include <set>
namespace replication {
namespace ir {
class IRClient : public Client
{
public:
typedef std::function<string (const std::set<string>)> decide_t;
IRClient(const transport::Configuration &config,
Transport *transport,
uint64_t clientid = 0);
virtual ~IRClient();
virtual void Invoke(const string &request,
continuation_t continuation);
virtual void InvokeInconsistent(const string &request,
continuation_t continuation);
virtual void InvokeConsensus(const string &request,
decide_t decide,
continuation_t continuation);
virtual void InvokeUnlogged(int replicaIdx,
const string &request,
continuation_t continuation,
timeout_continuation_t timeoutContinuation = nullptr,
uint32_t timeout = DEFAULT_UNLOGGED_OP_TIMEOUT);
virtual void ReceiveMessage(const TransportAddress &remote,
const string &type, const string &data);
protected:
int view;
uint64_t lastReqId;
QuorumSet<viewstamp_t, proto::ReplyInconsistentMessage> inconsistentReplyQuorum;
QuorumSet<viewstamp_t, proto::ReplyConsensusMessage> consensusReplyQuorum;
QuorumSet<viewstamp_t, proto::ConfirmMessage> confirmQuorum;
struct PendingRequest
{
string request;
uint64_t clientReqId;
decide_t decide;
continuation_t continuation;
timeout_continuation_t timeoutContinuation;
inline PendingRequest(string request, uint64_t clientReqId,
continuation_t continuation)
: request(request), clientReqId(clientReqId),
continuation(continuation) { }
};
PendingRequest *pendingInconsistentRequest;
PendingRequest *pendingConsensusRequest;
PendingRequest *pendingUnloggedRequest;
Timeout *inconsistentRequestTimeout;
Timeout *consensusRequestTimeout;
Timeout *confirmationTimeout;
Timeout *unloggedRequestTimeout;
void SendInconsistent();
void ResendInconsistent();
void ConsensusSlowPath();
void ResendConfirmation();
void HandleInconsistentReply(const TransportAddress &remote,
const proto::ReplyInconsistentMessage &msg);
void HandleConsensusReply(const TransportAddress &remote,
const proto::ReplyConsensusMessage &msg);
void HandleConfirm(const TransportAddress &remote,
const proto::ConfirmMessage &msg);
void HandleUnloggedReply(const TransportAddress &remote,
const proto::UnloggedReplyMessage &msg);
void UnloggedRequestTimeoutCallback();
};
} // namespace replication::ir
} // namespace replication
#endif /* _IR_CLIENT_H_ */
import "replication/common/request.proto"; import "replication/common/request.proto";
package replication.vr.proto; package replication.ir.proto;
message OpID { message OpID {
required uint64 clientid = 1; required uint64 clientid = 1;
...@@ -35,12 +35,12 @@ message ReplyConsensusMessage { ...@@ -35,12 +35,12 @@ message ReplyConsensusMessage {
required uint64 view = 1; required uint64 view = 1;
required uint32 replicaIdx = 2; required uint32 replicaIdx = 2;
required OpID opid = 3; required OpID opid = 3;
required bytes reply = 4; required bytes result = 4;
} }
message FinalizeConsensusMessage { message FinalizeConsensusMessage {
required OpID opid = 1; required OpID opid = 1;
required bytes reply = 2; required bytes result = 2;
} }
message UnloggedRequestMessage { message UnloggedRequestMessage {
...@@ -49,4 +49,4 @@ message UnloggedRequestMessage { ...@@ -49,4 +49,4 @@ message UnloggedRequestMessage {
message UnloggedReplyMessage { message UnloggedReplyMessage {
required bytes reply = 1; required bytes reply = 1;
} }
\ No newline at end of file
...@@ -31,22 +31,17 @@ ...@@ -31,22 +31,17 @@
#include "replication/ir/record.h" #include "replication/ir/record.h"
#include "lib/assert.h" #include "lib/assert.h"
namespace replication {
namespace ir { namespace ir {
Record::Record()
{
}
RecordEntry & RecordEntry &
Record::Add(view_t view, opid_t opid, RecordEntryState state, Record::Add(view_t view, opid_t opid, const Request &request,
const Request &request, const string &reply) RecordEntryState state)
{ {
RecordEntry entry; RecordEntry entry;
entry.view = vs; entry.view = view;
entry.opid = opid; entry.opid = opid;
entry.request = requet; entry.request = request;
entry.state = state; entry.state = state;
// Make sure this isn't a duplicate // Make sure this isn't a duplicate
...@@ -56,6 +51,16 @@ Record::Add(view_t view, opid_t opid, RecordEntryState state, ...@@ -56,6 +51,16 @@ Record::Add(view_t view, opid_t opid, RecordEntryState state,
return entries[opid]; return entries[opid];
} }
RecordEntry &
Record::Add(view_t view, opid_t opid, const Request &request,
RecordEntryState state, const string &result)
{
RecordEntry entry = Add(view, opid, request, state);
entry.result = result;
return entries[opid];
}
// This really ought to be const // This really ought to be const
RecordEntry * RecordEntry *
Record::Find(opid_t opid) Record::Find(opid_t opid)
...@@ -65,13 +70,13 @@ Record::Find(opid_t opid) ...@@ -65,13 +70,13 @@ Record::Find(opid_t opid)
} }
RecordEntry *entry = &entries[opid]; RecordEntry *entry = &entries[opid];
ASSERT(entry->viewstamp.opid == opid); ASSERT(entry->opid == opid);
return entry; return entry;
} }
bool bool
Record::SetStatus(opnum_t op, RecordEntryState state) Record::SetStatus(opid_t op, RecordEntryState state)
{ {
RecordEntry *entry = Find(op); RecordEntry *entry = Find(op);
if (entry == NULL) { if (entry == NULL) {
...@@ -83,7 +88,19 @@ Record::SetStatus(opnum_t op, RecordEntryState state) ...@@ -83,7 +88,19 @@ Record::SetStatus(opnum_t op, RecordEntryState state)
} }
bool bool
Record::SetRequest(opnum_t op, const Request &req) Record::SetResult(opid_t op, const string &result)
{
RecordEntry *entry = Find(op);
if (entry == NULL) {
return false;
}
entry->result = result;
return true;
}
bool
Record::SetRequest(opid_t op, const Request &req)
{ {
RecordEntry *entry = Find(op); RecordEntry *entry = Find(op);
if (entry == NULL) { if (entry == NULL) {
...@@ -106,4 +123,5 @@ Record::Empty() const ...@@ -106,4 +123,5 @@ Record::Empty() const
return entries.empty(); return entries.empty();
} }
} // ir } // namespace ir
} // namespace replication
...@@ -39,8 +39,9 @@ ...@@ -39,8 +39,9 @@
#include <map> #include <map>
#include <string> #include <string>
#include <pair> #include <utility>
namespace replication {
namespace ir { namespace ir {
enum RecordEntryState { enum RecordEntryState {
...@@ -48,7 +49,7 @@ enum RecordEntryState { ...@@ -48,7 +49,7 @@ enum RecordEntryState {
RECORD_STATE_FINALIZED RECORD_STATE_FINALIZED
}; };
typedef pair<uint64_t, uint64_t> opid_t; typedef std::pair<uint64_t, uint64_t> opid_t;
struct RecordEntry struct RecordEntry
{ {
...@@ -58,24 +59,26 @@ struct RecordEntry ...@@ -58,24 +59,26 @@ struct RecordEntry
Request request; Request request;
std::string result; std::string result;
RecordEntry() { replyMessage = ""; } RecordEntry() { result = ""; }
RecordEntry(const RecordEntry &x) RecordEntry(const RecordEntry &x)
: view(x.view), opid(x.opid), state(x.state), request(x.request), : view(x.view), opid(x.opid), state(x.state), request(x.request),
result(x.result) { } result(x.result) { }
RecordEntry(view_t view, opid_t opid, RecordEntryState state, RecordEntry(view_t view, opid_t opid, RecordEntryState state,
const Request &request, const std::string &reply) const Request &request, const std::string &result)
: view(view), opid(opid), state(state), request(request), : view(view), opid(opid), state(state), request(request),
reply(reply) { } result(result) { }
virtual ~RecordEntry() { } virtual ~RecordEntry() { }
}; };
class Record class Record
{ {
public: public:
Record(); Record() {};
RecordEntry & Add(view_t v, const Request &req, RecordEntryState state); RecordEntry & Add(view_t view, opid_t opid, const Request &request, RecordEntryState state);
LogEntry * Find(opid_t opid); RecordEntry & Add(view_t view, opid_t opid, const Request &request, RecordEntryState state, const std::string &result);
RecordEntry * Find(opid_t opid);
bool SetStatus(opid_t opid, RecordEntryState state); bool SetStatus(opid_t opid, RecordEntryState state);
bool SetResult(opid_t opid, const std::string &result);
bool SetRequest(opid_t opid, const Request &req); bool SetRequest(opid_t opid, const Request &req);
void Remove(opid_t opid); void Remove(opid_t opid);
bool Empty() const; bool Empty() const;
...@@ -86,5 +89,5 @@ private: ...@@ -86,5 +89,5 @@ private:
}; };
} // namespace ir } // namespace ir
} // namespace replication
#endif /* _IR_RECORD_H_ */ #endif /* _IR_RECORD_H_ */
...@@ -6,19 +6,23 @@ ...@@ -6,19 +6,23 @@
* *
**********************************************************************/ **********************************************************************/
#include "replication/ir/replica.cc" #include "replication/ir/replica.h"
#include "common/tracer.h"
namespace replication {
namespace ir { namespace ir {
using namespace std; using namespace std;
using namespace proto; using namespace proto;
IRReplica::IRReplica(const transport::Configuration &configuration, int myIdx, IRReplica::IRReplica(transport::Configuration config, int myIdx,
Transport *transport) Transport *transport, IRAppReplica *app) :
: store(store), configuration(configuration), myIdx(myIdx), transport(transport) view(0),
myIdx(myIdx),
transport(transport),
app(app),
status(STATUS_NORMAL)
{ {
transport->Register(this, configuration, myIdx); transport->Register(this, config, myIdx);
} }
IRReplica::~IRReplica() { } IRReplica::~IRReplica() { }
...@@ -65,44 +69,165 @@ void ...@@ -65,44 +69,165 @@ void
IRReplica::HandleProposeInconsistent(const TransportAddress &remote, IRReplica::HandleProposeInconsistent(const TransportAddress &remote,
const ProposeInconsistentMessage &msg) const ProposeInconsistentMessage &msg)
{ {
// 1. Execute uint64_t clientid = msg.req().clientid();
uint64_t clientreqid = msg.req().clientreqid();
// 2. Add to record as tentative Debug("%lu:%lu Received inconsistent op: %s", clientid, clientreqid, (char *)msg.req().op().c_str());
opid_t opid = make_pair(clientid, clientreqid);
// Check record if we've already handled this request
RecordEntry *entry = record.Find(opid);
ReplyInconsistentMessage reply;
if (entry != NULL) {
// If we already have this op in our record, then just return it
reply.set_view(entry->view);
reply.set_replicaidx(myIdx);
reply.mutable_opid()->set_clientid(clientid);
reply.mutable_opid()->set_clientreqid(clientreqid);
} else {
// Otherwise, put it in our record as tentative
record.Add(view, opid, msg.req(), RECORD_STATE_TENTATIVE);
// 3. Return Reply
reply.set_view(view);
reply.set_replicaidx(myIdx);
reply.mutable_opid()->set_clientid(clientid);
reply.mutable_opid()->set_clientreqid(clientreqid);
}
// 3. Return Reply // Send the reply
transport->SendMessage(this, remote, reply);
} }
void void
IRReplica::HandleFinalizeInconsistent(const TransportAddress &remote, IRReplica::HandleFinalizeInconsistent(const TransportAddress &remote,
const FinalizeInconsistentMessage &msg) const FinalizeInconsistentMessage &msg)
{ {
// 1. Mark as finalized uint64_t clientid = msg.opid().clientid();
uint64_t clientreqid = msg.opid().clientreqid();
// 2. Return Confirm Debug("%lu:%lu Received finalize inconsistent op", clientid, clientreqid);
opid_t opid = make_pair(clientid, clientreqid);
// Check record for the request
RecordEntry *entry = record.Find(opid);
if (entry != NULL) {
// Mark entry as finalized
record.SetStatus(opid, RECORD_STATE_FINALIZED);
// Execute the operation
app->ExecInconsistentUpcall(entry->request.op());
// Send the reply
ConfirmMessage reply;
reply.set_view(view);
reply.set_replicaidx(myIdx);
*reply.mutable_opid() = msg.opid();
transport->SendMessage(this, remote, reply);
} else {
// Ignore?
}
} }
void void
IRReplica::HandleProposeConsensus(const TransportAddress &remote, IRReplica::HandleProposeConsensus(const TransportAddress &remote,
const ProposeConsensusMessage &msg) const ProposeConsensusMessage &msg)
{ {
// 1. Execute uint64_t clientid = msg.req().clientid();
uint64_t clientreqid = msg.req().clientreqid();
Debug("%lu:%lu Received consensus op: %s", clientid, clientreqid, (char *)msg.req().op().c_str());
opid_t opid = make_pair(clientid, clientreqid);
// Check record if we've already handled this request
RecordEntry *entry = record.Find(opid);
ReplyConsensusMessage reply;
if (entry != NULL) {
// If we already have this op in our record, then just return it
reply.set_view(entry->view);
reply.set_replicaidx(myIdx);
reply.mutable_opid()->set_clientid(clientid);
reply.mutable_opid()->set_clientreqid(clientreqid);
reply.set_result(entry->result);
} else {
// Execute op
string result;
// 2. Add to record as tentative with result app->ExecConsensusUpcall(msg.req().op(), result);
// 3. Return Reply // Put it in our record as tentative
record.Add(view, opid, msg.req(), RECORD_STATE_TENTATIVE, result);
// 3. Return Reply
reply.set_view(view);
reply.set_replicaidx(myIdx);
reply.mutable_opid()->set_clientid(clientid);
reply.mutable_opid()->set_clientreqid(clientreqid);
reply.set_result(result);
}
// Send the reply
transport->SendMessage(this, remote, reply);
} }
void void
IRReplica::HandleFinalizeConsensus(const TransportAddress &remote, IRReplica::HandleFinalizeConsensus(const TransportAddress &remote,
const FinalizeConsensusMessage &msg) const FinalizeConsensusMessage &msg)
{ {
// 1. Mark as finalized with result uint64_t clientid = msg.opid().clientid();
uint64_t clientreqid = msg.opid().clientreqid();
Debug("%lu:%lu Received finalize consensus op", clientid, clientreqid);
// 2. Return Confirm opid_t opid = make_pair(clientid, clientreqid);
// Check record for the request
RecordEntry *entry = record.Find(opid);
if (entry != NULL) {
// Mark entry as finalized
record.SetStatus(opid, RECORD_STATE_FINALIZED);
if (msg.result() != entry->result) {
// Update the result
entry->result = msg.result();
}
// Send the reply
ConfirmMessage reply;
reply.set_view(view);
reply.set_replicaidx(myIdx);
*reply.mutable_opid() = msg.opid();
if (!transport->SendMessage(this, remote, reply)) {
Warning("Failed to send reply message");
}
} else {
// Ignore?
Warning("Finalize request for unknown consensus operation");
}
} }
void HandleUnlogged(const TransportAddress &remote, void
IRReplica::HandleUnlogged(const TransportAddress &remote,
const UnloggedRequestMessage &msg) const UnloggedRequestMessage &msg)
{ {
// 1. Execute UnloggedReplyMessage reply;
string res;
Debug("Received unlogged request %s", (char *)msg.req().op().c_str());
app->UnloggedUpcall(msg.req().op(), res);
reply.set_reply(res);
if (!(transport->SendMessage(this, remote, reply)))
Warning("Failed to send reply message");
} }
} // namespace ir
} // namespace replication
...@@ -14,20 +14,24 @@ ...@@ -14,20 +14,24 @@
#include "lib/message.h" #include "lib/message.h"
#include "lib/udptransport.h" #include "lib/udptransport.h"
#include "lib/configuration.h" #include "lib/configuration.h"
#include "replication/ir/record.h"
#include "replication/common/replica.h"
#include "replication/ir/ir-proto.pb.h"
namespace replication {
namespace ir { namespace ir {
class AppReplica class IRAppReplica : public AppReplica
{ {
public: public:
AppReplica() { }; IRAppReplica() { };
virtual ~AppReplica() { }; virtual ~IRAppReplica() { };
// Invoke inconsistent operation, no return value // Invoke inconsistent operation, no return value
virtual void ExecInconsistentUpcall(const string &str1); virtual void ExecInconsistentUpcall(const string &str1);
// Invoke consensus operation // Invoke consensus operation
virtual void ReplicaUpcall(const string &str1, string &str2) { }; virtual void ExecConsensusUpcall(const string &str1, string &str2);
// Invoke call back for unreplicated operations run on only one replica // Invoke unreplicated operation
virtual void UnloggedUpcall(const string &str1, string &str2) { }; virtual void UnloggedUpcall(const string &str1, string &str2);
}; };
...@@ -41,9 +45,15 @@ private: ...@@ -41,9 +45,15 @@ private:
int myIdx; int myIdx;
Transport *transport; Transport *transport;
IRAppReplica *app;
ReplicaStatus status;
// record for this replica
Record record;
public: public:
IRReplica(const specpaxos::Configuration &configuration, int myIdx, IRReplica(transport::Configuration config, int myIdx,
Transport *transport); Transport *transport, IRAppReplica *app);
~IRReplica(); ~IRReplica();
void ReceiveMessage(const TransportAddress &remote, void ReceiveMessage(const TransportAddress &remote,
...@@ -62,12 +72,9 @@ public: ...@@ -62,12 +72,9 @@ public:
void HandleUnlogged(const TransportAddress &remote, void HandleUnlogged(const TransportAddress &remote,
const proto::UnloggedRequestMessage &msg); const proto::UnloggedRequestMessage &msg);
void Load(const std::string &key, const std::string &value, const Timestamp &timestamp);
}; };
} // namespace ir } // namespace ir
} // namespace replication
#endif /* _IR_REPLICA_H_ */ #endif /* _IR_REPLICA_H_ */
d := $(dir $(lastword $(MAKEFILE_LIST)))
GTEST_SRCS += $(d)ir-test.cc
$(d)ir-test: $(o)ir-test.o \
$(OBJS-ir-replica) $(OBJS-ir-client) \
$(LIB-simtransport) \
$(GTEST_MAIN)
TEST_BINS += $(d)ir-test
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* ir-test.cc:
* test cases for Inconsistent Replication Protocol
*
* Copyright 2013 Dan R. K. Ports <drkp@cs.washington.edu>
* Copyright 2015 Irene Zhang Ports <iyzhang@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#include "lib/configuration.h"
#include "lib/message.h"
#include "lib/transport.h"
#include "lib/simtransport.h"
#include "replication/common/client.h"
#include "replication/common/replica.h"
#include "replication/ir/client.h"
#include "replication/ir/replica.h"
#include <stdlib.h>
#include <stdio.h>
#include <gtest/gtest.h>
#include <vector>
#include <sstream>
static string replicaLastOp;
static string clientLastOp;
static string clientLastReply;
using google::protobuf::Message;
using namespace replication;
using namespace replication::ir;
using namespace replication::ir::proto;
class IRApp : public IRAppReplica {
std::vector<string> *iOps;
std::vector<string> *cOps;
std::vector<string> *unloggedOps;
public:
IRApp(std::vector<string> *i, std::vector<string> *c, std::vector<string> *u) : iOps(i), cOps(c), unloggedOps(u) { }
void ExecInconsistentUpcall(const string &req) {
iOps->push_back(req);
}
void ExecConsensusUpcall(const string &req, string &reply) {
cOps->push_back(req);
reply = "reply: " + req;
}
void UnloggedUpcall(const string &req, string &reply) {
unloggedOps->push_back(req);
reply = "unlreply: " + req;
}
};
class IRTest : public ::testing::TestWithParam<int>
{
protected:
std::vector<IRReplica *> replicas;
IRClient *client;
SimulatedTransport *transport;
transport::Configuration *config;
std::vector<std::vector<string> > iOps;
std::vector<std::vector<string> > cOps;
std::vector<std::vector<string> > unloggedOps;
int requestNum;
virtual void SetUp() {
std::vector<transport::ReplicaAddress> replicaAddrs =
{ { "localhost", "12345" },
{ "localhost", "12346" },
{ "localhost", "12347" }};
config = new transport::Configuration(3, 1, replicaAddrs);
transport = new SimulatedTransport();
iOps.resize(config->n);
cOps.resize(config->n);
unloggedOps.resize(config->n);
for (int i = 0; i < config->n; i++) {
replicas.push_back(new IRReplica(*config, i, transport,
new IRApp(&iOps[i], &cOps[i], &unloggedOps[i])));
}
client = new IRClient(*config, transport);
requestNum = -1;
// Only let tests run for a simulated minute. This prevents
// infinite retry loops, etc.
// transport->Timer(60000, [&]() {
// transport->CancelAllTimers();
// });
}
virtual string RequestOp(int n) {
std::ostringstream stream;
stream << "test: " << n;
return stream.str();
}
virtual string LastRequestOp() {
return RequestOp(requestNum);
}
virtual void ClientSendNextInconsistent(Client::continuation_t upcall) {
requestNum++;
client->InvokeInconsistent(LastRequestOp(), upcall);
}
virtual void ClientSendNextConsensus(Client::continuation_t upcall, IRClient::decide_t decide) {
requestNum++;
client->InvokeConsensus(LastRequestOp(), decide, upcall);
}
virtual void ClientSendNextUnlogged(int idx, Client::continuation_t upcall,
Client::timeout_continuation_t timeoutContinuation = nullptr,
uint32_t timeout = Client::DEFAULT_UNLOGGED_OP_TIMEOUT) {
requestNum++;
client->InvokeUnlogged(idx, LastRequestOp(), upcall, timeoutContinuation, timeout);
}
virtual void TearDown() {
for (auto x : replicas) {
delete x;
}
replicas.clear();
iOps.clear();
cOps.clear();
unloggedOps.clear();
delete client;
delete transport;
delete config;
}
};
// TEST_P(IRTest, OneOp)
// {
// auto upcall = [this](const string &req, const string &reply) {
// EXPECT_EQ(req, LastRequestOp());
// EXPECT_EQ(reply, "reply: "+LastRequestOp());
// // Not guaranteed that any replicas except the leader have
// // executed this request.
// EXPECT_EQ(ops[0].back(), req);
// transport->CancelAllTimers();
// };
// ClientSendNext(upcall);
// transport->Run();
// // By now, they all should have executed the last request.
// for (int i = 0; i < config->n; i++) {
// EXPECT_EQ(ops[i].size(), 1);
// EXPECT_EQ(ops[i].back(), LastRequestOp());
// }
// }
TEST_P(IRTest, Unlogged)
{
auto upcall = [this](const string &req, const string &reply) {
EXPECT_EQ(req, LastRequestOp());
EXPECT_EQ(reply, "unlreply: "+LastRequestOp());
EXPECT_EQ(unloggedOps[1].back(), req);
transport->CancelAllTimers();
};
int timeouts = 0;
auto timeout = [&](const string &req) {
timeouts++;
};
ClientSendNextUnlogged(1, upcall, timeout);
transport->Run();
for (unsigned int i = 0; i < iOps.size(); i++) {
EXPECT_EQ(0, iOps[i].size());
EXPECT_EQ((i == 1 ? 1 : 0), unloggedOps[i].size());
}
EXPECT_EQ(0, timeouts);
}
TEST_P(IRTest, UnloggedTimeout)
{
auto upcall = [this](const string &req, const string &reply) {
FAIL();
transport->CancelAllTimers();
};
int timeouts = 0;
auto timeout = [&](const string &req) {
timeouts++;
};
// Drop messages to or from replica 1
transport->AddFilter(10, [](TransportReceiver *src, int srcIdx,
TransportReceiver *dst, int dstIdx,
Message &m, uint64_t &delay) {
if ((srcIdx == 1) || (dstIdx == 1)) {
return false;
}
return true;
});
// Run for 10 seconds
transport->Timer(10000, [&]() {
transport->CancelAllTimers();
});
ClientSendNextUnlogged(1, upcall, timeout);
transport->Run();
for (unsigned int i = 0; i < iOps.size(); i++) {
EXPECT_EQ(0, iOps[i].size());
EXPECT_EQ(0, unloggedOps[i].size());
}
EXPECT_EQ(1, timeouts);
}
// TEST_P(IRTest, ManyOps)
// {
// Client::continuation_t upcall = [&](const string &req, const string &reply) {
// EXPECT_EQ(req, LastRequestOp());
// EXPECT_EQ(reply, "reply: "+LastRequestOp());
// // Not guaranteed that any replicas except the leader have
// // executed this request.
// EXPECT_EQ(ops[0].back(), req);
// if (requestNum < 9) {
// ClientSendNext(upcall);
// } else {
// transport->CancelAllTimers();
// }
// };
// ClientSendNext(upcall);
// transport->Run();
// // By now, they all should have executed the last request.
// for (int i = 0; i < config->n; i++) {
// EXPECT_EQ(10, ops[i].size());
// for (int j = 0; j < 10; j++) {
// EXPECT_EQ(RequestOp(j), ops[i][j]);
// }
// }
// }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment