Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • syslab/tapir
  • aaasz/tapir
  • ashmrtnz/tapir
3 results
Show changes
Showing
with 3881 additions and 55 deletions
d := $(dir $(lastword $(MAKEFILE_LIST)))
GTEST_SRCS += $(addprefix $(d), lockserver-test.cc)
$(d)lockserver-test: $(o)lockserver-test.o \
$(o)../locks-proto.o \
$(o)../server.o \
$(o)../client.o \
$(OBJS-ir-replica) \
$(OBJS-ir-client) \
$(LIB-configuration) \
$(LIB-repltransport) \
$(LIB-store-common) \
$(GTEST_MAIN)
TEST_BINS += $(d)lockserver-test
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* lockserver_test.cc:
* test cases for lock server
*
* Copyright 2013 Dan R. K. Ports <drkp@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#include <fstream>
#include <memory>
#include <thread>
#include <gtest/gtest.h>
#include "lib/configuration.h"
#include "lib/repltransport.h"
#include "lockserver/client.h"
#include "lockserver/server.h"
#include "replication/ir/replica.h"
class LockServerTest : public testing::Test {
protected:
std::vector<transport::ReplicaAddress> replica_addrs_;
std::unique_ptr<transport::Configuration> config_;
ReplTransport transport_;
std::vector<std::unique_ptr<lockserver::LockClient>> clients_;
std::vector<std::unique_ptr<lockserver::LockServer>> servers_;
std::vector<std::unique_ptr<replication::ir::IRReplica>> replicas_;
LockServerTest() {
replica_addrs_ = {{"replica", "0"},
{"replica", "1"},
{"replica", "2"},
{"replica", "3"},
{"replica", "4"}};
config_ = std::unique_ptr<transport::Configuration>(
new transport::Configuration(5, 2, replica_addrs_));
RemovePersistedFiles();
for (std::size_t i = 0; i < 3; ++i) {
auto client = std::unique_ptr<lockserver::LockClient>(
new lockserver::LockClient(&transport_, *config_));
client->lock_async(std::to_string(i));
clients_.push_back(std::move(client));
}
for (std::size_t i = 0; i < replica_addrs_.size(); ++i) {
auto server = std::unique_ptr<lockserver::LockServer>(
new lockserver::LockServer());
servers_.push_back(std::move(server));
auto replica = std::unique_ptr<replication::ir::IRReplica>(
new replication::ir::IRReplica(*config_, i, &transport_,
servers_[i].get()));
replicas_.push_back(std::move(replica));
}
}
virtual void TearDown() {
RemovePersistedFiles();
}
virtual void RemovePersistedFiles() {
for (std::size_t i = 0; i < replica_addrs_.size(); ++i) {
const transport::ReplicaAddress &addr = replica_addrs_[i];
const std::string filename =
addr.host + ":" + addr.port + "_" + std::to_string(i) + ".bin";
std::ifstream f(filename);
if (f.good()) {
int success = std::remove(filename.c_str());
ASSERT(success == 0);
}
}
}
};
// Note that these tests are all white box smoke tests. They depend on the
// low-level details of knowing exactly which timeouts are registered and which
// messages are sent. If an implementation detail is changed to make some of
// these tests fail, you should cal transport_.Run() and walk through the
// execution to trigger the desired behavior. Also, they only check to make
// sure that nothing crashes, though you can read through the Debug prints to
// make sure everything looks right.
//
// TODO: Use a ReplTransport for tests like the ones in ir-test.cc to assert
// that the correct messages are being sent.
TEST_F(LockServerTest, SuccessfulFastPathLock) {
// Send client 0's lock request.
transport_.TriggerTimer(1);
// Deliver lock request to replicas.
for (const auto &addr : replica_addrs_) {
transport_.DeliverMessage({addr.host, addr.port}, 0);
}
// Deliver lock reply to client.
for (std::size_t i = 0; i < replica_addrs_.size(); ++i) {
transport_.DeliverMessage({"client", "0"}, i);
}
// Deliver finalize to replicas.
for (const auto &addr : replica_addrs_) {
transport_.DeliverMessage({addr.host, addr.port}, 1);
}
// Deliver confirm to client.
int j = replica_addrs_.size();
for (std::size_t i = j; i < j + replica_addrs_.size(); ++i) {
transport_.DeliverMessage({"client", "0"}, i);
}
}
TEST_F(LockServerTest, SuccessfulSlowPathLock) {
// Send client 0's lock request.
transport_.TriggerTimer(1);
// Transition to slow path.
transport_.TriggerTimer(clients_.size() + replica_addrs_.size() + 1);
// Deliver lock request to replicas.
for (const auto &addr : replica_addrs_) {
transport_.DeliverMessage({addr.host, addr.port}, 0);
}
// Deliver lock reply to client.
for (std::size_t i = 0; i < replica_addrs_.size(); ++i) {
transport_.DeliverMessage({"client", "0"}, i);
}
// Deliver finalize to replicas.
for (const auto &addr : replica_addrs_) {
transport_.DeliverMessage({addr.host, addr.port}, 1);
}
// Deliver confirm to client.
int j = replica_addrs_.size();
for (std::size_t i = j; i < j + replica_addrs_.size(); ++i) {
transport_.DeliverMessage({"client", "0"}, i);
}
}
TEST_F(LockServerTest, SuccessfulViewChange) {
// Send client 0's lock request.
transport_.TriggerTimer(1);
// Deliver lock request to replicas.
for (const auto &addr : replica_addrs_) {
transport_.DeliverMessage({addr.host, addr.port}, 0);
}
// Initiate view changes on all replicas.
const std::size_t nclients = clients_.size();
const std::size_t nreplicas = replica_addrs_.size();
for (std::size_t i = nclients + 1; i < nclients + nreplicas + 1; ++i) {
transport_.TriggerTimer(i);
}
// Deliver DoViewChangeMessages to new primary.
const transport::ReplicaAddress& primary = replica_addrs_[1];
for (std::size_t i = 1; i < 1 + nreplicas - 1; ++i) {
transport_.DeliverMessage({primary.host, primary.port}, i);
}
// Deliver StartViewMessage to all replicas.
for (std::size_t i = 0; i < nreplicas; ++i) {
if (i == 1) {
continue;
}
const transport::ReplicaAddress& addr = replica_addrs_[i];
transport_.DeliverMessage({addr.host, addr.port}, nreplicas);
}
}
TEST_F(LockServerTest, SuccessfulViewChangeNonemptyRdu) {
const std::size_t nclients = clients_.size();
const std::size_t nreplicas = replica_addrs_.size();
ASSERT_GE(nclients, 3);
ASSERT_GE(nreplicas, 3);
// Send client 0's lock request.
transport_.TriggerTimer(1);
// Deliver lock request to replicas.
for (const auto &addr : replica_addrs_) {
transport_.DeliverMessage({addr.host, addr.port}, 0);
}
// Deliver lock reply to client.
for (std::size_t i = 0; i < replica_addrs_.size(); ++i) {
transport_.DeliverMessage({"client", "0"}, i);
}
// Deliver finalize to replicas.
for (const auto &addr : replica_addrs_) {
transport_.DeliverMessage({addr.host, addr.port}, 1);
}
// Send client 1's lock request.
transport_.TriggerTimer(2);
// Deliver lock request to first three replicas.
for (std::size_t i = 0; i < 3; ++i) {
const transport::ReplicaAddress &addr = replica_addrs_[i];
transport_.DeliverMessage({addr.host, addr.port}, 2);
}
// Send client 2's lock request.
transport_.TriggerTimer(3);
// Deliver lock request to first replica.
const transport::ReplicaAddress &addr = replica_addrs_[0];
transport_.DeliverMessage({addr.host, addr.port}, 3);
// View change first three replicas.
for (std::size_t i = nclients + 1; i < nclients + 1 + 3; ++i) {
transport_.TriggerTimer(i);
}
// Deliver DoViewChangeMessages to new primary.
const transport::ReplicaAddress& primary = replica_addrs_[1];
for (std::size_t i = 4; i < 4 + 2; ++i) {
transport_.DeliverMessage({primary.host, primary.port}, i);
}
// Deliver StartViewMessage to replica 0 and 2.
const transport::ReplicaAddress& addr0 = replica_addrs_[0];
const transport::ReplicaAddress& addr2 = replica_addrs_[2];
transport_.DeliverMessage({addr0.host, addr0.port}, 6);
transport_.DeliverMessage({addr2.host, addr2.port}, 6);
}
TEST_F(LockServerTest, FinalizeConsensusReply) {
const std::size_t nclients = clients_.size();
const std::size_t nreplicas = replica_addrs_.size();
// Send client 0's lock request.
transport_.TriggerTimer(1);
// Deliver lock request to replicas.
for (const auto &addr : replica_addrs_) {
transport_.DeliverMessage({addr.host, addr.port}, 0);
}
// Trigger view change.
for (std::size_t i = nclients + 1; i < nclients + 1 + nreplicas; ++i) {
transport_.TriggerTimer(i);
}
// Deliver DoViewChangeMessages to new primary.
const transport::ReplicaAddress& primary = replica_addrs_[1];
for (std::size_t i = 1; i < 1 + nreplicas - 1; ++i) {
transport_.DeliverMessage({primary.host, primary.port}, i);
}
// Deliver StartViewMessage to all replicas.
for (std::size_t i = 0; i < nreplicas; ++i) {
if (i == 1) {
continue;
}
const transport::ReplicaAddress& addr = replica_addrs_[i];
transport_.DeliverMessage({addr.host, addr.port}, nreplicas);
}
// Deliver lock request to replicas.
for (const auto &addr : replica_addrs_) {
transport_.DeliverMessage({addr.host, addr.port}, 0);
}
// Deliver finalized reply to client.
transport_.DeliverMessage({"client", "0"}, nreplicas);
}
TEST_F(LockServerTest, MismatchedConsensus) {
const std::size_t nclients = clients_.size();
const std::size_t nreplicas = replica_addrs_.size();
// Send client 0's lock request.
transport_.TriggerTimer(1);
// Transition to slow path.
transport_.TriggerTimer(nclients + nreplicas + 1);
// Deliver lock request to replicas.
for (const auto &addr : replica_addrs_) {
transport_.DeliverMessage({addr.host, addr.port}, 0);
}
// Deliver lock reply to client.
for (std::size_t i = 0; i < replica_addrs_.size(); ++i) {
transport_.DeliverMessage({"client", "0"}, i);
}
// Trigger view change.
for (std::size_t i = nclients + 1; i < nclients + 1 + nreplicas; ++i) {
transport_.TriggerTimer(i);
}
// Deliver DoViewChangeMessages to new primary.
const transport::ReplicaAddress& primary = replica_addrs_[1];
for (std::size_t i = 2; i < 2 + nreplicas - 1; ++i) {
transport_.DeliverMessage({primary.host, primary.port}, i);
}
// Deliver StartViewMessage to all replicas.
for (std::size_t i = 0; i < nreplicas; ++i) {
if (i == 1) {
continue;
}
const transport::ReplicaAddress& addr = replica_addrs_[i];
transport_.DeliverMessage({addr.host, addr.port}, 2 + nreplicas - 1);
}
// Deliver FinalizeConsensusMessage to all replicas.
for (const auto &addr : replica_addrs_) {
transport_.DeliverMessage({addr.host, addr.port}, 1);
}
// Deliver ConfirmMessages to client 0.
for (std::size_t i = nreplicas; i < nreplicas + nreplicas; ++i) {
transport_.DeliverMessage({"client", "0"}, i);
}
}
This diff is collapsed.
This diff is collapsed.
------------------------------- MODULE TAPIR -------------------------------
(***************************************************************************)
(* This is a TLA+ specification of the TAPIR algorithm. *)
(***************************************************************************)
EXTENDS FiniteSets, Naturals, TLC, TLAPS
Max(S) == IF S = {} THEN 0 ELSE CHOOSE i \in S: \A j \in S: j <= i
(***************************************************************************)
(* TAPIR constants: *)
(* 1. Shards: function from shard id to set of replica ids in the shard *)
(* 2. Transactions: set of all possible transactions *)
(* 3. nr_shards: number of shards *)
(***************************************************************************)
CONSTANTS Shards, Transactions, NrShards
\* Note: assume unique number ids for replicas
(***************************************************************************)
(* IR constants & variables (description in the IR module) *)
(***************************************************************************)
CONSTANTS Clients, Quorums, SuperQuorums,
max_vc, max_req, f
VARIABLES rState, rRecord, rCrtView, rLastView, rViewReplies, rNonce,
rViewOnDisk,
sentMsg, cCrtOp,
cCrtOpToFinalize, cMsgCounter, cCrtOpReplies, cCrtOpConfirms,
cState, aSuccessful, arRecord, aVisibility, gViewChangesNo
irReplicaVars == <<rState, rRecord, rCrtView, rViewOnDisk, rLastView,
rViewReplies, rNonce>>
irClientVars == <<cCrtOp, \* current operation at a client
cCrtOpReplies, \* current operation replies
cMsgCounter,
cState,
cCrtOpToFinalize,
cCrtOpConfirms>> \* Client variables.
irAppVars == <<aSuccessful, arRecord, aVisibility>> \* Application variables.
irOtherVars == <<sentMsg, gViewChangesNo>> \* Other variables.
IRMessageId == [cid: Clients, msgid: Nat]
(***************************************************************************)
(* TAPIR Variables/State: *)
(* 1. State at each replica: *)
(* rPrepareTxns = List of txns this replica is prepared *)
(* to commit *)
(* rTxnsLog = Log of committed and aborted txns in ts order *)
(* rStore = Versioned store *)
(* rBkpTable = Table of txns for which this replica *)
(* is the bkp coordinator *)
(* 2. State of communication medium: *)
(* sentMsg = sent (and duplicate) messages *)
(* 3. State at client: *)
(* cCrtTxn = crt txn requested by the client *)
(* *)
(***************************************************************************)
\* TAPIR variables & data structures
VARIABLES rPreparedTxns, rStore, rTxnsLogAborted, rTxnsLogCommitted,
rClock, cCrtTxn, cClock
tapirReplicaVars == <<rPreparedTxns, rStore, rTxnsLogAborted,
rTxnsLogCommitted,
rClock>>
tapirClientVars == <<cCrtTxn, cClock>>
vars == <<irReplicaVars, irClientVars, irAppVars, irOtherVars,
tapirReplicaVars, tapirClientVars>>
StoreEntry == [vs: Nat, val: Nat] \* vs = version
Store == [key: Nat,
entries: SUBSET StoreEntry,
latestVs: Nat,
latestVal: Nat]
TransactionTs == [cid: Clients, clock: Nat] \* Timestamp
ReadSet == [key: Nat, val: Nat, vs: Nat]
WriteSet == [key: Nat, val: Nat]
Transaction == [rSet: SUBSET ReadSet,
wSet: SUBSET WriteSet,
shards: SUBSET Nat]
TypeOK ==
/\ rStore \in [UNION {Shards[i]: i \in 1..NrShards} -> SUBSET Store]
/\ rPreparedTxns \in [UNION {Shards[i]: i \in 1..NrShards} -> SUBSET Transaction]
/\ rTxnsLogAborted \in [UNION {Shards[i]: i \in 1..NrShards} -> SUBSET Transaction]
/\ rTxnsLogCommitted \in [UNION {Shards[i]: i \in 1..NrShards} -> SUBSET Transaction]
TAPIRResults == {"Prepare-OK", "Retry", "Prepare-Abstain", "Abort"}
TAPIROpType == {"Prepare", "ABORT", "COMMIT"}
TAPIROpBody == [opType : TAPIROpType, txn: Transaction]
TAPIRClientFail == TRUE \* state we lose at the app level
TAPIRReplicaFail == TRUE \* state we lose at the app level
\* TAPIR implementation of IR interface
TAPIRExecInconsistent(op) == TRUE
TAPIRExecConsensus(op) == IF op.type = "Consensus" THEN "Prepare-OK" ELSE "Abort"
TAPIRDecide(results) == "Prepare-OK"
TAPIRMerge(R, d, u) == R \cup d \cup
{[msgid |-> x.msgid, op |-> x.op, res |-> "Prepare-OK"]: x \in u}
TAPIRSync(records) == TRUE
TAPIRSuccessfulInconsistentOp(c, S, op) == TRUE
TAPIRSuccessfulConsensusOp(c, S, op, res) == TRUE
\* Initialize for all shards
InitIR ==
/\ rState = [s \in 1..NrShards |-> [r \in Shards[s] |-> "NORMAL"]]
/\ rRecord = [s \in 1..NrShards |-> [r \in Shards[s] |-> {}]]
/\ rCrtView = [s \in 1..NrShards |-> [r \in Shards[s] |-> 0]]
/\ rViewOnDisk = [s \in 1..NrShards |-> [r \in Shards[s] |-> 0]]
/\ rLastView = [s \in 1..NrShards |-> [r \in Shards[s] |-> 0]]
/\ rViewReplies = [s \in 1..NrShards |-> [r \in Shards[s] |-> {}]]
/\ rNonce = [s \in 1..NrShards |-> [r \in Shards[s] |-> 0]]
/\ sentMsg = [s \in 1..NrShards |-> {}]
/\ cCrtOp = [s \in 1..NrShards |-> [c \in Clients |-> <<>>]]
/\ cCrtOpToFinalize = [s \in 1..NrShards |-> [c \in Clients |-> <<>>]]
/\ cMsgCounter = [s \in 1..NrShards |-> [c \in Clients |-> 0]]
/\ cCrtOpReplies = [s \in 1..NrShards |-> [c \in Clients |-> {}]]
/\ cCrtOpConfirms = [s \in 1..NrShards |-> [c \in Clients |-> {}]]
/\ cState = [c \in Clients |-> "NORMAL"]
/\ aSuccessful = {}
/\ arRecord = [s \in 1..NrShards |-> [r \in Shards[s] |-> {}]]
/\ aVisibility = [s \in 1..NrShards |-> [o \in IRMessageId |-> {}]]
/\ gViewChangesNo = [s \in 1..NrShards |-> 0]
\* IR instance per shard TODO: modify replica also
IR(s) == INSTANCE IR_consensus WITH AppClientFail <- TAPIRClientFail,
AppReplicaFail <- TAPIRReplicaFail,
OpBody <- TAPIROpBody,
ExecInconsistent <- TAPIRExecInconsistent,
ExecConsensus <- TAPIRExecConsensus,
Merge <- TAPIRMerge,
Sync <- TAPIRSync,
SuccessfulInconsistentOp <- TAPIRSuccessfulInconsistentOp,
SuccessfulConsensusOp <- TAPIRSuccessfulConsensusOp,
Decide <- TAPIRDecide,
Results <- TAPIRResults,
Replicas <- Shards[s],
Quorums <- Quorums[s],
SuperQuorums <- SuperQuorums[s],
S <- s
\* TAPIR messages
Message ==
[type: {"READ"},
key: Nat,
dst: UNION Shards]
\cup
[type: {"READ-REPLY"},
key: Nat,
val: Nat,
vs: Nat, \* version
dst: Clients]
\cup
[type: {"READ-VERSION"},
key: Nat,
vs: Nat,
dst: UNION Shards]
\cup
[type: {"READ-VERSION-REPLY"},
key: Nat,
vs: Nat,
dst: Clients]
InitTAPIR == /\ cCrtTxn = [c \in Clients |-> <<>>]
/\ cClock = [c \in Clients |-> 0]
/\ rPreparedTxns = [s \in 1..NrShards |-> [r \in Shards[s] |-> {}]]
/\ rStore = [r \in UNION {Shards[i]: i \in 1..NrShards} |-> {}]
/\ rTxnsLogAborted = [s \in 1..NrShards |-> [r \in Shards[s] |-> {}]]
/\ rTxnsLogCommitted = [s \in 1..NrShards |-> [r \in Shards[s] |-> {}]]
/\ rClock = [s \in 1..NrShards |-> [r \in Shards[s] |-> 0]]
Init == InitIR /\ InitTAPIR
-----------------------------------------------------------------------------
(***************************************************************************)
(* `^ \center \large{\textbf{Tapir replica actions}} ^' *)
(***************************************************************************)
\*TAPIRReplicaReceiveRead(r) == TRUE
\*TAPIRReplicaAction(r) ==
\* \/ /\ rState[r] = "NORMAL"
\* /\ \/ TAPIRReplicaReceiveRead(r)
-----------------------------------------------------------------------------
(***************************************************************************)
(* `^ \center \large{\textbf{Tapir client actions}} ^' *)
(***************************************************************************)
TAPIRClientExecuteTxn(c) ==
\* first, resolve all reads (read from any replica and get the vs)
\* then send prepares in all shard involved by seting the cCrtOp in the
\* respective IR shard instance
\* TODO: for now just simulate this, pick a transaction from
\* transaction pool, get some versions from the replica
\* stores
/\ cCrtTxn[c] = <<>>
/\ \E t \in Transactions:
LET rSet == {rse \in ReadSet:
/\ \E trse \in t.rSet : rse = trse
/\ LET
r == Max({r \in Shards[(rse.key % NrShards) + 1]:
\E se \in rStore[r]: rse.key = se.key})
IN
/\ r /= 0
/\ \E se \in rStore[r]:
/\ rse.key = se.key
/\ rse.val = se.latestVal
/\ rse.vs = se.latestVs
}
shards == {s \in 1..NrShards:
\/ \E trse \in t.rSet: s = (trse.key % NrShards) + 1
\/ \E twse \in t.wSet: s = (twse.key % NrShards) + 1 }
IN
/\ Cardinality(rSet) = Cardinality(t.rSet) \* found all the reads
/\ cCrtTxn' = [cCrtTxn EXCEPT ![c] = [rSet |-> rSet,
wSet |-> t.wSet,
shards |-> shards]]
/\ UNCHANGED <<irReplicaVars, irClientVars, irOtherVars, irAppVars,
tapirReplicaVars, cClock>>
TAPIRClientPrepareTxn(c) ==
/\ cCrtTxn[c] /= <<>>
/\ \E s \in cCrtTxn[c].shards: \* prepare in shard s
\* - ok if already prepared
/\ IR(s)!ClientRequest(c, [type |-> "Consensus",
body |-> [opType |-> "Prepare",
txn |-> cCrtTxn[c]]])
/\ UNCHANGED <<irReplicaVars, irAppVars,
cCrtOpReplies,
cCrtOpConfirms,
cCrtOpToFinalize,
gViewChangesNo,
cState, tapirClientVars, tapirReplicaVars>>
TAPIRClientAction(c) ==
\/ /\ cState[c] = "NORMAL"
/\ \/ TAPIRClientExecuteTxn(c) \* for now just simulate this
\* (don't send explicit READ messages)
\/ TAPIRClientPrepareTxn(c)
-----------------------------------------------------------------------------
(***************************************************************************)
(* `^ \center \large{\textbf{High-Level Actions}} ^' *)
(***************************************************************************)
Next ==
\/ \E c \in Clients: TAPIRClientAction(c)
\/ /\ \E s \in 1..NrShards: IR(s)!Next
/\ UNCHANGED <<tapirClientVars, tapirReplicaVars>>
\/ \* Avoid deadlock by termination
((\A s \in 1..NrShards:
\A i \in 1..Cardinality(Shards[s]):
rLastView[s][i] = max_vc) /\ UNCHANGED <<vars>>)
Inv == IR(1)!TypeOK /\ IR(1)!FaultTolerance
=============================================================================
\* Modification History
\* Last modified Mon Aug 31 12:55:38 PDT 2015 by aaasz
\* Created Sat Jan 31 18:31:52 PST 2015 by aaasz
--------------------------------- MODULE Test ---------------------------------
(***************************************************************************)
(* This is a TLA+ specification of the Inconsistent Replication algorithm. *)
(* (And a mechanically-checked proof of its correctness using TLAPS) *)
(***************************************************************************)
EXTENDS Naturals, FiniteSets, TLC
VARIABLES rViewReplies, recoveredOps
OpType == {"Inconsistent", "Consensus"}
OpStatus == {"TENTATIVE", "FINALIZED"}
Operations == [type: OpType, body: Nat]
TypeOK ==
/\ rViewReplies \in SUBSET ([lv: Nat,
r: SUBSET ([msgid: Nat,
op: Operations,
res: Nat,
status: OpStatus]
\cup [msgid: Nat,
op: Operations,
status: OpStatus]),
src: Nat])
A ==
rViewReplies
B ==
\* set of all records received in replies in A
UNION {x.r: x \in A}
test_recoveredConensusOps_R ==
\* any finalized consensus operation (in at least one record, in the maximum
\* latest view)
{x \in B:
/\ x.op.type = "Consensus"
/\ x.status = "FINALIZED"
/\ LET most_updated_reply ==
CHOOSE reply \in A:
/\ \E rec \in reply.r: /\ rec.msgid = x.msgid
/\ rec.status = "FINALIZED"
/\ \A rep \in A:
IF \E rec \in rep.r: /\ rec.msgid = x.msgid
/\ rec.status = "FINALIZED"
THEN rep.lv <= reply.lv
ELSE TRUE
IN
x \in most_updated_reply.r}
Init ==
/\ rViewReplies = {[lv |-> 1, r |-> {[msgid |-> 1, op |-> [type |-> "Consensus", body |-> 1], res |-> 1, status |-> "FINALIZED"]}, src |-> 1],
[lv |-> 2, r |-> {[msgid |-> 1, op |-> [type |-> "Consensus", body |-> 1], res |-> 2, status |-> "FINALIZED"]}, src |-> 2],
[lv |-> 3, r |-> {[msgid |-> 1, op |-> [type |-> "Consensus", body |-> 1], res |-> 3, status |-> "FINALIZED"]}, src |-> 3]}
/\ recoveredOps = {}
Next ==
/\ recoveredOps' = test_recoveredConensusOps_R
/\ Assert(Cardinality(recoveredOps) = 0, "Should fail")
/\ UNCHANGED <<rViewReplies>>
=============================================================================
\* Modification History
\* Last modified Fri Apr 24 14:34:42 PDT 2015 by aaasz
\* Created Fri Dec 12 17:42:14 PST 2014 by aaasz
This diff is collapsed.
Quorums <- {{1,2}, {1,3}, {2,3}}
Clients <- {1}
max_req <- 1
f <- 1
max_vc <- 3
Operations <- {1}
Replicas <- {1,2,3}
max_c <- 3
......@@ -9,8 +9,8 @@ PROTOS += $(addprefix $(d), \
LIB-request := $(o)request.o
OBJS-client := $(o)client.o \
$(LIB-message) $(LIB-configuration) $(LIB-transport) \
$(LIB-request)
$(LIB-message) $(LIB-configuration) \
$(LIB-transport) $(LIB-request)
OBJS-replica := $(o)replica.o $(o)log.o \
$(LIB-message) $(LIB-request) \
......
......@@ -36,7 +36,19 @@
#include <random>
namespace replication {
std::string ErrorCodeToString(ErrorCode err) {
switch (err) {
case ErrorCode::TIMEOUT:
return "TIMEOUT";
case ErrorCode::MISMATCHED_CONSENSUS_VIEWS:
return "MISMATCHED_CONSENSUS_VIEWS";
default:
Assert(false);
return "";
}
}
Client::Client(const transport::Configuration &config, Transport *transport,
uint64_t clientid)
: config(config), transport(transport)
......@@ -61,7 +73,7 @@ Client::~Client()
{
}
void
Client::ReceiveMessage(const TransportAddress &remote,
const string &type, const string &data)
......@@ -69,5 +81,5 @@ Client::ReceiveMessage(const TransportAddress &remote,
Panic("Received unexpected message type: %s",
type.c_str());
}
} // namespace replication
......@@ -5,7 +5,7 @@
* interface to replication client stubs
*
* Copyright 2013-2015 Irene Zhang <iyzhang@cs.washington.edu>
* Naveen Kr. Sharma <nksharma@cs.washington.edu>
* Naveen Kr. Sharma <naveenks@cs.washington.edu>
* Dan R. K. Ports <drkp@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
......@@ -42,32 +42,54 @@
namespace replication {
// A client's request may fail for various reasons. For example, if enough
// replicas are down, a client's request may time out. An ErrorCode indicates
// the reason that a client's request failed.
enum class ErrorCode {
// For whatever reason (failed replicas, slow network), the request took
// too long and timed out.
TIMEOUT,
// For IR, if a client issues a consensus operation and receives a majority
// of replies and confirms in different views, then the operation fails.
MISMATCHED_CONSENSUS_VIEWS
};
std::string ErrorCodeToString(ErrorCode err);
class Client : public TransportReceiver
{
public:
typedef std::function<void (const string &, const string &)> continuation_t;
typedef std::function<void (const string &)> timeout_continuation_t;
using continuation_t =
std::function<void(const string &request, const string &reply)>;
using error_continuation_t =
std::function<void(const string &request, ErrorCode err)>;
static const uint32_t DEFAULT_UNLOGGED_OP_TIMEOUT = 1000; // milliseconds
Client(const transport::Configuration &config, Transport *transport,
uint64_t clientid = 0);
virtual ~Client();
virtual void Invoke(const string &request,
continuation_t continuation) = 0;
virtual void InvokeUnlogged(int replicaIdx,
const string &request,
continuation_t continuation,
timeout_continuation_t timeoutContinuation = nullptr,
uint32_t timeout = DEFAULT_UNLOGGED_OP_TIMEOUT) = 0;
virtual void Invoke(
const string &request,
continuation_t continuation,
error_continuation_t error_continuation = nullptr) = 0;
virtual void InvokeUnlogged(
int replicaIdx,
const string &request,
continuation_t continuation,
error_continuation_t error_continuation = nullptr,
uint32_t timeout = DEFAULT_UNLOGGED_OP_TIMEOUT) = 0;
virtual void ReceiveMessage(const TransportAddress &remote,
const string &type,
const string &data);
protected:
transport::Configuration config;
Transport *transport;
uint64_t clientid;
};
......
......@@ -5,7 +5,7 @@
* a replica's log of pending and committed operations
*
* Copyright 2013-2015 Irene Zhang <iyzhang@cs.washington.edu>
* Naveen Kr. Sharma <nksharma@cs.washington.edu>
* Naveen Kr. Sharma <naveenks@cs.washington.edu>
* Dan R. K. Ports <drkp@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
......
......@@ -6,7 +6,7 @@
* replicas and determining whether a quorum of responses has been met
*
* Copyright 2013-2015 Irene Zhang <iyzhang@cs.washington.edu>
* Naveen Kr. Sharma <nksharma@cs.washington.edu>
* Naveen Kr. Sharma <naveenks@cs.washington.edu>
* Dan R. K. Ports <drkp@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
......@@ -35,7 +35,7 @@
#define _COMMON_QUORUMSET_H_
namespace replication {
template <class IDTYPE, class MSGTYPE>
class QuorumSet
{
......@@ -43,9 +43,9 @@ public:
QuorumSet(int numRequired)
: numRequired(numRequired)
{
}
void
Clear()
{
......@@ -80,7 +80,20 @@ public:
return &vsmessages;
} else {
return NULL;
}
}
}
const std::map<int, MSGTYPE> *
CheckForQuorum()
{
for (const auto &p : messages) {
const IDTYPE &vs = p.first;
const std::map<int, MSGTYPE> *quorum = CheckForQuorum(vs);
if (quorum != nullptr) {
return quorum;
}
}
return nullptr;
}
const std::map<int, MSGTYPE> *
......@@ -98,7 +111,7 @@ public:
}
vsmessages[replicaIdx] = msg;
return CheckForQuorum(vs);
}
......@@ -107,7 +120,7 @@ public:
{
AddAndCheckForQuorum(vs, replicaIdx, msg);
}
public:
int numRequired;
private:
......
......@@ -6,7 +6,7 @@
* replication protocol
*
* Copyright 2013-2015 Irene Zhang <iyzhang@cs.washington.edu>
* Naveen Kr. Sharma <nksharma@cs.washington.edu>
* Naveen Kr. Sharma <naveenks@cs.washington.edu>
* Dan R. K. Ports <drkp@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
......@@ -56,7 +56,9 @@ Replica::~Replica()
void
Replica::LeaderUpcall(opnum_t opnum, const string &op, bool &replicate, string &res)
{
Debug("Making leader upcall for operation %s", op.c_str());
app->LeaderUpcall(opnum, op, replicate, res);
Debug("Upcall result: %s %s", replicate ? "yes":"no", res.c_str());
}
void
......@@ -68,27 +70,6 @@ Replica::ReplicaUpcall(opnum_t opnum, const string &op, string &res)
Debug("Upcall result: %s", res.c_str());
}
void
Replica::Rollback(opnum_t current, opnum_t to, Log &log)
{
Debug("Making rollback-upcall from " FMT_OPNUM " to " FMT_OPNUM,
current, to);
std::map<opnum_t, string> reqs;
for (opnum_t x = current; x > to; x--) {
reqs.insert(std::pair<opnum_t, string>(x,
log.Find(x)->request.op()));
}
app->RollbackUpcall(current, to, reqs);
}
void
Replica::Commit(opnum_t op)
{
app->CommitUpcall(op);
}
void
Replica::UnloggedUpcall(const string &op, string &res)
{
......
......@@ -5,7 +5,7 @@
* interface to different vr protocols
*
* Copyright 2013-2015 Irene Zhang <iyzhang@cs.washington.edu>
* Naveen Kr. Sharma <nksharma@cs.washington.edu>
* Naveen Kr. Sharma <naveenks@cs.washington.edu>
* Dan R. K. Ports <drkp@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
......@@ -59,10 +59,6 @@ public:
virtual void LeaderUpcall(opnum_t opnum, const string &str1, bool &replicate, string &str2) { replicate = true; str2 = str1; };
// Invoke callback on all replicas
virtual void ReplicaUpcall(opnum_t opnum, const string &str1, string &str2) { };
// Rollback callback on failed speculative operations
virtual void RollbackUpcall(opnum_t current, opnum_t to, const std::map<opnum_t, string> &opMap) { };
// Commit callback to commit speculative operations
virtual void CommitUpcall(opnum_t) { };
// Invoke call back for unreplicated operations run on only one replica
virtual void UnloggedUpcall(const string &str1, string &str2) { };
};
......@@ -79,8 +75,6 @@ protected:
template<class MSG> void Execute(opnum_t opnum,
const Request & msg,
MSG &reply);
void Rollback(opnum_t current, opnum_t to, Log &log);
void Commit(opnum_t op);
void UnloggedUpcall(const string &op, string &res);
template<class MSG> void ExecuteUnlogged(const UnloggedRequest & msg,
MSG &reply);
......
syntax = "proto2";
package replication;
message Request {
......
d := $(dir $(lastword $(MAKEFILE_LIST)))
SRCS += $(addprefix $(d), \
record.cc client.cc replica.cc)
PROTOS += $(addprefix $(d), \
ir-proto.proto)
OBJS-ir-client := $(o)ir-proto.o $(o)client.o \
$(OBJS-client) $(LIB-message) \
$(LIB-configuration)
OBJS-ir-replica := $(o)record.o $(o)replica.o $(o)ir-proto.o \
$(OBJS-replica) $(LIB-message) \
$(LIB-configuration) $(LIB-persistent_register)
include $(d)tests/Rules.mk
This diff is collapsed.
This diff is collapsed.
syntax = "proto2";
import "replication/common/request.proto";
package replication.ir.proto;
message OpID {
required uint64 clientid = 1;
required uint64 clientreqid = 2;
}
// For the view change and recovery protocol, a replica stores two things on
// disk: (1) its current view and (2) the latest view during which it was
// NORMAL. Replicas pack this information into this proto buf and serialize it
// to disk.
message PersistedViewInfo {
required uint64 view = 1;
required uint64 latest_normal_view = 2;
}
enum RecordEntryState {
RECORD_STATE_TENTATIVE = 0;
RECORD_STATE_FINALIZED = 1;
}
enum RecordEntryType {
RECORD_TYPE_INCONSISTENT = 0;
RECORD_TYPE_CONSENSUS = 1;
}
message RecordEntryProto {
required uint64 view = 1;
required OpID opid = 2;
required RecordEntryState state = 3;
required RecordEntryType type = 4;
required bytes op = 5;
required bytes result = 6;
}
// TODO: Currently, replicas send entire records to one another. Figure out if
// there is a more efficient way to exchange records.
message RecordProto {
repeated RecordEntryProto entry = 1;
}
message ProposeInconsistentMessage {
required replication.Request req = 1;
}
message ReplyInconsistentMessage {
required uint64 view = 1;
required uint32 replicaIdx = 2;
required OpID opid = 3;
required bool finalized = 4;
}
message FinalizeInconsistentMessage {
required OpID opid = 1;
}
message ConfirmMessage {
required uint64 view = 1;
required uint32 replicaIdx = 2;
required OpID opid = 3;
}
message ProposeConsensusMessage {
required replication.Request req = 1;
}
message ReplyConsensusMessage {
required uint64 view = 1;
required uint32 replicaIdx = 2;
required OpID opid = 3;
required bytes result = 4;
required bool finalized = 5;
}
message FinalizeConsensusMessage {
required OpID opid = 1;
required bytes result = 2;
}
message DoViewChangeMessage {
required uint32 replicaIdx = 1;
// record is optional because a replica only sends its record to the
// leader of the new view.
optional RecordProto record = 2;
required uint64 new_view = 3;
required uint64 latest_normal_view = 4;
}
message StartViewMessage {
required RecordProto record = 1;
required uint64 new_view = 2;
}
message UnloggedRequestMessage {
required replication.UnloggedRequest req = 1;
}
message UnloggedReplyMessage {
required bytes reply = 1;
required uint64 clientreqid = 2;
}