Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • syslab/tapir
  • aaasz/tapir
  • ashmrtnz/tapir
3 results
Show changes
Showing
with 4028 additions and 55 deletions
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* lockserver_test.cc:
* test cases for lock server
*
* Copyright 2013 Dan R. K. Ports <drkp@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#include <fstream>
#include <memory>
#include <thread>
#include <gtest/gtest.h>
#include "lib/configuration.h"
#include "lib/repltransport.h"
#include "lockserver/client.h"
#include "lockserver/server.h"
#include "replication/ir/replica.h"
class LockServerTest : public testing::Test {
protected:
std::vector<transport::ReplicaAddress> replica_addrs_;
std::unique_ptr<transport::Configuration> config_;
ReplTransport transport_;
std::vector<std::unique_ptr<lockserver::LockClient>> clients_;
std::vector<std::unique_ptr<lockserver::LockServer>> servers_;
std::vector<std::unique_ptr<replication::ir::IRReplica>> replicas_;
LockServerTest() {
replica_addrs_ = {{"replica", "0"},
{"replica", "1"},
{"replica", "2"},
{"replica", "3"},
{"replica", "4"}};
config_ = std::unique_ptr<transport::Configuration>(
new transport::Configuration(5, 2, replica_addrs_));
RemovePersistedFiles();
for (std::size_t i = 0; i < 3; ++i) {
auto client = std::unique_ptr<lockserver::LockClient>(
new lockserver::LockClient(&transport_, *config_));
client->lock_async(std::to_string(i));
clients_.push_back(std::move(client));
}
for (std::size_t i = 0; i < replica_addrs_.size(); ++i) {
auto server = std::unique_ptr<lockserver::LockServer>(
new lockserver::LockServer());
servers_.push_back(std::move(server));
auto replica = std::unique_ptr<replication::ir::IRReplica>(
new replication::ir::IRReplica(*config_, i, &transport_,
servers_[i].get()));
replicas_.push_back(std::move(replica));
}
}
virtual void TearDown() {
RemovePersistedFiles();
}
virtual void RemovePersistedFiles() {
for (std::size_t i = 0; i < replica_addrs_.size(); ++i) {
const transport::ReplicaAddress &addr = replica_addrs_[i];
const std::string filename =
addr.host + ":" + addr.port + "_" + std::to_string(i) + ".bin";
std::ifstream f(filename);
if (f.good()) {
int success = std::remove(filename.c_str());
ASSERT(success == 0);
}
}
}
};
// Note that these tests are all white box smoke tests. They depend on the
// low-level details of knowing exactly which timeouts are registered and which
// messages are sent. If an implementation detail is changed to make some of
// these tests fail, you should call transport_.Run() and walk through the
// execution to trigger the desired behavior. Also, they only check to make
// sure that nothing crashes, though you can read through the Debug prints to
// make sure everything looks right.
//
// TODO: Use a ReplTransport for tests like the ones in ir-test.cc to assert
// that the correct messages are being sent.
// Fast path: client 0's lock request reaches every replica, every reply
// reaches the client, and the resulting finalize/confirm round completes.
// Like the other tests in this file, this only checks that nothing crashes.
TEST_F(LockServerTest, SuccessfulFastPathLock) {
    // Kept as std::size_t so that the index arithmetic below never narrows
    // to int or mixes signed and unsigned operands.
    const std::size_t nreplicas = replica_addrs_.size();

    // Send client 0's lock request.
    transport_.TriggerTimer(1);

    // Deliver lock request to replicas.
    for (const auto &addr : replica_addrs_) {
        transport_.DeliverMessage({addr.host, addr.port}, 0);
    }

    // Deliver lock reply to client.
    for (std::size_t i = 0; i < nreplicas; ++i) {
        transport_.DeliverMessage({"client", "0"}, i);
    }

    // Deliver finalize to replicas.
    for (const auto &addr : replica_addrs_) {
        transport_.DeliverMessage({addr.host, addr.port}, 1);
    }

    // Deliver confirm to client: these are the nreplicas messages queued for
    // the client right after the nreplicas replies delivered above.
    for (std::size_t i = nreplicas; i < nreplicas + nreplicas; ++i) {
        transport_.DeliverMessage({"client", "0"}, i);
    }
}
// Slow path: same message flow as the fast-path test, except that the timer
// that moves client 0 onto the slow path is fired before any message is
// delivered. Only checks that nothing crashes.
TEST_F(LockServerTest, SuccessfulSlowPathLock) {
    // Kept as std::size_t so that the index arithmetic below never narrows
    // to int or mixes signed and unsigned operands.
    const std::size_t nreplicas = replica_addrs_.size();

    // Send client 0's lock request.
    transport_.TriggerTimer(1);

    // Transition to slow path.
    transport_.TriggerTimer(clients_.size() + nreplicas + 1);

    // Deliver lock request to replicas.
    for (const auto &addr : replica_addrs_) {
        transport_.DeliverMessage({addr.host, addr.port}, 0);
    }

    // Deliver lock reply to client.
    for (std::size_t i = 0; i < nreplicas; ++i) {
        transport_.DeliverMessage({"client", "0"}, i);
    }

    // Deliver finalize to replicas.
    for (const auto &addr : replica_addrs_) {
        transport_.DeliverMessage({addr.host, addr.port}, 1);
    }

    // Deliver confirm to client: these are the nreplicas messages queued for
    // the client right after the nreplicas replies delivered above.
    for (std::size_t i = nreplicas; i < nreplicas + nreplicas; ++i) {
        transport_.DeliverMessage({"client", "0"}, i);
    }
}
// View change while client 0's request is outstanding: all replicas receive
// the request, all of them start a view change, replica 1 (the new primary)
// collects the DoViewChange messages, and the other replicas receive its
// StartView message. Only checks that nothing crashes.
TEST_F(LockServerTest, SuccessfulViewChange) {
    const std::size_t client_count = clients_.size();
    const std::size_t replica_count = replica_addrs_.size();
    const std::size_t new_primary = 1;

    // Fire client 0's timer so that it sends its lock request.
    transport_.TriggerTimer(1);

    // Hand the lock request to every replica.
    for (std::size_t r = 0; r < replica_count; ++r) {
        const transport::ReplicaAddress &replica = replica_addrs_[r];
        transport_.DeliverMessage({replica.host, replica.port}, 0);
    }

    // Start a view change on each replica by firing its timer.
    const std::size_t first_replica_timer = client_count + 1;
    for (std::size_t t = first_replica_timer;
         t < client_count + replica_count + 1; ++t) {
        transport_.TriggerTimer(t);
    }

    // The new primary receives the DoViewChange messages.
    const transport::ReplicaAddress &primary_addr = replica_addrs_[new_primary];
    for (std::size_t msg = 1; msg < replica_count; ++msg) {
        transport_.DeliverMessage({primary_addr.host, primary_addr.port}, msg);
    }

    // Every replica other than the new primary receives the StartView message.
    for (std::size_t r = 0; r < replica_count; ++r) {
        if (r != new_primary) {
            const transport::ReplicaAddress &replica = replica_addrs_[r];
            transport_.DeliverMessage({replica.host, replica.port},
                                      replica_count);
        }
    }
}
// View change among the first three replicas while their records differ:
// client 0's request has been delivered and finalized everywhere, client 1's
// request has reached replicas 0-2 and client 2's request has reached only
// replica 0. Only checks that nothing crashes.
TEST_F(LockServerTest, SuccessfulViewChangeNonemptyRdu) {
    const std::size_t nclients = clients_.size();
    const std::size_t nreplicas = replica_addrs_.size();
    // Unsigned literals keep the comparison free of signed/unsigned mixing.
    ASSERT_GE(nclients, 3u);
    ASSERT_GE(nreplicas, 3u);

    // Send client 0's lock request.
    transport_.TriggerTimer(1);

    // Deliver lock request to replicas.
    for (const auto &addr : replica_addrs_) {
        transport_.DeliverMessage({addr.host, addr.port}, 0);
    }

    // Deliver lock reply to client.
    for (std::size_t i = 0; i < nreplicas; ++i) {
        transport_.DeliverMessage({"client", "0"}, i);
    }

    // Deliver finalize to replicas.
    for (const auto &addr : replica_addrs_) {
        transport_.DeliverMessage({addr.host, addr.port}, 1);
    }

    // Send client 1's lock request.
    transport_.TriggerTimer(2);

    // Deliver lock request to first three replicas.
    for (std::size_t i = 0; i < 3; ++i) {
        const transport::ReplicaAddress &addr = replica_addrs_[i];
        transport_.DeliverMessage({addr.host, addr.port}, 2);
    }

    // Send client 2's lock request.
    transport_.TriggerTimer(3);

    // Deliver lock request to first replica.
    const transport::ReplicaAddress &addr = replica_addrs_[0];
    transport_.DeliverMessage({addr.host, addr.port}, 3);

    // View change first three replicas.
    for (std::size_t i = nclients + 1; i < nclients + 1 + 3; ++i) {
        transport_.TriggerTimer(i);
    }

    // Deliver DoViewChangeMessages to new primary.
    const transport::ReplicaAddress &primary = replica_addrs_[1];
    for (std::size_t i = 4; i < 4 + 2; ++i) {
        transport_.DeliverMessage({primary.host, primary.port}, i);
    }

    // Deliver StartViewMessage to replica 0 and 2.
    const transport::ReplicaAddress &addr0 = replica_addrs_[0];
    const transport::ReplicaAddress &addr2 = replica_addrs_[2];
    transport_.DeliverMessage({addr0.host, addr0.port}, 6);
    transport_.DeliverMessage({addr2.host, addr2.port}, 6);
}
// A full view change happens after the replicas have seen client 0's
// request; the request is then delivered to the replicas a second time and
// one of the resulting replies is handed to the client. Only checks that
// nothing crashes.
TEST_F(LockServerTest, FinalizeConsensusReply) {
    const std::size_t client_count = clients_.size();
    const std::size_t replica_count = replica_addrs_.size();
    const std::size_t new_primary = 1;

    // Fire client 0's timer so that it sends its lock request.
    transport_.TriggerTimer(1);

    // Hand the lock request to every replica.
    for (std::size_t r = 0; r < replica_count; ++r) {
        const transport::ReplicaAddress &replica = replica_addrs_[r];
        transport_.DeliverMessage({replica.host, replica.port}, 0);
    }

    // Start a view change on each replica by firing its timer.
    const std::size_t first_replica_timer = client_count + 1;
    for (std::size_t t = first_replica_timer;
         t < first_replica_timer + replica_count; ++t) {
        transport_.TriggerTimer(t);
    }

    // The new primary receives the DoViewChange messages.
    const transport::ReplicaAddress &primary_addr = replica_addrs_[new_primary];
    for (std::size_t msg = 1; msg < replica_count; ++msg) {
        transport_.DeliverMessage({primary_addr.host, primary_addr.port}, msg);
    }

    // Every replica other than the new primary receives the StartView message.
    for (std::size_t r = 0; r < replica_count; ++r) {
        if (r != new_primary) {
            const transport::ReplicaAddress &replica = replica_addrs_[r];
            transport_.DeliverMessage({replica.host, replica.port},
                                      replica_count);
        }
    }

    // Hand the lock request to every replica once more.
    for (std::size_t r = 0; r < replica_count; ++r) {
        const transport::ReplicaAddress &replica = replica_addrs_[r];
        transport_.DeliverMessage({replica.host, replica.port}, 0);
    }

    // The client receives the finalized reply.
    transport_.DeliverMessage({"client", "0"}, replica_count);
}
// Slow-path request interleaved with a view change: the client collects its
// replies, the replicas then change view, and only afterwards are the
// FinalizeConsensus and Confirm messages delivered. Only checks that nothing
// crashes.
TEST_F(LockServerTest, MismatchedConsensus) {
    const std::size_t client_count = clients_.size();
    const std::size_t replica_count = replica_addrs_.size();
    const std::size_t new_primary = 1;

    // Fire client 0's timer so that it sends its lock request.
    transport_.TriggerTimer(1);

    // Move the client onto the slow path.
    transport_.TriggerTimer(client_count + replica_count + 1);

    // Hand the lock request to every replica.
    for (std::size_t r = 0; r < replica_count; ++r) {
        const transport::ReplicaAddress &replica = replica_addrs_[r];
        transport_.DeliverMessage({replica.host, replica.port}, 0);
    }

    // The client receives one lock reply per replica.
    for (std::size_t msg = 0; msg < replica_addrs_.size(); ++msg) {
        transport_.DeliverMessage({"client", "0"}, msg);
    }

    // Start a view change on each replica by firing its timer.
    const std::size_t first_replica_timer = client_count + 1;
    for (std::size_t t = first_replica_timer;
         t < first_replica_timer + replica_count; ++t) {
        transport_.TriggerTimer(t);
    }

    // The new primary receives the DoViewChange messages, which start at
    // index 2 here; the index just past them is the StartView message.
    const std::size_t start_view_msg = 2 + replica_count - 1;
    const transport::ReplicaAddress &primary_addr = replica_addrs_[new_primary];
    for (std::size_t msg = 2; msg < start_view_msg; ++msg) {
        transport_.DeliverMessage({primary_addr.host, primary_addr.port}, msg);
    }

    // Every replica other than the new primary receives the StartView message.
    for (std::size_t r = 0; r < replica_count; ++r) {
        if (r != new_primary) {
            const transport::ReplicaAddress &replica = replica_addrs_[r];
            transport_.DeliverMessage({replica.host, replica.port},
                                      start_view_msg);
        }
    }

    // Every replica receives the FinalizeConsensus message.
    for (std::size_t r = 0; r < replica_count; ++r) {
        const transport::ReplicaAddress &replica = replica_addrs_[r];
        transport_.DeliverMessage({replica.host, replica.port}, 1);
    }

    // Client 0 receives the Confirm messages.
    for (std::size_t msg = replica_count; msg < replica_count + replica_count;
         ++msg) {
        transport_.DeliverMessage({"client", "0"}, msg);
    }
}
--------------------------------- MODULE IR ---------------------------------
(***************************************************************************)
(* This is a TLA+ specification of the Inconsistent Replication algorithm. *)
(* (And a mechanically-checked proof of its correctness using TLAPS) *)
(***************************************************************************)
EXTENDS FiniteSets, Naturals, TLC, TLAPS
-----------------------------------------------------------------------------
(***************************************************************************)
(* `^ \center \large{\textbf{Constants}} ^' *)
(***************************************************************************)
(***************************************************************************)
(* Constant parameters: *)
(* Replicas: the set of all replicas (Replica IDs) *)
(* Clients: the set of all clients (Client IDs) *)
(* Quorums: the set of all quorums *)
(* SuperQuorums: the set of all super quorums *)
(* Results: the set of all possible result types *)
(* OperationBody: the set of all possible operation bodies *)
(* (with arguments, etc. - can be infinite) *)
(* *)
(* f: maximum number of failures allowed (half of n) *)
(* *)
(* Constants used to bound variables, for model checking (Nat is bounded) *)
(* max_vc: maximum total number of view changes that may be started *)
(* max_req: maximum number of op requests performed by clients *)
(***************************************************************************)
CONSTANTS Replicas, Clients, Quorums, SuperQuorums, Results, OperationBody,
max_vc, max_req, f
ASSUME IsFiniteSet(Replicas)
(***************************************************************************)
(* The possible states of a replica and the two types of operations *)
(* currently defined by IR. *)
(***************************************************************************)
\* The states a replica can be in; rState maps every replica to one of these.
ReplicaState == {"NORMAL", "FAILED", "RECOVERING", "VIEW-CHANGING"}
\* The states a client can be in; cState maps every client to one of these.
ClientState == {"NORMAL", "FAILED"}
\* The two kinds of operation a client can request.
OperationType == {"Inconsistent", "Consensus"}
(***************************************************************************)
(* Definition of operation space *)
(***************************************************************************)
\* A message id is a pair <<client, counter value>>; ClientRequest builds it
\* as <<c, cMsgCounter[c]>>.
MessageId == Clients \X Nat
\* An operation is a pair <<operation type, operation body>>.
Operations == OperationType \X OperationBody
(***************************************************************************)
(* Message is defined to be the set of all possible messages *)
(***************************************************************************)
\* TODO: Assumptions
\* Assume unique message ids
\* Assume no more than f replica failures
\* The set of all messages, as a union of one record set per message type.
\* sentMsg is always a subset of this set (see TypeOK).
Message ==
\* REQUEST: a client asks for operation op under message id id
\* (sent by ClientRequest).
[type: {"REQUEST"},
id: MessageId,
op: Operations]
\cup
\* REPLY: replica src answers request id with result res while in view v
\* (sent by ReplicaReply).
[type: {"REPLY"},
id: MessageId,
v: Nat,
res: Results,
src: Replicas]
\* v = view num.
\cup
\* START-VIEW-CHANGE: replica src announces a view change from view v
\* (sent by ReplicaStartViewChange).
[type: {"START-VIEW-CHANGE"},
v: Nat,
src: Replicas]
\cup
\* DO-VIEW-CHANGE: replica src sends its record r and the proposed view v to
\* replica dst (sent by ReplicaReceiveStartViewChange). Record entries are
\* either <<id, op, result>> or <<id, op>>.
[type: {"DO-VIEW-CHANGE"},
r: SUBSET (MessageId \X Operations \X Results
\cup MessageId \X Operations),
v: Nat,
src: Replicas,
dst: Replicas]
\cup
\* START-VIEW: replica src announces that view v has started
\* (sent by ReplicaDecideNewView).
[type: {"START-VIEW"},
v: Nat,
src: Replicas]
\cup
\* START-VIEW-REPLY: replica src acknowledges view v to replica dst
\* (sent by ReplicaReceiveStartView).
[type: {"START-VIEW-REPLY"},
v: Nat,
src: Replicas,
dst: Replicas]
-----------------------------------------------------------------------------
(***************************************************************************)
(* `^ \center \large{\textbf{Variables and State Predicates}} ^' *)
(***************************************************************************)
(***************************************************************************)
(* Variables: *)
(* 1. State at each replica: *)
(* rState = Denotes current replica state. Either: *)
(* - NORMAL (processing operations) *)
(* - VIEW-CHANGING (participating in recovery) *)
(* rRecord = Unordered set of operations and their results *)
(* rViewNumber = current view number *)
(* 2. State of communication medium: *)
(* sentMsg = sent (but not yet received) messages *)
(* 3. State at client: *)
(* cCrtOp = current operation requested by the client *)
(* cMsgCounter = the message counter to use for *)
(* the next operation *)
(* *)
(***************************************************************************)
VARIABLES rState, rRecord, rViewNumber, rViewReplies, sentMsg, cCrtOp,
cMsgCounter, cCrtOpReplies, cState, aSuccessful, aFinalized,
gViewChangesNo
(***************************************************************************)
(* Defining these tuples makes it easier to express which variables remain *)
(* unchanged *)
(***************************************************************************)
\* Variable groups, used in UNCHANGED clauses throughout the actions below.
rVars == <<rState, rRecord, rViewNumber, rViewReplies>>\* Replica variables.
cVars == <<cCrtOp, \* current operation at a client
cCrtOpReplies, \* current operation replies
cMsgCounter, \* counter that supplies the id of the client's next request
cState>> \* Client variables.
aVars == <<aSuccessful, aFinalized>> \* Application variables
oVars == <<sentMsg, gViewChangesNo>> \* Other variables.
\* All variables. aVars must be part of this tuple: Spec is written as
\* [][Next]_vars, so any variable left out here could change arbitrarily
\* during a stuttering step.
vars == <<rVars, cVars, aVars, oVars>>
\* Type invariant: gives the set of values each variable may take.
TypeOK ==
\* Every replica is in one of the ReplicaState states.
/\ rState \in [Replicas -> ReplicaState]
\* A replica's record holds entries <<id, op, result>> or <<id, op>>.
/\ rRecord \in [Replicas -> SUBSET (MessageId \X
Operations \X
Results
\cup MessageId \X
Operations)]
/\ rViewNumber \in [Replicas -> Nat]
\* View-change bookkeeping: the do-view-change and start-view-reply messages
\* a replica has accepted, stored with lower-case type tags.
/\ rViewReplies \in [Replicas -> SUBSET [type: {"do-view-change",
"start-view-reply"},
viewNumber: Nat,
r: SUBSET (MessageId \X
Operations \X
Results
\cup MessageId \X
Operations),
src: Replicas]]
/\ sentMsg \in SUBSET Message
\* <<>> means the client has no operation in progress.
/\ cCrtOp \in [Clients -> Operations \cup {<<>>}]
/\ cCrtOpReplies \in [Clients -> SUBSET [viewNumber: Nat,
res: Results,
src: Replicas]]
/\ cMsgCounter \in [Clients -> Nat]
/\ cState \in [Clients -> ClientState]
\* Successful operations are recorded with or without a result.
/\ aSuccessful \in SUBSET (MessageId \X
Operations \X
Results
\cup MessageId \X
Operations)
\* Finalized operations always carry a result.
/\ aFinalized \in SUBSET (MessageId \X
Operations \X
Results)
/\ gViewChangesNo \in Nat
\* Initial state: every replica and client is NORMAL, all records, reply
\* sets and the network are empty, all counters and view numbers are 0, and
\* no client has an operation in progress.
Init ==
/\ rState = [r \in Replicas |-> "NORMAL"]
/\ rRecord = [r \in Replicas |-> {}]
/\ rViewNumber = [r \in Replicas |-> 0]
/\ rViewReplies = [r \in Replicas |-> {}]
/\ sentMsg = {}
/\ cCrtOp = [c \in Clients |-> <<>>]
/\ cCrtOpReplies = [c \in Clients |-> {}]
/\ cMsgCounter = [c \in Clients |-> 0]
/\ cState = [c \in Clients |-> "NORMAL"]
/\ aSuccessful = {}
/\ aFinalized = {}
/\ gViewChangesNo = 0
-----------------------------------------------------------------------------
(***************************************************************************)
(* `^ \center \large{\textbf{Actions}} ^' *)
(***************************************************************************)
\* Send(m) adds message m to the set of sent messages.
Send(m) == sentMsg' = sentMsg \cup {m}
\* Receive(m) removes message m from the set of sent messages. No action in
\* this module uses it: the receiving actions leave sentMsg unchanged or add
\* to it via Send.
Receive(m) == sentMsg' = sentMsg \ {m}
-----------------------------------------------------------------------------
(***************************************************************************)
(* `^ \center \large{\textbf{Client Actions}} ^' *)
(***************************************************************************)
\* Note: CHOOSE does not introduce nondeterminism (the same value is chosen
\* each time)
\* Client c sends a request for some operation (any type, any body).
\* Enabled only when c has no operation in progress and has made fewer than
\* max_req requests. The request id is <<c, cMsgCounter[c]>>; the counter is
\* then incremented and the operation is stored as c's current operation.
ClientRequest(c) ==
\E opType \in OperationType: \E opBody \in OperationBody:
/\ cCrtOp[c] = <<>> \*\* the client is not waiting for a result
\*\* of another operation
/\ Send([type |-> "REQUEST", id |-> <<c, cMsgCounter[c]>>,
op |-> <<opType, opBody>> ])
/\ cCrtOp' = [cCrtOp EXCEPT ![c] = <<opType, opBody>>]
/\ cMsgCounter' = [cMsgCounter EXCEPT ![c] = cMsgCounter[c] + 1]
/\ UNCHANGED <<rVars, aVars, cCrtOpReplies, cState, gViewChangesNo>>
/\ cMsgCounter[c] < max_req \* BOUND the number of requests a client can make
\* Client c receives a reply to its current operation.
\* The reply must carry the id of c's most recent request, i.e.
\* <<c, cMsgCounter[c] - 1>>. Its view number, result and source are added to
\* cCrtOpReplies[c]. The message stays in sentMsg (oVars is unchanged).
ClientReceiveReply(c) ==
\E msg \in sentMsg:
/\ msg.type = "REPLY"
/\ cCrtOp[c] # <<>>
/\ msg.id = <<c, cMsgCounter[c] - 1>> \* reply to c's request for crt op
\* TODO: if already reply from src, keep the most recent one (biggest view Number)
\* /\ Assert(Cardinality(cCrtOpReplies[c])< 10, "cCrtOpReplies cardinality bound")
/\ cCrtOpReplies' = [cCrtOpReplies EXCEPT ![c] = @ \cup
{[viewNumber |-> msg.v,
res |-> msg.res,
src |-> msg.src]}]
/\ UNCHANGED <<cCrtOp, cMsgCounter, cState, rVars, aVars, oVars>>
\* An operation is successful at client c and its result is returned to the
\* application.
\* Requires an operation in progress and a quorum Q such that c holds a reply
\* from every member of Q, with matching view numbers across the members.
\*  - Inconsistent operation: <<id, op>> is added to aSuccessful, and c's
\*    current operation and replies are cleared.
\*  - Consensus operation: the replies from Q must also agree on the result;
\*    <<id, op, result>> is added to aSuccessful and all client variables are
\*    left unchanged, so the operation stays in progress.
\* The id used is that of c's most recent request, <<c, cMsgCounter[c] - 1>>.
ClientSuccessfulOp(c) ==
/\ cCrtOp[c] /= <<>>
/\ \E Q \in Quorums:
\* a quorum of replies with matching view numbers
/\ \A r \in Q:
/\ \E reply \in cCrtOpReplies[c]: reply.src = r
/\ \A p \in Q: \E rr, pr \in cCrtOpReplies[c]:
/\ rr.src = r
/\ pr.src = p
/\ rr.viewNumber = pr.viewNumber
\* cCrtOp[c][1] is the operation type (first component of <<type, body>>).
/\ \/ /\ cCrtOp[c][1] = "Inconsistent"
/\ cCrtOp' = [cCrtOp EXCEPT ![c] = <<>>]
/\ cCrtOpReplies' = [cCrtOpReplies EXCEPT ![c] = {}]
/\ aSuccessful' = aSuccessful \cup
{<<<<c, cMsgCounter[c] - 1>>,
cCrtOp[c]>>}
/\ UNCHANGED <<cMsgCounter, cState>>
\/ /\ cCrtOp[c][1] = "Consensus"
\* the replies from the quorum must also carry matching results
/\ \A r, p \in Q: \E rr, pr \in cCrtOpReplies[c]:
/\ rr.src = r
/\ pr.src = p
/\ rr.res = pr.res
/\ \E reply \in cCrtOpReplies[c]:
/\ reply.src \in Q
/\ aSuccessful' = aSuccessful \cup
{<<<<c, cMsgCounter[c] - 1>>,
cCrtOp[c],
reply.res>>}
/\ UNCHANGED <<cVars>>
/\ UNCHANGED <<rVars, aFinalized, oVars>>
\* A consensus operation is finalized by client c and its result is returned
\* to the application.
\* Requires a consensus operation in progress and a superquorum Q such that c
\* holds a reply from every member of Q, with matching view numbers and
\* matching results across the members. <<id, op, result>> is added to
\* aFinalized, and c's current operation and replies are cleared.
ClientFinalizedOp(c) ==
/\ cCrtOp[c] /= <<>> /\ cCrtOp[c][1] = "Consensus"
/\ \E Q \in SuperQuorums:
\* a superquorum of replies with matching view numbers and results
/\ \A r \in Q:
/\ \E reply \in cCrtOpReplies[c]: reply.src = r
/\ \A p \in Q: \E rr, pr \in cCrtOpReplies[c]:
/\ rr.src = r
/\ pr.src = p
/\ rr.viewNumber = pr.viewNumber
/\ rr.res = pr.res
/\ \E reply \in cCrtOpReplies[c]:
/\ reply.src \in Q
/\ aFinalized' = aFinalized \cup
{<<<<c, cMsgCounter[c] - 1>>,
cCrtOp[c],
reply.res >>}
/\ cCrtOp' = [cCrtOp EXCEPT ![c] = <<>>]
/\ cCrtOpReplies' = [cCrtOpReplies EXCEPT ![c] = {}]
/\ UNCHANGED <<rVars, cMsgCounter, cState, aSuccessful, oVars>>
\* Client c fails and loses all of its data: its state becomes FAILED, its
\* message counter is reset to 0, and its current operation and collected
\* replies are cleared.
\* This action is currently commented out of ClientAction, so it is never
\* taken.
ClientFail(c) ==
/\ cState' = [cState EXCEPT ![c] = "FAILED"]
/\ cMsgCounter' = [cMsgCounter EXCEPT ![c] = 0]
/\ cCrtOp' = [cCrtOp EXCEPT ![c] = <<>>]
/\ cCrtOpReplies' = [cCrtOpReplies EXCEPT ![c] = {}]
/\ UNCHANGED <<rVars, aVars, oVars>>
\* Client recovers. Placeholder: defined as FALSE, so the action is never
\* enabled and a FAILED client can take no further steps.
ClientRecover(c) == FALSE
-----------------------------------------------------------------------------
(***************************************************************************)
(* `^ \center \large{\textbf{Replica Actions}} ^' *)
(***************************************************************************)
\* Replica r replies to a request.
\* For a REQUEST in sentMsg whose id is not yet in r's record, r adds
\* <<id, op, result>> to its record and sends a REPLY tagged with its current
\* view number. The result is an arbitrary element of Results. The REQUEST
\* stays in sentMsg, so other replicas can reply to it as well.
ReplicaReply(r) == \* TODO: Might need to check for duplicate state action?
\E op \in Operations, id \in MessageId, result \in Results:
/\ [type |-> "REQUEST", id |-> id, op |-> op] \in sentMsg
/\ ~ \E rec \in rRecord[r]: rec[1] = id
\* not already replied for this op
/\ rRecord' = [rRecord EXCEPT ![r] = @ \cup {<<id, op, result>>}]
\* sanity bound on the view number (Assert comes from the TLC module)
/\ Assert(rViewNumber[r] < 10, "viewNumber bound")
/\ Send([type |-> "REPLY",
id |-> id,
v |-> rViewNumber[r],
res |-> result,
src |-> r])
/\ UNCHANGED <<rState, rViewNumber, rViewReplies, cVars, aVars, gViewChangesNo>>
\* A replica starts the view change procedure.
\* Supports concurrent view changes (identified by src).
\* r sends START-VIEW-CHANGE with its current view number and moves to state
\* RECOVERING. The total number of view changes started is limited to max_vc
\* through the global counter gViewChangesNo.
\* In ReplicaAction this is the only step available to a FAILED replica.
ReplicaStartViewChange(r) ==
/\ Send([type |-> "START-VIEW-CHANGE", v |-> rViewNumber[r], src |-> r])
/\ rState' = [rState EXCEPT ![r] = "RECOVERING"]
/\ UNCHANGED <<rViewNumber, rViewReplies, rRecord, cVars, aVars>>
/\ gViewChangesNo < max_vc \* BOUND on number of view changes
/\ gViewChangesNo' = gViewChangesNo + 1
\* A replica received a message to start a view change.
\* For a START-VIEW-CHANGE message, r takes v_new as the larger of the
\* message's view number and its own, sends DO-VIEW-CHANGE for view v_new + 1
\* together with its record to the message's sender, adopts v_new + 1 as its
\* view number and moves to state VIEW-CHANGING. The step is skipped if r has
\* already sent that sender a DO-VIEW-CHANGE with a view number >= msg.v.
ReplicaReceiveStartViewChange(r) ==
/\ \E msg \in sentMsg:
/\ msg.type = "START-VIEW-CHANGE"
/\ LET v_new ==
IF msg.v > rViewNumber[r] THEN msg.v
ELSE rViewNumber[r]
IN
/\ ~\E m \in sentMsg: \* not already sent (just to bound the model checker)
/\ m.type = "DO-VIEW-CHANGE"
/\ m.v >= msg.v
/\ m.dst = msg.src
/\ m.src = r
/\ Send([type |-> "DO-VIEW-CHANGE",
v |-> v_new + 1,
r |-> rRecord[r],
src |-> r,
dst |-> msg.src])
/\ rViewNumber' = [rViewNumber EXCEPT ![r] = v_new + 1]
/\ rState' = [rState EXCEPT ![r] = "VIEW-CHANGING"]
/\ UNCHANGED <<cVars, rRecord, rViewReplies, aVars, gViewChangesNo>>
\* Replica r received a DO-VIEW-CHANGE message.
\* The message must be addressed to r and carry a view number greater than
\* r's own. Its view number, record and source are stored in rViewReplies[r]
\* under the tag "do-view-change"; nothing else changes.
ReplicaReceiveDoViewChange(r) ==
\*/\ Assert(Cardinality(sentMsg) < 20, "bound on sentMsg")
/\ \E msg \in sentMsg:
/\ msg.type = "DO-VIEW-CHANGE"
/\ msg.dst = r
/\ msg.v > rViewNumber[r]
/\ rViewReplies' = [rViewReplies EXCEPT ![r] = @ \cup
{[type |-> "do-view-change",
viewNumber |-> msg.v,
r |-> msg.r,
src |-> msg.src]}]
/\ UNCHANGED <<cVars, rViewNumber, rRecord, rState, aVars, oVars>>
\* Hook applied to the consensus operations recovered together with their
\* results. Currently a no-op that is always TRUE; the commented-out Assert
\* is an optional bound on the size of ops.
RecoverOpsResults(ops) ==
\*/\ Assert(Cardinality(ops) < 3, "recovered ops result")
/\ TRUE
\* Hook applied to the operations recovered without a result. Currently a
\* no-op that is always TRUE; the commented-out Assert is an optional bound
\* on the size of ops.
RecoverOps(ops) ==
\*/\ Assert(Cardinality(ops) < 3, "recovered ops")
/\ TRUE
\* A replica received enough view change replies to start processing in the
\* new view.
\* Enabled when r holds a "do-view-change" reply from every member of some
\* quorum Q. From the records in the replies of Q's members, r recovers:
\*   (a) consensus entries (with result) for which some superquorum P exists
\*       such that every replica in Q \cap P reported that exact entry;
\*   (b) every other consensus entry, stripped down to <<id, op>>;
\*   (c) every inconsistent entry, stripped down to <<id, op>>.
\* All three sets are added to r's record. r then sends START-VIEW for the
\* largest view number among its stored replies, adopts that view number and
\* clears its stored replies. rState is left unchanged.
ReplicaDecideNewView(r) ==
/\ \E Q \in Quorums:
/\ \A rep \in Q: \E reply \in rViewReplies[r]: /\ reply.src = rep
/\ reply.type = "do-view-change"
\* received at least a quorum of replies
/\ LET recoveredConensusOps_a ==
\* any consensus operation found in at least a majority of a Quorum
\* (x[2] is the operation and x[2][1] is its type)
{x \in UNION {y.r: y \in {z \in rViewReplies[r]: z.src \in Q}}:
/\ x[2][1] = "Consensus"
/\ \E P \in SuperQuorums:
\A rep \in Q \cap P:
\E reply \in rViewReplies[r]:
/\ reply.src = rep
/\ x \in reply.r} \* same op, same result
recoveredConensusOps_b == \* TODO: what result? from the app?
\* the rest of consensus ops found in at least one record (discard the result)
{<<z[1], z[2]>>:
z \in {x \in UNION {y.r: y \in {z \in rViewReplies[r]: z.src \in Q}}:
/\ x[2][1] = "Consensus"
/\ ~ x \in recoveredConensusOps_a}}
recoveredInconsistentOps_c ==
\* any inconsistent operation found in any received record (discard the result)
{<<z[1], z[2]>>:
z \in {x \in UNION {y.r: y \in {z \in rViewReplies[r]: z.src \in Q}}:
x[2][1] = "Inconsistent"}}
IN
/\ RecoverOpsResults(recoveredConensusOps_a)
/\ RecoverOps(recoveredConensusOps_b)
/\ RecoverOps(recoveredInconsistentOps_c)
/\ rRecord' = [rRecord EXCEPT ![r] = @ \cup recoveredConensusOps_a
\cup recoveredConensusOps_b
\cup recoveredInconsistentOps_c]
/\ LET v_new ==
\* max view number received
\* (taken over every stored reply, whatever its type)
CHOOSE v \in {x.viewNumber: x \in rViewReplies[r]}:
\A y \in rViewReplies[r]:
y.viewNumber <= v
IN
/\ Send([type |-> "START-VIEW",
v |-> v_new,
src |-> r])
/\ rViewNumber' = [rViewNumber EXCEPT ![r] = v_new]
/\ rViewReplies' = [rViewReplies EXCEPT ![r] = {}]
/\ UNCHANGED <<rState, cVars, aVars, gViewChangesNo>>
\* A replica receives a start view message.
\* For a START-VIEW message from another replica whose view number is at
\* least r's own, r acknowledges with START-VIEW-REPLY to the sender, adopts
\* the message's view number and returns to state NORMAL.
ReplicaReceiveStartView(r) ==
/\ \E msg \in sentMsg:
/\ msg.type = "START-VIEW"
/\ msg.v >= rViewNumber[r]
/\ msg.src # r \* don't reply to myself
/\ Send([type |-> "START-VIEW-REPLY",
v |-> msg.v,
src |-> r,
dst |-> msg.src])
/\ rViewNumber' = [rViewNumber EXCEPT ![r] = msg.v]
/\ rState' = [rState EXCEPT ![r] = "NORMAL"]
/\ UNCHANGED <<rRecord, rViewReplies, cVars, aVars, gViewChangesNo>>
\* Replica r receives a START-VIEW-REPLY message.
\* The message must be addressed to r and carry a view number greater than
\* r's own. It is stored in rViewReplies[r] under the tag "start-view-reply"
\* with an empty record; nothing else changes.
ReplicaReceiveStartViewReply(r) ==
/\ \E msg \in sentMsg:
/\ msg.type = "START-VIEW-REPLY"
/\ msg.dst = r
/\ msg.v > rViewNumber[r] \* receive only if bigger than the last view I was in
/\ rViewReplies' = [rViewReplies EXCEPT ![r] = @ \cup
{[type |-> "start-view-reply",
viewNumber |-> msg.v,
r |-> {},
src |-> msg.src]}]
/\ UNCHANGED <<rRecord, rState, rViewNumber, cVars, aVars, oVars>>
\* Replica r finishes recovering.
\* Enabled when some quorum Q contains r and r holds a "start-view-reply"
\* from every other member of Q. r clears its stored replies and returns to
\* state NORMAL; its record and view number are unchanged.
ReplicaRecover(r) == \* we received enough START-VIEW-REPLY messages
\E Q \in Quorums:
/\ r \in Q
/\ \A p \in Q: \/ p = r
\/ /\ p # r
/\ \E reply \in rViewReplies[r]: /\ reply.src = p
/\ reply.type = "start-view-reply"
/\ rViewReplies' = [rViewReplies EXCEPT ![r] = {}]
/\ rState' = [rState EXCEPT ![r] = "NORMAL"]
/\ UNCHANGED <<rRecord, rViewNumber, cVars, aVars, oVars>>
\* Placeholder for resuming a view change after a timeout. Defined as FALSE,
\* so the action is never enabled.
ReplicaResumeViewChange(r) == \* TODO: On timeout
FALSE
\* A replica fails and loses everything except its view number: its state
\* becomes FAILED, and its record and stored view-change replies are cleared.
\* Enabled only while fewer than f replicas are FAILED or RECOVERING.
ReplicaFail(r) == \* TODO: check cardinality
/\ rState' = [rState EXCEPT ![r] = "FAILED"]
/\ rRecord' = [rRecord EXCEPT ![r] = {}]
\*/\ rViewNumber' = [rViewNumber EXCEPT ![r] = 0] \* TODO: check what happens if we lose the view number
/\ rViewReplies' = [rViewReplies EXCEPT ![r] = {}]
/\ UNCHANGED <<rViewNumber, cVars, aVars, oVars>>
/\ Cardinality({re \in Replicas:
\* We assume less than f replicas are allowed to fail
\/ rState[re] = "FAILED"
\/ rState[re] = "RECOVERING"}) < f
-----------------------------------------------------------------------------
(***************************************************************************)
(* `^ \center \large{\textbf{High-Level Actions}} ^' *)
(***************************************************************************)
\* All steps client c can take, selected by its current state.
\* ClientFail is commented out, so no client ever reaches FAILED; the FAILED
\* branch offers only ClientRecover, which is defined as FALSE.
ClientAction(c) ==
\/ /\ cState[c] = "NORMAL"
/\ \/ ClientRequest(c) \* some client tries to replicate commit an operation
\/ ClientReceiveReply(c) \* some client receives a reply from a replica
\*\/ ClientFail(c) \* some client fails
\/ ClientSuccessfulOp(c) \* an operation is successful at some client
\/ ClientFinalizedOp(c) \* an operation was finalized at some client
\/ /\ cState[c] = "FAILED"
/\ \/ ClientRecover(c)
\* All steps replica r can take, selected by its current state:
\*   NORMAL        - reply to requests, join a view change, accept a new
\*                   view, or fail;
\*   FAILED        - start a view change (which moves it to RECOVERING);
\*   RECOVERING    - collect DO-VIEW-CHANGE and START-VIEW-REPLY messages,
\*                   decide the new view, or finish recovering;
\*   VIEW-CHANGING - join a further view change, accept a new view, or fail
\*                   (ReplicaResumeViewChange is FALSE and never taken).
ReplicaAction(r) ==
\/ /\ rState[r] = "NORMAL"
/\ \/ ReplicaReply(r) \* some replica sends a reply to a REQUEST msg
\/ ReplicaReceiveStartViewChange(r)
\/ ReplicaReceiveStartView(r)
\/ ReplicaFail(r) \* some replica fails
\/ /\ rState[r] = "FAILED"
/\ \/ ReplicaStartViewChange(r) \* some replica starts to recover
\/ /\ rState[r] = "RECOVERING" \* just to make it clear
/\ \/ ReplicaReceiveDoViewChange(r)
\/ ReplicaDecideNewView(r)
\/ ReplicaReceiveStartViewReply(r)
\/ ReplicaRecover(r)
\/ /\ rState[r] = "VIEW-CHANGING"
/\ \/ ReplicaReceiveStartViewChange(r)
\/ ReplicaReceiveStartView(r)
\/ ReplicaResumeViewChange(r) \* some timeout expired and view change not finished
\/ ReplicaFail(r)
\* Next-state relation: one step by some client or by some replica.
Next ==
\/ \E c \in Clients: ClientAction(c)
\/ \E r \in Replicas: ReplicaAction(r)
\* The complete specification: start in a state satisfying Init (and TypeOK),
\* then take only Next steps or steps that leave vars unchanged.
Spec ==
TypeOK /\ Init /\ [] [Next]_vars
\* Fault-tolerance property over quorums whose members are all NORMAL or
\* VIEW-CHANGING:
\*  - for every successful operation, some member of each such quorum has a
\*    record entry with the same id and operation (the result may differ);
\*  - for every finalized operation and each such quorum Q, there is a
\*    superquorum P such that every replica in Q \cap P has the identical
\*    entry (id, operation and result) in its record.
FaultTolerance ==
/\ \A successfulOp \in aSuccessful, Q \in Quorums:
(\A r \in Q: rState[r] = "NORMAL" \/ rState[r] = "VIEW-CHANGING")
=> (\E p \in Q: \E rec \in rRecord[p]:
/\ successfulOp[1] = rec[1]
/\ successfulOp[2] = rec[2]) \* Not necessarily same result
/\ \A finalizedOp \in aFinalized, Q \in Quorums:
(\A r \in Q: rState[r] = "NORMAL" \/ rState[r] = "VIEW-CHANGING")
=> (\E P \in SuperQuorums:
\A p \in Q \cap P:
\E rec \in rRecord[p]:
finalizedOp = rec)
\* The invariant to establish: type correctness together with fault
\* tolerance.
Inv ==
/\ TypeOK
/\ FaultTolerance
\* Safety theorem: Inv holds in every reachable state. The proof is the
\* standard induction: Inv holds initially (<1>1), every step preserves it
\* (<1>2), and the conclusion follows by temporal reasoning (<1>3).
THEOREM Spec => []Inv
PROOF
<1>1. Init => Inv
PROOF BY DEF Init, Inv, TypeOK, FaultTolerance, ReplicaState, ClientState
\* NOTE(review): the inductive step is left as OBVIOUS, with no definitions
\* expanded; this looks like a placeholder -- confirm that TLAPS actually
\* discharges it.
<1>2. Inv /\ [Next]_vars => Inv'
PROOF OBVIOUS
<1>3. QED
PROOF BY <1>1, <1>2, PTL DEF Spec
=============================================================================
\* Modification History
\* Last modified Sun Jan 25 14:09:05 PST 2015 by aaasz
\* Created Fri Dec 12 17:42:14 PST 2014 by aaasz
------------------------ MODULE IR_consensus --------------------------------
(***************************************************************************)
(* This is a TLA+ specification of the Inconsistent Replication algorithm. *)
(* (And a mechanically-checked proof of its correctness using TLAPS) *)
(***************************************************************************)
EXTENDS FiniteSets, Naturals, TLC, TLAPS
-----------------------------------------------------------------------------
(***************************************************************************)
(* `^ \center \large{\textbf{Constants}} ^' *)
(***************************************************************************)
(***************************************************************************)
(* Constant parameters: *)
(* Replicas: the set of all replicas (Replica IDs) *)
(* Clients: the set of all clients (Client IDs) *)
(* Quorums: the set of all quorums *)
(* SuperQuorums: the set of all super quorums *)
(* Results: the set of all possible result types *)
(* OpBody: the set of all possible operation bodies *)
(* (with arguments, etc. - can be infinite) *)
(* S: shard id of the shard Replicas constitute *)
(* f: maximum number of failures allowed (half of n) *)
(* *)
(* Constants used to bound variables, for model checking (Nat is bounded) *)
(* max_vc: maximum number of View-Changes allowed for each replica *)
(* max_req: maximum number of op requests performed by clients *)
(***************************************************************************)
CONSTANTS Replicas, Clients, Quorums, SuperQuorums, Results, OpBody,
AppClientFail, AppReplicaFail,
SuccessfulInconsistentOp(_,_,_), SuccessfulConsensusOp(_,_,_,_),
Merge(_,_,_),
Sync(_),
ExecInconsistent(_),
ExecConsensus(_),
Decide(_),
f,
S, Shards, \* S = shard id
max_vc, max_req
\* The set of replicas is finite.
ASSUME IsFiniteSet(Replicas)
\* Quorums and superquorums are sets of replicas. Any two quorums share at
\* least one replica, and any quorum shares at least one replica with the
\* intersection of any two superquorums.
ASSUME QuorumAssumption ==
/\ Quorums \subseteq SUBSET Replicas
/\ SuperQuorums \subseteq SUBSET Replicas
/\ \A Q1, Q2 \in Quorums: Q1 \cap Q2 # {}
/\ \A Q \in Quorums, R1, R2 \in SuperQuorums:
Q \cap R1 \cap R2 # {}
\* Every quorum has more than f members.
ASSUME FailuresAssumption ==
\A Q \in Quorums: Cardinality(Q) > f
(***************************************************************************)
(* The possible states of a replica and the two types of operations *)
(* currently defined by IR. *)
(***************************************************************************)
\* The states a replica can be in.
ReplicaState == {"NORMAL", "FAILED", "RECOVERING", "VIEW-CHANGING"}
\* The states a client can be in.
ClientState == {"NORMAL", "FAILED"}
\* The two kinds of operation.
OpType == {"Inconsistent", "Consensus"}
\* The status attached to an operation in a record entry (see the status
\* field of the DO-VIEW-CHANGE and START-VIEW messages).
OpStatus == {"TENTATIVE", "FINALIZED"}
(***************************************************************************)
(* Definition of operation space *)
(***************************************************************************)
MessageId == [cid: Clients, msgid: Nat]
Operations == [type: OpType, body: OpBody] \cup {<<>>}
(***************************************************************************)
(* Message is defined to be the set of all possible messages *)
(***************************************************************************)
\* Assume unique message ids
\* Assume no more than f replica failures
\* We use shard to specify for what shard this message was
\* (we share the variables)
\*
\* Field conventions: id = message id of the client request, v = view number,
\* lv = last view recorded by the sender (rLastView), src = sending replica,
\* dst = set of destination replicas, r = a replica record (set of entries,
\* each with or without a result).
\*
\* A FINALIZE for an inconsistent operation carries no result, so its variant
\* has no res field; only the consensus variant carries res.
Message ==
    [type: {"PROPOSE"},
     id: MessageId,
     op: Operations,
     v: Nat]
    \cup
    [type: {"REPLY"}, \* reply no result
     id: MessageId,
     v: Nat,
     src: Replicas]
    \cup
    [type: {"REPLY"}, \* reply with result
     id: MessageId,
     v: Nat,
     res: Results,
     src: Replicas]
    \cup
    [type: {"START-VIEW-CHANGE"},
     v: Nat,
     src: Replicas]
    \cup
    [type: {"DO-VIEW-CHANGE"},
     r: SUBSET ([msgid: MessageId,
                 op: Operations,
                 res: Results,
                 status: OpStatus]
                \cup [msgid: MessageId,
                      op: Operations,
                      status: OpStatus]),
     v: Nat,
     lv: Nat,
     src: Replicas,
     dst: SUBSET Replicas]
    \cup
    [type: {"START-VIEW"},
     v: Nat,
     r: SUBSET ([msgid: MessageId,
                 op: Operations,
                 res: Results,
                 status: OpStatus]
                \cup [msgid: MessageId,
                      op: Operations,
                      status: OpStatus]),
     src: Replicas]
    \cup
    [type: {"FINALIZE"}, \* finalize with no result
     id: MessageId,
     op: Operations,
     v: Nat]
    \cup
    [type: {"FINALIZE"}, \* finalize with result
     id: MessageId,
     op: Operations,
     res: Results,
     v: Nat]
    \cup
    [type: {"CONFIRM"},
     v: Nat,
     id: MessageId,
     op: Operations,
     res: Results,
     src: Replicas]
-----------------------------------------------------------------------------
(***************************************************************************)
(* `^ \center \large{\textbf{Variables and State Predicates}} ^' *)
(***************************************************************************)
(***************************************************************************)
(* Variables: *)
(* 1. State at each replica: *)
(* rState = Denotes current replica state. Either: *)
(* - NORMAL (processing operations) *)
(* - VIEW-CHANGING (participating in recovery) *)
(* rRecord = Unordered set of operations and their results *)
(* rCrtView = current view number *)
(* 2. State of communication medium: *)
(* sentMsg = sent (but not yet received) messages *)
(* 3. State at client: *)
(* cCrtOp = crt operation requested by the client *)
(* cMsgCounter = the message id counter of the client; it is *)
(* incremented for each new request *)
(* *)
(***************************************************************************)
\* Prefix convention used below: r* = replica state, c* = client state,
\* a* = application/invariant bookkeeping, g* = global counters.
VARIABLES rState, rRecord, rCrtView, rLastView, rViewReplies,
rViewOnDisk,
rNonce, sentMsg, cCrtOp,
cCrtOpToFinalize, cMsgCounter, cCrtOpReplies, cCrtOpConfirms,
cState, aSuccessful, arRecord, aVisibility, gViewChangesNo
(***************************************************************************)
(* Defining these tuples makes it easier to express which variables remain *)
(* unchanged *)
(***************************************************************************)
\* Replica variables.
rVars == <<rState, rRecord, rCrtView, rViewOnDisk, rLastView,
rViewReplies, rNonce>>
\* Client variables.
cVars == <<cCrtOp, \* current operation at a client
cCrtOpToFinalize,
cCrtOpReplies, \* current operation replies
cCrtOpConfirms,
cMsgCounter,
cState>>
\* Application variables.
aVars == <<aSuccessful, arRecord, aVisibility>> \* we use them to write invariants
\* Other variables.
oVars == <<sentMsg, gViewChangesNo>>
\* All variables.
vars == <<rVars, cVars, aVars, oVars>>
\* Type invariant for the state of shard S. Variables that are shared between
\* shards are indexed by S first; cState and aSuccessful are not per-shard.
TypeOK ==
    LET
        \* An entry of a replica record, with or without a result.
        RecordEntry ==
            [msgid: MessageId, op: Operations, res: Results, status: OpStatus]
            \cup [msgid: MessageId, op: Operations, status: OpStatus]
        \* What a replica stores about a received view-change message.
        ViewReply ==
            [type: {"start-view-change"}, v: Nat, src: Replicas]
            \cup [type: {"do-view-change"},
                  v: Nat,
                  lv: Nat,
                  r: SUBSET RecordEntry,
                  src: Replicas]
        \* What a client stores about a received REPLY / CONFIRM.
        OpReply ==
            [viewNumber: Nat, res: Results, src: Replicas]
            \cup [viewNumber: Nat, src: Replicas]
        OpConfirm == [viewNumber: Nat, res: Results, src: Replicas]
        \* An operation the application was told succeeded.
        SuccessfulOp ==
            [mid: MessageId, op: Operations, res: Results]
            \cup [mid: MessageId, op: Operations]
    IN
        /\ rState[S] \in [Replicas -> ReplicaState]
        /\ rRecord[S] \in [Replicas -> SUBSET RecordEntry]
        /\ rCrtView[S] \in [Replicas -> Nat]
        /\ rViewOnDisk[S] \in [Replicas -> Nat]
        /\ rLastView[S] \in [Replicas -> Nat]
        /\ rViewReplies[S] \in [Replicas -> SUBSET ViewReply]
        /\ rNonce[S] \in [Replicas -> Nat]
        /\ sentMsg[S] \in SUBSET Message
        /\ cCrtOp[S] \in [Clients -> Operations]
        /\ cCrtOpToFinalize[S] \in [Clients -> Operations]
        /\ cCrtOpReplies[S] \in [Clients -> SUBSET OpReply]
        /\ cCrtOpConfirms[S] \in [Clients -> SUBSET OpConfirm]
        /\ cMsgCounter[S] \in [Clients -> Nat]
        /\ cState \in [Clients -> ClientState]
        /\ aSuccessful \in SUBSET SuccessfulOp
        /\ aVisibility[S] \in [MessageId -> SUBSET MessageId]
        /\ arRecord[S] \in [Replicas -> SUBSET RecordEntry]
        /\ gViewChangesNo[S] \in Nat
\* Initial state: every replica and client is NORMAL, all records, reply sets
\* and the message set are empty, all counters and view numbers are 0, and no
\* client has a current operation (<<>>).
\* NOTE(review): the values here are indexed directly by replica / client,
\* while TypeOK and the actions index most of these variables by the shard id
\* S first (e.g. rState[S][r]). Confirm whether this Init is meant to be used
\* as is or only through an instantiating module that supplies its own
\* shard-indexed initial state.
Init ==
/\ rState = [r \in Replicas |-> "NORMAL"]
/\ rRecord = [r \in Replicas |-> {}]
/\ rCrtView = [r \in Replicas |-> 0]
/\ rViewOnDisk = [r \in Replicas |-> 0]
/\ rLastView = [r \in Replicas |-> 0]
/\ rViewReplies = [r \in Replicas |-> {}]
/\ rNonce = [r \in Replicas |-> 0]
/\ sentMsg = {}
/\ cCrtOp = [c \in Clients |-> <<>>]
/\ cCrtOpToFinalize = [c \in Clients |-> <<>>]
/\ cCrtOpReplies = [c \in Clients |-> {}]
/\ cCrtOpConfirms = [c \in Clients |-> {}]
/\ cMsgCounter = [c \in Clients |-> 0]
/\ cState = [c \in Clients |-> "NORMAL"]
/\ aSuccessful = {}
/\ aVisibility = [o \in MessageId |-> {}]
/\ arRecord = [r \in Replicas |-> {}]
/\ gViewChangesNo = 0
-----------------------------------------------------------------------------
(***************************************************************************)
(* `^ \center \large{\textbf{Actions}} ^' *)
(***************************************************************************)
\* Adds message m to the message set of shard S. It constrains sentMsg', so an
\* action that uses Send must not also list sentMsg as unchanged.
Send(m) == sentMsg' = [sentMsg EXCEPT ![S] = @ \cup {m}]
\* Replica r leads view v when its id equals (v mod number of replicas) + 1.
\* NOTE(review): this presumes replica ids are the numbers 1..Cardinality(Replicas)
\* - confirm against the model's replica sets.
IsLeader(r, v) == r = (v % Cardinality(Replicas)) + 1
\* Same predicate under the name used from a replica's own point of view.
AmLeader(r, v) == IsLeader(r, v)
\* Negation of IsLeader.
NotIsLeader(r, v) == ~IsLeader(r, v)
\* Some replica that leads view v.
LeaderOf(v) == CHOOSE x \in Replicas: IsLeader(x, v)
-----------------------------------------------------------------------------
(***************************************************************************)
(* `^ \center \large{\textbf{Client Actions}} ^' *)
(***************************************************************************)
\* Note: CHOOSE does not introduce nondeterminism (the same value is chosen
\* each time)
\* \* Client c starts operation op: it takes the next message id and sends a
\* \* PROPOSE for it. Enabled only when c has no operation in flight and none
\* \* waiting to be finalized.
ClientRequest(c, op) ==
    LET nextId == cMsgCounter[S][c] + 1
    IN
        \* the client is not waiting for a result of another operation
        /\ cCrtOp[S][c] = <<>>
        \* the client is not waiting to finalize an operation
        /\ cCrtOpToFinalize[S][c] = <<>>
        \* BOUND the number of requests a client can make
        \* (useful for model checking)
        /\ cMsgCounter[S][c] < max_req
        /\ cMsgCounter' = [cMsgCounter EXCEPT ![S][c] = nextId]
        /\ cCrtOp' = [cCrtOp EXCEPT ![S][c] = op]
        /\ Send([type |-> "PROPOSE",
                 id |-> [cid |-> c, msgid |-> nextId],
                 op |-> op,
                 v |-> 0])
        /\ UNCHANGED <<rVars, aVars, cCrtOpReplies, cCrtOpToFinalize,
                       cCrtOpConfirms, cState, gViewChangesNo>>
\* \* Client c picks up a REPLY addressed to its current operation and stores
\* \* it (view number, source and - for consensus operations - the result).
\* \* The message stays in sentMsg.
ClientReceiveReply(c) ==
    \E msg \in sentMsg[S]:
        LET
            \* add one entry to c's set of collected replies
            Collect(entry) ==
                cCrtOpReplies' =
                    [cCrtOpReplies EXCEPT ![S][c] = @ \cup {entry}]
        IN
            /\ msg.type = "REPLY"
            /\ cCrtOp[S][c] /= <<>>
            \* reply to c's request for crt op
            /\ msg.id = [cid |-> c, msgid |-> cMsgCounter[S][c]]
            /\ \/ /\ cCrtOp[S][c].type = "Inconsistent"
                  /\ Collect([viewNumber |-> msg.v,
                              src |-> msg.src])
               \/ /\ cCrtOp[S][c].type = "Consensus"
                  /\ Collect([viewNumber |-> msg.v,
                              res |-> msg.res,
                              src |-> msg.src])
            /\ UNCHANGED <<cCrtOp, cCrtOpToFinalize, cCrtOpConfirms,
                           cMsgCounter, cState, rVars, aVars, oVars>>
\* \* "Helper" formulas
\* Holds when, for every pair of replicas in Q, client c has collected a reply
\* from each of them carrying the same view number.
__matchingViewNumbers(Q, c) ==
    \A r, p \in Q:
        \E rr, pr \in cCrtOpReplies[S][c]:
            /\ rr.src = r
            /\ pr.src = p
            /\ rr.viewNumber = pr.viewNumber
\* Holds when, for every pair of replicas in Q, client c has collected a reply
\* from each of them carrying the same view number and the same result.
__matchingViewNumbersAndResults(Q, c) ==
    \A r, p \in Q:
        \E rr, pr \in cCrtOpReplies[S][c]:
            /\ rr.src = r
            /\ pr.src = p
            /\ rr.viewNumber = pr.viewNumber
            /\ rr.res = pr.res
\* \* IR Client received enough responses to decide
\* \* what to do with the operation
\*
\* Three cases, each ending with a FINALIZE message and with the client's
\* current operation and collected replies cleared:
\*   I.a  quorum of replies, inconsistent op: the op is successful.
\*   I.b  quorum of replies, consensus op: the result is the common result if
\*        all replies in the quorum agree, otherwise Decide(...); the op is
\*        parked in cCrtOpToFinalize until confirms arrive.
\*   II.  super quorum of matching replies, consensus op: the op is successful
\*        with the common result.
ClientSendFinalize(c) ==
    /\ cCrtOp[S][c] /= <<>>
    /\ \/ \E Q \in Quorums:
            \* I. The IR Client got a simple quorum of replies
            /\ \A r \in Q:
                 \E reply \in cCrtOpReplies[S][c]: reply.src = r
            /\ \/ /\ cCrtOp[S][c].type = "Inconsistent"
                  /\ __matchingViewNumbers(Q, c)
                  /\ aSuccessful' = aSuccessful \cup
                        {[mid |-> [cid |-> c,
                                   msgid |-> cMsgCounter[S][c]],
                          op |-> cCrtOp[S][c]]}
                  /\ SuccessfulInconsistentOp(c, S, cCrtOp[S][c])
                  /\ Send([type |-> "FINALIZE",
                           id |-> [cid |-> c, msgid |-> cMsgCounter[S][c]],
                           op |-> cCrtOp[S][c],
                           v |-> 0])
                  /\ UNCHANGED <<cCrtOpToFinalize>>
               \/ /\ cCrtOp[S][c].type = "Consensus"
                  /\ __matchingViewNumbers(Q, c)
                  /\ LET res ==
                           IF __matchingViewNumbersAndResults(Q, c)
                           THEN
                               CHOOSE result \in
                                   {candidate \in Results:
                                       \E reply \in cCrtOpReplies[S][c]:
                                           /\ reply.src \in Q
                                           /\ reply.res = candidate}: TRUE
                           ELSE
                               Decide(cCrtOpReplies[S][c])
                     IN
                       /\ Send([type |-> "FINALIZE",
                                id |-> [cid |-> c, msgid |-> cMsgCounter[S][c]],
                                op |-> cCrtOp[S][c],
                                res |-> res,
                                v |-> 0])
                       \* Park the op in c's to-finalize slot. The update is
                       \* applied to cCrtOpToFinalize itself so that the slots
                       \* of other shards and clients keep their values.
                       /\ cCrtOpToFinalize' =
                              [cCrtOpToFinalize EXCEPT ![S][c] = cCrtOp[S][c]]
                       /\ UNCHANGED <<aSuccessful>>
       \/ \E SQ \in SuperQuorums:
            \* II. The IR Client got super quorum of responses
            /\ \A r \in SQ:
                 \E reply \in cCrtOpReplies[S][c]: reply.src = r
            /\ cCrtOp[S][c].type = "Consensus" \* only care if consensus op
            /\ __matchingViewNumbersAndResults(SQ, c)
            /\ LET res ==
                     CHOOSE result \in
                         {candidate \in Results:
                             \E reply \in cCrtOpReplies[S][c]:
                                 /\ reply.src \in SQ
                                 /\ reply.res = candidate}: TRUE
               IN
                 /\ Send([type |-> "FINALIZE",
                          id |-> [cid |-> c, msgid |-> cMsgCounter[S][c]],
                          op |-> cCrtOp[S][c],
                          res |-> res,
                          v |-> 0])
                 /\ aSuccessful' = aSuccessful \cup
                       {[mid |-> [cid |-> c,
                                  msgid |-> cMsgCounter[S][c]],
                         op |-> cCrtOp[S][c],
                         res |-> res]}
                 /\ SuccessfulConsensusOp(c, S, cCrtOp[S][c], res)
                 /\ UNCHANGED <<cCrtOpToFinalize>>
    /\ cCrtOp' = [cCrtOp EXCEPT ![S][c] = <<>>]
    /\ cCrtOpReplies' = [cCrtOpReplies EXCEPT ![S][c] = {}]
    /\ UNCHANGED <<cMsgCounter, cState, cCrtOpConfirms, rVars, arRecord,
                   aVisibility, gViewChangesNo>>
\* \* Client c, while waiting to finalize an operation, picks up a CONFIRM for
\* \* its latest request and stores it. The message stays in sentMsg.
ClientReceiveConfirm(c) ==
    \E msg \in sentMsg[S]:
        LET confirm == [viewNumber |-> msg.v,
                        res |-> msg.res,
                        src |-> msg.src]
        IN
            /\ msg.type = "CONFIRM"
            /\ cCrtOpToFinalize[S][c] /= <<>>
            \* reply to c's request for crt op
            /\ msg.id = [cid |-> c, msgid |-> cMsgCounter[S][c]]
            /\ cCrtOpConfirms' =
                   [cCrtOpConfirms EXCEPT ![S][c] = @ \cup {confirm}]
            /\ UNCHANGED <<cCrtOp, cCrtOpReplies, cCrtOpToFinalize,
                           cMsgCounter, cState, rVars, aVars, oVars>>
\* \* An operation is finalized by a client and result returned to the application
\*
\* Enabled when c has an operation waiting to be finalized and has collected
\* confirms from every replica of some quorum. The result reported is the one
\* from a confirm with the highest view number. Afterwards the to-finalize
\* slot and the collected confirms are cleared.
ClientFinalizeOp(c) ==
    /\ cCrtOpToFinalize[S][c] /= <<>>
    /\ \E Q \in Quorums:
         \* IR client received a quorum of confirms
         /\ \A r \in Q:
              \E reply \in cCrtOpConfirms[S][c]: reply.src = r
         /\ LET
              \* take the result in the biggest view number
              latest == CHOOSE cand \in cCrtOpConfirms[S][c]:
                            ~ \E rep \in cCrtOpConfirms[S][c]:
                                  rep.viewNumber > cand.viewNumber
            IN
              /\ aSuccessful' = aSuccessful \cup
                    {[mid |-> [cid |-> c,
                               msgid |-> cMsgCounter[S][c]],
                      op |-> cCrtOpToFinalize[S][c],
                      res |-> latest.res]}
              \* respond to app; the operation lives in cCrtOpToFinalize at
              \* this point (cCrtOp was cleared when the FINALIZE was sent)
              /\ SuccessfulConsensusOp(c, S, cCrtOpToFinalize[S][c], latest.res)
    /\ cCrtOpToFinalize' = [cCrtOpToFinalize EXCEPT ![S][c] = <<>>]
    /\ cCrtOpConfirms' = [cCrtOpConfirms EXCEPT ![S][c] = {}]
    /\ UNCHANGED <<rVars, cCrtOp, cCrtOpReplies,
                   cMsgCounter, cState, arRecord, aVisibility, oVars>>
\* \* Client fails and loses all data
\*
\* cState is indexed by client only (it is not per-shard), so it is updated
\* with ![c]. All of c's per-shard client state is reset: message counter,
\* current operation, operation waiting to be finalized, and the collected
\* replies and confirms. Every client variable is given a next-state value.
ClientFail(c) ==
    /\ cState' = [cState EXCEPT ![c] = "FAILED"]
    /\ cMsgCounter' = [cMsgCounter EXCEPT ![S][c] = 0]
    /\ cCrtOp' = [cCrtOp EXCEPT ![S][c] = <<>>]
    /\ cCrtOpToFinalize' = [cCrtOpToFinalize EXCEPT ![S][c] = <<>>]
    /\ cCrtOpReplies' = [cCrtOpReplies EXCEPT ![S][c] = {}]
    /\ cCrtOpConfirms' = [cCrtOpConfirms EXCEPT ![S][c] = {}]
    /\ AppClientFail
    /\ UNCHANGED <<rVars, aVars, oVars>>
\* \* Client recovers
\* \* Not implemented yet
\* Defined as FALSE, so this action is never enabled: a client in the FAILED
\* state takes no further steps through ClientAction.
ClientRecover(c) == FALSE
-----------------------------------------------------------------------------
(***************************************************************************)
(* `^ \center \large{\textbf{Replica Actions}} ^' *)
(***************************************************************************)
\* \* Replica r handles a PROPOSE it has no record entry for yet: it logs the
\* \* operation as TENTATIVE and sends a REPLY in its current view. For a
\* \* consensus operation the reply and the log entry also carry the result
\* \* of ExecConsensus.
ReplicaReceiveRequest(r) ==
    \E msg \in sentMsg[S]:
        LET
            \* append one entry to r's record
            Log(entry) ==
                rRecord' = [rRecord EXCEPT ![S][r] = @ \cup {entry}]
        IN
            /\ msg.type = "PROPOSE"
            \* not already replied for this op
            /\ \A rec \in rRecord[S][r]: rec.msgid /= msg.id
            /\ \/ /\ msg.op.type = "Inconsistent"
                  /\ Send([type |-> "REPLY",
                           id |-> msg.id,
                           v |-> rCrtView[S][r],
                           src |-> r])
                  /\ Log([msgid |-> msg.id,
                          op |-> msg.op,
                          status |-> "TENTATIVE"])
               \/ /\ msg.op.type = "Consensus"
                  /\ LET res == ExecConsensus(msg.op)
                     IN
                       /\ Send([type |-> "REPLY",
                                id |-> msg.id,
                                v |-> rCrtView[S][r],
                                res |-> res,
                                src |-> r])
                       /\ Log([msgid |-> msg.id,
                               op |-> msg.op,
                               res |-> res,
                               status |-> "TENTATIVE"])
            /\ UNCHANGED <<rState, rCrtView, rLastView, rViewOnDisk,
                           rViewReplies, rNonce, cVars, aVars, gViewChangesNo>>
\* \* Replica r handles a FINALIZE from an IR client whose view is not older
\* \* than r's current view.
\* \* Inconsistent operation: ExecInconsistent is required unless r already
\* \* holds a FINALIZED entry for it; the entry becomes FINALIZED and nothing
\* \* is sent.
\* \* Consensus operation: the entry is replaced by a FINALIZED one carrying
\* \* the result from the message, and a CONFIRM is sent.
ReplicaReceiveFinalize(r) ==
    \E msg \in sentMsg[S]:
        LET
            \* r's entries for this operation (must be only 1 record)
            known == {rec \in rRecord[S][r]:
                         /\ rec.msgid = msg.id
                         /\ rec.op = msg.op}
            \* replace the known entries by a single finalized entry
            Finalize(entry) ==
                rRecord' =
                    [rRecord EXCEPT ![S][r] = (@ \ known) \cup {entry}]
        IN
            /\ msg.type = "FINALIZE"
            /\ msg.v >= rCrtView[S][r]
            /\ \/ /\ msg.op.type = "Inconsistent"
                  \* execute unless the op was already finalized here; this
                  \* also covers the case where r never heard of the op
                  /\ IF \E rec \in known: rec.status = "FINALIZED"
                     THEN TRUE
                     ELSE ExecInconsistent(msg.op)
                  /\ Finalize([msgid |-> msg.id,
                               op |-> msg.op,
                               status |-> "FINALIZED"])
                  /\ UNCHANGED <<sentMsg>>
               \/ /\ msg.op.type = "Consensus"
                  /\ Finalize([msgid |-> msg.id,
                               op |-> msg.op,
                               res |-> msg.res,
                               status |-> "FINALIZED"])
                  /\ Send([type |-> "CONFIRM",
                           v |-> rCrtView[S][r],
                           id |-> msg.id,
                           op |-> msg.op,
                           res |-> msg.res,
                           src |-> r])
            /\ UNCHANGED <<rState, rCrtView, rLastView, rViewReplies,
                           rViewOnDisk, rNonce, cVars, aVars, gViewChangesNo>>
\* \* Replica r starts (or re-starts) the view change procedure by sending a
\* \* DO-VIEW-CHANGE that carries its last view and its record.
\* \*  - NORMAL / VIEW-CHANGING: move to the next view, persist it, and send
\* \*    to all replicas.
\* \*  - FAILED: become RECOVERING in the view found on disk and send to all
\* \*    replicas except the leader of that view.
\* \*  - RECOVERING: move to the next view and send to all replicas except
\* \*    the leader of that view.
ReplicaSendDoViewChange(r) ==
    LET
        DoViewChange(view, targets) ==
            Send([type |-> "DO-VIEW-CHANGE",
                  v |-> view,
                  lv |-> rLastView[S][r],
                  r |-> rRecord[S][r],
                  src |-> r,
                  dst |-> targets])
        AllButLeaderOf(view) ==
            Replicas \ {x \in Replicas: IsLeader(x, view)}
    IN
        \* BOUND on number of view changes
        /\ gViewChangesNo[S] < max_vc
        /\ \/ /\ rState[S][r] \in {"NORMAL", "VIEW-CHANGING"}
              /\ rCrtView' = [rCrtView EXCEPT ![S][r] = @ + 1]
              /\ rViewOnDisk' =
                     [rViewOnDisk EXCEPT ![S][r] = rCrtView[S][r] + 1]
              /\ rState' = [rState EXCEPT ![S][r] = "VIEW-CHANGING"]
              /\ DoViewChange(rCrtView[S][r] + 1, Replicas)
           \/ /\ rState[S][r] = "FAILED"
              /\ rState' = [rState EXCEPT ![S][r] = "RECOVERING"]
              /\ rCrtView' = [rCrtView EXCEPT ![S][r] = rViewOnDisk[S][r]]
              /\ DoViewChange(rViewOnDisk[S][r],
                              AllButLeaderOf(rViewOnDisk[S][r]))
              /\ UNCHANGED <<rViewOnDisk>>
           \/ /\ rState[S][r] = "RECOVERING"
              /\ rCrtView' = [rCrtView EXCEPT ![S][r] = @ + 1]
              /\ DoViewChange(rCrtView[S][r] + 1,
                              AllButLeaderOf(rCrtView[S][r] + 1))
              /\ UNCHANGED <<rViewOnDisk, rState>>
        /\ gViewChangesNo' = [gViewChangesNo EXCEPT ![S] = @ + 1]
        /\ UNCHANGED <<cVars, rLastView, rViewReplies, rRecord,
                       rNonce, aVars>>
\* \* Replica r handles a DO-VIEW-CHANGE addressed to it for a newer view
\* \* (or, when already VIEW-CHANGING, for a view not older than its own).
\* \* r becomes VIEW-CHANGING in the message's view, persists that view and
\* \* sends its own DO-VIEW-CHANGE to all replicas. If r leads the message's
\* \* view it may also store the message, keeping at most one stored
\* \* do-view-change per sender (the one with the higher view).
ReplicaReceiveDoViewChange(r) ==
    /\ \E msg \in sentMsg[S]:
         LET
             \* what r already stored from this sender
             \* (should only be one item in set)
             fromSender == {x \in rViewReplies[S][r]:
                               /\ x.type = "do-view-change"
                               /\ x.src = msg.src}
             incoming == [type |-> "do-view-change",
                          v |-> msg.v,
                          lv |-> msg.lv,
                          r |-> msg.r,
                          src |-> msg.src]
         IN
             /\ msg.type = "DO-VIEW-CHANGE"
             /\ r \in msg.dst
             /\ \/ /\ rState[S][r] = "NORMAL"
                   /\ msg.v > rCrtView[S][r]
                \/ /\ rState[S][r] = "VIEW-CHANGING"
                   /\ msg.v >= rCrtView[S][r]
             /\ rState' = [rState EXCEPT ![S][r] = "VIEW-CHANGING"]
             \* keep only the one with the higher view (v)
             /\ \/ /\ IsLeader(r, msg.v)
                   /\ \A x \in fromSender: x.v < msg.v
                   /\ rViewReplies' =
                          [rViewReplies EXCEPT ![S][r] =
                              (@ \ fromSender) \cup {incoming}]
                \/ UNCHANGED <<rViewReplies>>
             /\ rCrtView' = [rCrtView EXCEPT ![S][r] = msg.v]
             /\ rViewOnDisk' = [rViewOnDisk EXCEPT ![S][r] = msg.v]
             /\ Send([type |-> "DO-VIEW-CHANGE",
                      v |-> msg.v,
                      lv |-> rLastView[S][r],
                      r |-> rRecord[S][r],
                      src |-> r,
                      dst |-> Replicas])
    /\ UNCHANGED <<cVars, rLastView, rRecord, rNonce,
                   aVars, gViewChangesNo>>
\* Note: Assume one reply for view change per replica in Q
\* (in ReplicaReceiveDoViewChange we keep only the most recent reply)
\*
\* Replica r has stored do-view-change replies from every member of some
\* quorum Q, all for the same view, which is not older than r's own. It builds
\* the master record from those replies, adopts it, announces the new view
\* with START-VIEW, and returns to NORMAL with its stored replies cleared.
ReplicaSendStartView(r) ==
    /\ \E Q \in Quorums:
         \* received at least a quorum of replies
         /\ \A r1, r2 \in Q:
              \E rr, pr \in rViewReplies[S][r]:
                  /\ rr.type = "do-view-change"
                  /\ pr.type = "do-view-change"
                  /\ rr.src = r1
                  /\ pr.src = r2
                  /\ rr.v = pr.v
                  /\ rr.v >= rCrtView[S][r]
         /\ LET
              \* projections of record entries, with and without the result
              WithResult(entries) ==
                  {[msgid |-> y.msgid, op |-> y.op, res |-> y.res]:
                       y \in entries}
              NoResult(entries) ==
                  {[msgid |-> y.msgid, op |-> y.op]: y \in entries}
              \* set of all do-view-change replies from Q
              quorumReplies ==
                  {x \in rViewReplies[S][r]: /\ x.src \in Q
                                             /\ x.type = "do-view-change"}
              \* keep only the replies with the maximum last view (lv)
              latestReplies ==
                  {x \in quorumReplies: \A y \in quorumReplies: y.lv <= x.lv}
              \* set of all record entries received in those replies
              seen == UNION {x.r: x \in latestReplies}
              \* any finalized consensus operation (in at least one record,
              \* in the maximum latest view)
              finalizedConsensus ==
                  WithResult({x \in seen:
                                 /\ x.op.type = "Consensus"
                                 /\ x.status = "FINALIZED"})
              \* any tentative consensus operation reported by every replica
              \* in the intersection of Q with some super quorum
              majorityConsensus ==
                  WithResult({x \in seen:
                                 /\ x.op.type = "Consensus"
                                 /\ x.status = "TENTATIVE"
                                 /\ \E P \in SuperQuorums:
                                      \A replica \in Q \cap P:
                                          \E reply \in latestReplies:
                                              /\ reply.src = replica
                                              /\ x \in reply.r})
                  \ finalizedConsensus
              \* the rest of consensus ops found in at least one record
              \* (discard the result)
              otherConsensus ==
                  NoResult((WithResult({x \in seen: x.op.type = "Consensus"})
                                \ majorityConsensus)
                           \ finalizedConsensus)
              \* any inconsistent operation found in any received record
              inconsistentOps ==
                  NoResult({x \in seen: x.op.type = "Inconsistent"})
              \* application-level merge of the recovered operations
              merged == Merge(finalizedConsensus \cup inconsistentOps,
                              majorityConsensus,
                              otherConsensus)
              \* everything in the master record is FINALIZED
              masterRecord ==
                  {[msgid |-> x.msgid,
                    op |-> x.op,
                    status |-> "FINALIZED"]:
                       x \in {m \in merged: m.op.type = "Inconsistent"}}
                  \cup
                  {[msgid |-> x.msgid,
                    op |-> x.op,
                    res |-> x.res,
                    status |-> "FINALIZED"]:
                       x \in {m \in merged: m.op.type = "Consensus"}}
              \* the view decided by quorum Q
              newView == CHOOSE v \in {x.v: x \in quorumReplies}: TRUE
            IN
              /\ rRecord' = [rRecord EXCEPT ![S][r] = masterRecord]
              /\ Sync(masterRecord)
              /\ Send([type |-> "START-VIEW",
                       v |-> newView,
                       r |-> masterRecord,
                       src |-> r])
              /\ rCrtView' = [rCrtView EXCEPT ![S][r] = newView]
              /\ rLastView' = [rLastView EXCEPT ![S][r] = newView]
    /\ rState' = [rState EXCEPT ![S][r] = "NORMAL"]
    /\ rViewReplies' = [rViewReplies EXCEPT ![S][r] = {}]
    /\ UNCHANGED <<rNonce, rViewOnDisk, cVars, aVars, gViewChangesNo>>
\* \* Replica r handles a START-VIEW for a newer view (or, when VIEW-CHANGING
\* \* or RECOVERING, for a view not older than its own). r adopts the view and
\* \* the master record carried by the message, returns to NORMAL and clears
\* \* its stored view-change replies. Operations of the master record that a
\* \* whole quorum holds are added to aSuccessful (invariant bookkeeping).
ReplicaReceiveStartView(r) ==
    /\ \E msg \in sentMsg[S]:
         LET
             \* entries of the master record held by every member of some
             \* quorum, either in its record or in its saved record; r itself
             \* counts, since it adopts the master record in this step
             heldByQuorum ==
                 {x \in msg.r: \E Q \in Quorums:
                                   \A r1 \in Q:
                                       \/ x \in rRecord[S][r1]
                                       \/ x \in arRecord[S][r1]
                                       \/ r1 = r}
             newConsensus ==
                 {[mid |-> x.msgid, op |-> x.op, res |-> x.res]:
                      x \in {y \in heldByQuorum: y.op.type = "Consensus"}}
             newInconsistent ==
                 {[mid |-> x.msgid, op |-> x.op]:
                      x \in {y \in heldByQuorum: y.op.type = "Inconsistent"}}
         IN
             /\ msg.type = "START-VIEW"
             /\ \/ /\ rState[S][r] = "NORMAL"
                   /\ msg.v > rCrtView[S][r]
                \/ /\ rState[S][r] \in {"VIEW-CHANGING", "RECOVERING"}
                   /\ msg.v >= rCrtView[S][r]
             /\ rCrtView' = [rCrtView EXCEPT ![S][r] = msg.v]
             /\ rLastView' = [rLastView EXCEPT ![S][r] = msg.v]
             /\ rRecord' = [rRecord EXCEPT ![S][r] = msg.r]
             /\ Sync(msg.r)
             \* Check if the operations received in the master record
             \* must be added to the aSuccessful
             /\ aSuccessful' =
                    aSuccessful \cup newConsensus \cup newInconsistent
             /\ rViewOnDisk' = [rViewOnDisk EXCEPT ![S][r] = msg.v + 1]
             /\ rState' = [rState EXCEPT ![S][r] = "NORMAL"]
             /\ rViewReplies' = [rViewReplies EXCEPT ![S][r] = {}]
    /\ UNCHANGED <<rNonce, cVars, arRecord, aVisibility, oVars>>
\* \* A replica fails and loses everything except the view number
\* \* The view number has been written to disk
\*
\* r becomes FAILED; its record, current view, last view and stored
\* view-change replies are reset, while rViewOnDisk is kept. The lost record
\* is copied to arRecord.
ReplicaFail(r) ==
    LET down == {re \in Replicas:
                    rState[S][re] \in {"FAILED", "RECOVERING"}}
    IN
        \* We assume less than f replicas fail simultaneously
        /\ Cardinality(down) < f
        /\ rState' = [rState EXCEPT ![S][r] = "FAILED"]
        \* save record only for invariant purposes
        /\ arRecord' = [arRecord EXCEPT ![S][r] = rRecord[S][r]]
        /\ rRecord' = [rRecord EXCEPT ![S][r] = {}]
        /\ rCrtView' = [rCrtView EXCEPT ![S][r] = 0]
        /\ rLastView' = [rLastView EXCEPT ![S][r] = 0]
        /\ rViewReplies' = [rViewReplies EXCEPT ![S][r] = {}]
        /\ UNCHANGED <<rViewOnDisk, rNonce, cVars, aSuccessful,
                       aVisibility, oVars>>
-----------------------------------------------------------------------------
(***************************************************************************)
(* `^ \center \large{\textbf{High-Level Actions}} ^' *)
(***************************************************************************)
\* One step of client c, chosen by the client's state.
\* Two NORMAL-state alternatives are switched off here and kept as comments:
\* ClientRequest (a client starts replicating an operation) and ClientFail.
ClientAction(c) ==
    \/ /\ cState[c] = "NORMAL"
       /\ \* \/ ClientRequest(c) \* some client tries to replicate commit an operation
          \* some client receives a reply from a replica
          \/ ClientReceiveReply(c)
          \* some client receives a confirm from a replica
          \/ ClientReceiveConfirm(c)
          \* \/ ClientFail(c) \* some client fails
          \* an operation is successful at some client
          \/ ClientSendFinalize(c)
          \* an operation was finalized at some client
          \/ ClientFinalizeOp(c)
    \/ /\ cState[c] = "FAILED"
       /\ ClientRecover(c)
\* One step of replica r; the alternatives on offer depend on r's state.
ReplicaAction(r) ==
    \/ /\ rState[S][r] = "NORMAL"
       /\ \/ ReplicaReceiveRequest(r)      \* reply to a PROPOSE msg
          \/ ReplicaReceiveFinalize(r)
          \/ ReplicaSendDoViewChange(r)
          \/ ReplicaReceiveDoViewChange(r)
          \/ ReplicaReceiveStartView(r)
          \/ ReplicaFail(r)                \* some replica fails
    \/ /\ rState[S][r] = "FAILED"
       /\ ReplicaSendDoViewChange(r)       \* start view-change protocol
    \/ /\ rState[S][r] = "RECOVERING"
          \* re-start view-change protocol (assume a timeout and still no
          \* response from the new leader)
       /\ \/ ReplicaSendDoViewChange(r)
          \/ ReplicaReceiveStartView(r)
    \/ /\ rState[S][r] = "VIEW-CHANGING"
       /\ \/ ReplicaSendDoViewChange(r)
          \/ ReplicaReceiveDoViewChange(r)
          \/ ReplicaSendStartView(r)
          \/ ReplicaReceiveStartView(r)
          \/ ReplicaFail(r)
\* Next-state relation of the shard: a step of some client or of some replica.
\* The commented-out disjunct below is a disabled "terminate instead of
\* deadlock" stuttering step.
Next ==
\/ \E c \in Clients: ClientAction(c)
\/ \E r \in Replicas: ReplicaAction(r)
\*\/ \* Avoid deadlock by termination
\* (\A i \in 1..Cardinality(Replicas): rLastView[S][i] = max_vc) \/ UNCHANGED <<vars>>
\* Complete specification: start in Init, then take Next steps or stutter.
Spec == Init /\ [] [Next]_vars
\* Invariant: for every successful operation and every quorum whose members
\* are all NORMAL or VIEW-CHANGING, some member of the quorum has a record
\* entry for the operation (same message id and operation; not necessarily
\* the same result).
FaultTolerance ==
    \A successfulOp \in aSuccessful:
        \A Q \in Quorums:
            LET
                Up(r) == rState[S][r] \in {"NORMAL", "VIEW-CHANGING"}
                Holds(p) == \E rec \in rRecord[S][p]:
                                /\ successfulOp.mid = rec.msgid
                                /\ successfulOp.op = rec.op
            IN
                (\A r \in Q: Up(r)) => (\E p \in Q: Holds(p))
=============================================================================
\* Modification History
\* Last modified Fri Aug 28 10:58:02 PDT 2015 by aaasz
\* Created Fri Dec 12 17:42:14 PST 2014 by aaasz
------------------------------- MODULE TAPIR -------------------------------
(***************************************************************************)
(* This is a TLA+ specification of the TAPIR algorithm. *)
(***************************************************************************)
EXTENDS FiniteSets, Naturals, TLC, TLAPS
\* Largest element of the set S of naturals; 0 when S is empty.
Max(S) ==
    IF S = {}
    THEN 0
    ELSE CHOOSE top \in S: \A other \in S: other <= top
(***************************************************************************)
(* TAPIR constants: *)
(* 1. Shards: function from shard id to set of replica ids in the shard *)
(* 2. Transactions: set of all possible transactions *)
(* 3. NrShards: number of shards *)
(***************************************************************************)
\* Shards is applied as a function (Shards[s]) to shard ids in 1..NrShards.
CONSTANTS Shards, Transactions, NrShards
\* Note: assume unique number ids for replicas
(***************************************************************************)
(* IR constants & variables (description in the IR module) *)
(***************************************************************************)
\* Quorums and SuperQuorums are indexed by shard id when the IR module is
\* instantiated (Quorums[s], SuperQuorums[s]).
CONSTANTS Clients, Quorums, SuperQuorums,
max_vc, max_req, f
\* State variables of the IR module, declared here under the same names so
\* that the per-shard IR instances share them.
VARIABLES rState, rRecord, rCrtView, rLastView, rViewReplies, rNonce,
rViewOnDisk,
sentMsg, cCrtOp,
cCrtOpToFinalize, cMsgCounter, cCrtOpReplies, cCrtOpConfirms,
cState, aSuccessful, arRecord, aVisibility, gViewChangesNo
\* Tuples of the IR variables, used in UNCHANGED clauses of TAPIR actions.
\* Replica variables.
irReplicaVars == <<rState, rRecord, rCrtView, rViewOnDisk, rLastView,
rViewReplies, rNonce>>
irClientVars == <<cCrtOp, \* current operation at a client
cCrtOpReplies, \* current operation replies
cMsgCounter,
cState,
cCrtOpToFinalize,
cCrtOpConfirms>> \* Client variables.
irAppVars == <<aSuccessful, arRecord, aVisibility>> \* Application variables.
irOtherVars == <<sentMsg, gViewChangesNo>> \* Other variables.
\* Ids of IR client requests: issuing client plus that client's counter.
IRMessageId == [cid: Clients, msgid: Nat]
(***************************************************************************)
(* TAPIR Variables/State: *)
(* 1. State at each replica: *)
(* rPreparedTxns = List of txns this replica is prepared *)
(* to commit *)
(* rTxnsLogAborted, rTxnsLogCommitted = Logs of aborted and of *)
(* committed txns in ts order *)
(* rStore = Versioned store *)
(* rBkpTable = Table of txns for which this replica *)
(* is the bkp coordinator (not declared as a variable yet) *)
(* 2. State of communication medium: *)
(* sentMsg = sent (and duplicate) messages *)
(* 3. State at client: *)
(* cCrtTxn = crt txn requested by the client *)
(* *)
(***************************************************************************)
\* TAPIR variables & data structures
\* rClock and cClock are initialised to 0 in InitTAPIR; no action in this
\* part of the module changes them.
VARIABLES rPreparedTxns, rStore, rTxnsLogAborted, rTxnsLogCommitted,
rClock, cCrtTxn, cClock
\* TAPIR state kept at replicas.
tapirReplicaVars == <<rPreparedTxns, rStore, rTxnsLogAborted,
rTxnsLogCommitted,
rClock>>
\* TAPIR state kept at clients.
tapirClientVars == <<cCrtTxn, cClock>>
\* All variables of the combined IR + TAPIR specification.
vars == <<irReplicaVars, irClientVars, irAppVars, irOtherVars,
tapirReplicaVars, tapirClientVars>>
\* One stored version of a value.
StoreEntry == [vs: Nat, val: Nat] \* vs = version
\* Per-key store record: all versions plus the latest version and its value.
Store == [key: Nat,
entries: SUBSET StoreEntry,
latestVs: Nat,
latestVal: Nat]
TransactionTs == [cid: Clients, clock: Nat] \* Timestamp
\* A read records the key together with the value and version that was read.
ReadSet == [key: Nat, val: Nat, vs: Nat]
\* A write records the key and the value to be written.
WriteSet == [key: Nat, val: Nat]
\* A transaction: its reads, its writes and the ids of the shards involved.
Transaction == [rSet: SUBSET ReadSet,
wSet: SUBSET WriteSet,
shards: SUBSET Nat]
\* Type invariant for the TAPIR replica state.
\* rStore is indexed by replica id (over the replicas of all shards). The
\* prepared-transaction set and the two transaction logs are indexed by shard
\* id first and then by replica, which is the shape InitTAPIR gives them.
TypeOK ==
    LET
        ShardIds == 1..NrShards
        AllReplicas == UNION {Shards[i]: i \in ShardIds}
        \* v maps each shard id to a function from that shard's replicas
        \* to sets of transactions
        PerShardTxns(v) ==
            /\ DOMAIN v = ShardIds
            /\ \A s \in ShardIds: v[s] \in [Shards[s] -> SUBSET Transaction]
    IN
        /\ rStore \in [AllReplicas -> SUBSET Store]
        /\ PerShardTxns(rPreparedTxns)
        /\ PerShardTxns(rTxnsLogAborted)
        /\ PerShardTxns(rTxnsLogCommitted)
\* Result values of TAPIR consensus operations (substituted for IR's Results).
TAPIRResults == {"Prepare-OK", "Retry", "Prepare-Abstain", "Abort"}
\* Kinds of TAPIR operations carried in the body of an IR operation.
TAPIROpType == {"Prepare", "ABORT", "COMMIT"}
\* Body of an IR operation as used by TAPIR (substituted for IR's OpBody).
TAPIROpBody == [opType : TAPIROpType, txn: Transaction]
TAPIRClientFail == TRUE \* state we lose at the app level
TAPIRReplicaFail == TRUE \* state we lose at the app level
\* TAPIR implementation of IR interface
\* Most of these are placeholders that are simply TRUE.
TAPIRExecInconsistent(op) == TRUE
\* Result a replica proposes: "Prepare-OK" for an operation whose type is
\* "Consensus", "Abort" otherwise.
TAPIRExecConsensus(op) == IF op.type = "Consensus" THEN "Prepare-OK" ELSE "Abort"
\* The client-side decision ignores the replies and always yields "Prepare-OK".
TAPIRDecide(results) == "Prepare-OK"
\* Merge keeps R and d as they are and gives every entry of u the result
\* "Prepare-OK".
TAPIRMerge(R, d, u) == R \cup d \cup
{[msgid |-> x.msgid, op |-> x.op, res |-> "Prepare-OK"]: x \in u}
TAPIRSync(records) == TRUE
TAPIRSuccessfulInconsistentOp(c, S, op) == TRUE
TAPIRSuccessfulConsensusOp(c, S, op, res) == TRUE
\* Initialize the IR state for all shards: every replica and client is NORMAL,
\* all records, reply sets and message sets are empty, all counters and view
\* numbers are 0, and no client has a current operation (<<>>).
InitIR ==
    LET
        ShardIds == 1..NrShards
        \* same value v for every replica of every shard
        PerReplica(v) == [s \in ShardIds |-> [r \in Shards[s] |-> v]]
        \* same value v for every client in every shard
        PerClient(v) == [s \in ShardIds |-> [c \in Clients |-> v]]
        \* same value v for every shard
        PerShard(v) == [s \in ShardIds |-> v]
    IN
        /\ rState = PerReplica("NORMAL")
        /\ rRecord = PerReplica({})
        /\ rCrtView = PerReplica(0)
        /\ rViewOnDisk = PerReplica(0)
        /\ rLastView = PerReplica(0)
        /\ rViewReplies = PerReplica({})
        /\ rNonce = PerReplica(0)
        /\ sentMsg = PerShard({})
        /\ cCrtOp = PerClient(<<>>)
        /\ cCrtOpToFinalize = PerClient(<<>>)
        /\ cMsgCounter = PerClient(0)
        /\ cCrtOpReplies = PerClient({})
        /\ cCrtOpConfirms = PerClient({})
        /\ cState = [c \in Clients |-> "NORMAL"]
        /\ aSuccessful = {}
        /\ arRecord = PerReplica({})
        /\ aVisibility = PerShard([o \in IRMessageId |-> {}])
        /\ gViewChangesNo = PerShard(0)
\* IR instance per shard TODO: modify replica also
\* IR(s) is the IR_consensus module specialised to shard s: the application
\* hooks are bound to the TAPIR* definitions of this module, and Replicas,
\* Quorums and SuperQuorums to the entries for shard s. Constants and
\* variables not listed in the WITH clause are bound to the identically named
\* symbols of this module.
IR(s) == INSTANCE IR_consensus WITH AppClientFail <- TAPIRClientFail,
AppReplicaFail <- TAPIRReplicaFail,
OpBody <- TAPIROpBody,
ExecInconsistent <- TAPIRExecInconsistent,
ExecConsensus <- TAPIRExecConsensus,
Merge <- TAPIRMerge,
Sync <- TAPIRSync,
SuccessfulInconsistentOp <- TAPIRSuccessfulInconsistentOp,
SuccessfulConsensusOp <- TAPIRSuccessfulConsensusOp,
Decide <- TAPIRDecide,
Results <- TAPIRResults,
Replicas <- Shards[s],
Quorums <- Quorums[s],
SuperQuorums <- SuperQuorums[s],
S <- s
\* TAPIR messages
\* Requests are addressed to a replica of any shard and replies to a client.
\* Shards is a function from shard id to replica set, so the set of all
\* replicas is the union of its values over the shard ids.
Message ==
    LET AllReplicas == UNION {Shards[i]: i \in 1..NrShards}
    IN
        [type: {"READ"},
         key: Nat,
         dst: AllReplicas]
        \cup
        [type: {"READ-REPLY"},
         key: Nat,
         val: Nat,
         vs: Nat, \* version
         dst: Clients]
        \cup
        [type: {"READ-VERSION"},
         key: Nat,
         vs: Nat,
         dst: AllReplicas]
        \cup
        [type: {"READ-VERSION-REPLY"},
         key: Nat,
         vs: Nat,
         dst: Clients]
\* Initial TAPIR state: no client has a current transaction, all clocks are 0,
\* and every replica starts with an empty store, no prepared transactions and
\* empty logs. rStore is indexed by replica id; the other replica variables
\* are indexed by shard id and then by replica.
InitTAPIR ==
    LET
        ShardIds == 1..NrShards
        PerReplica(v) == [s \in ShardIds |-> [r \in Shards[s] |-> v]]
    IN
        /\ cCrtTxn = [c \in Clients |-> <<>>]
        /\ cClock = [c \in Clients |-> 0]
        /\ rPreparedTxns = PerReplica({})
        /\ rStore = [r \in UNION {Shards[i]: i \in ShardIds} |-> {}]
        /\ rTxnsLogAborted = PerReplica({})
        /\ rTxnsLogCommitted = PerReplica({})
        /\ rClock = PerReplica(0)

\* Initial state of the combined specification.
Init == InitIR /\ InitTAPIR
-----------------------------------------------------------------------------
(***************************************************************************)
(* `^ \center \large{\textbf{Tapir replica actions}} ^' *)
(***************************************************************************)
\*TAPIRReplicaReceiveRead(r) == TRUE
\*TAPIRReplicaAction(r) ==
\* \/ /\ rState[r] = "NORMAL"
\* /\ \/ TAPIRReplicaReceiveRead(r)
-----------------------------------------------------------------------------
(***************************************************************************)
(* `^ \center \large{\textbf{Tapir client actions}} ^' *)
(***************************************************************************)
\* Client c, which has no current transaction, picks a transaction t from the
\* transaction pool and makes it its current one.
\* first, resolve all reads (read from any replica and get the vs)
\* then send prepares in all shards involved by setting the cCrtOp in the
\* respective IR shard instance
\* TODO: for now just simulate this, pick a transaction from
\* transaction pool, get some versions from the replica
\* stores
TAPIRClientExecuteTxn(c) ==
    LET
        \* shard a key belongs to
        ShardOf(k) == (k % NrShards) + 1
        \* highest-numbered replica of the key's shard that stores the key,
        \* or 0 when none does
        Holder(k) == Max({rep \in Shards[ShardOf(k)]:
                             \E se \in rStore[rep]: k = se.key})
        \* reads of t whose value and version equal the latest ones at the
        \* holder of the key
        Resolved(t) ==
            {rse \in ReadSet:
                /\ rse \in t.rSet
                /\ Holder(rse.key) /= 0
                /\ \E se \in rStore[Holder(rse.key)]:
                       /\ rse.key = se.key
                       /\ rse.val = se.latestVal
                       /\ rse.vs = se.latestVs}
        \* shards touched by a read or a write of t
        Involved(t) ==
            {s \in 1..NrShards:
                \/ \E trse \in t.rSet: s = ShardOf(trse.key)
                \/ \E twse \in t.wSet: s = ShardOf(twse.key)}
    IN
        /\ cCrtTxn[c] = <<>>
        /\ \E t \in Transactions:
             \* found all the reads
             /\ Cardinality(Resolved(t)) = Cardinality(t.rSet)
             /\ cCrtTxn' = [cCrtTxn EXCEPT ![c] = [rSet |-> Resolved(t),
                                                   wSet |-> t.wSet,
                                                   shards |-> Involved(t)]]
        /\ UNCHANGED <<irReplicaVars, irClientVars, irOtherVars, irAppVars,
                       tapirReplicaVars, cClock>>
\* Client c, which has a current transaction, issues a "Prepare" consensus
\* request carrying that transaction to the IR instance of one of the shards
\* listed in cCrtTxn[c].shards.  Everything that IR(s)!ClientRequest is not
\* expected to change is listed as UNCHANGED here.
TAPIRClientPrepareTxn(c) ==
/\ cCrtTxn[c] /= <<>>
/\ \E s \in cCrtTxn[c].shards: \* prepare in shard s
\* - ok if already prepared
/\ IR(s)!ClientRequest(c, [type |-> "Consensus",
body |-> [opType |-> "Prepare",
txn |-> cCrtTxn[c]]])
/\ UNCHANGED <<irReplicaVars, irAppVars,
cCrtOpReplies,
cCrtOpConfirms,
cCrtOpToFinalize,
gViewChangesNo,
cState, tapirClientVars, tapirReplicaVars>>
\* The TAPIR steps available to client c: while in state "NORMAL" it may
\* either start a transaction or prepare its current one.
TAPIRClientAction(c) ==
\/ /\ cState[c] = "NORMAL"
/\ \/ TAPIRClientExecuteTxn(c) \* for now just simulate this
\* (don't send explicit READ messages)
\/ TAPIRClientPrepareTxn(c)
-----------------------------------------------------------------------------
(***************************************************************************)
(* `^ \center \large{\textbf{High-Level Actions}} ^' *)
(***************************************************************************)
\* Next-state relation of the combined spec.  A step is one of:
\*  - a TAPIR action of some client,
\*  - a step of the IR instance of some shard, leaving the TAPIR variables
\*    unchanged,
\*  - a stuttering step, allowed once rLastView equals max_vc for every
\*    replica index 1..Cardinality(Shards[s]) of every shard s.
Next ==
\/ \E c \in Clients: TAPIRClientAction(c)
\/ /\ \E s \in 1..NrShards: IR(s)!Next
/\ UNCHANGED <<tapirClientVars, tapirReplicaVars>>
\/ \* Avoid deadlock by termination
((\A s \in 1..NrShards:
\A i \in 1..Cardinality(Shards[s]):
rLastView[s][i] = max_vc) /\ UNCHANGED <<vars>>)
\* Invariant to check: type correctness and fault tolerance of the IR
\* instance of shard 1 only (the other shards are not covered here).
Inv == IR(1)!TypeOK /\ IR(1)!FaultTolerance
=============================================================================
\* Modification History
\* Last modified Mon Aug 31 12:55:38 PDT 2015 by aaasz
\* Created Sat Jan 31 18:31:52 PST 2015 by aaasz
--------------------------------- MODULE Test ---------------------------------
(***************************************************************************)
(* This is a TLA+ specification of the Inconsistent Replication algorithm. *)
(* (And a mechanically-checked proof of its correctness using TLAPS) *)
(***************************************************************************)
EXTENDS Naturals, FiniteSets, TLC
VARIABLES rViewReplies, recoveredOps
\* The kinds of operation a record can carry.
OpType == {"Inconsistent", "Consensus"}
\* The status values an operation record can have.
OpStatus == {"TENTATIVE", "FINALIZED"}
\* An operation is a record with a type and a natural-number body.
Operations == [type: OpType, body: Nat]
\* Type invariant for rViewReplies: a set of replies, each with a view number
\* lv, a source src and a set r of operation records (with or without a res
\* field).  recoveredOps is not constrained here.
TypeOK ==
/\ rViewReplies \in SUBSET ([lv: Nat,
r: SUBSET ([msgid: Nat,
op: Operations,
res: Nat,
status: OpStatus]
\cup [msgid: Nat,
op: Operations,
status: OpStatus]),
src: Nat])
\* A: the set of replies under test.
A ==
rViewReplies
\* B: the union of the record sets (field r) of all replies in A.
B ==
\* set of all records received in replies in A
UNION {x.r: x \in A}
\* The records x of B that are FINALIZED consensus operations and that belong
\* to most_updated_reply: a reply of A which contains a FINALIZED record with
\* x's msgid and whose lv is at least the lv of every other reply containing
\* such a record.
test_recoveredConensusOps_R ==
\* any finalized consensus operation (in at least one record, in the maximum
\* latest view)
{x \in B:
/\ x.op.type = "Consensus"
/\ x.status = "FINALIZED"
/\ LET most_updated_reply ==
CHOOSE reply \in A:
/\ \E rec \in reply.r: /\ rec.msgid = x.msgid
/\ rec.status = "FINALIZED"
\* no reply holding a finalized record for this msgid has a larger lv
/\ \A rep \in A:
IF \E rec \in rep.r: /\ rec.msgid = x.msgid
/\ rec.status = "FINALIZED"
THEN rep.lv <= reply.lv
ELSE TRUE
IN
x \in most_updated_reply.r}
\* Test fixture: three replies from sources 1, 2, 3 with lv 1, 2, 3.  Each
\* holds one FINALIZED consensus record for msgid 1; the records differ only
\* in res (1, 2, 3).  recoveredOps starts empty.
Init ==
/\ rViewReplies = {[lv |-> 1, r |-> {[msgid |-> 1, op |-> [type |-> "Consensus", body |-> 1], res |-> 1, status |-> "FINALIZED"]}, src |-> 1],
[lv |-> 2, r |-> {[msgid |-> 1, op |-> [type |-> "Consensus", body |-> 1], res |-> 2, status |-> "FINALIZED"]}, src |-> 2],
[lv |-> 3, r |-> {[msgid |-> 1, op |-> [type |-> "Consensus", body |-> 1], res |-> 3, status |-> "FINALIZED"]}, src |-> 3]}
/\ recoveredOps = {}
\* Each step stores the computed set in recoveredOps and keeps rViewReplies.
\* The Assert is evaluated on the current (unprimed) recoveredOps, so it holds
\* in the initial state and is violated once recoveredOps is non-empty; its
\* message "Should fail" indicates the failure is expected.
Next ==
/\ recoveredOps' = test_recoveredConensusOps_R
/\ Assert(Cardinality(recoveredOps) = 0, "Should fail")
/\ UNCHANGED <<rViewReplies>>
=============================================================================
\* Modification History
\* Last modified Fri Apr 24 14:34:42 PDT 2015 by aaasz
\* Created Fri Dec 12 17:42:14 PST 2014 by aaasz
------------------------ MODULE VR ------------------------------------------
(***************************************************************************)
(* This is a TLA+ specification of the VR algorithm. *)
(***************************************************************************)
EXTENDS FiniteSets, Naturals, TLC, TLAPS
-----------------------------------------------------------------------------
(***************************************************************************)
(* `^ \center \large{\textbf{Constants}} ^' *)
(***************************************************************************)
(***************************************************************************)
(* Constant parameters: *)
(* Replicas: the set of all replicas (Replica IDs) *)
(* Clients: the set of all clients (Client IDs) *)
(* Quorums: the set of all quorums *)
(* Operations: the set of all operations a client may request *)
(* f: maximum number of failures allowed (half of n) *)
(* *)
(* Constants used to bound variables, for model checking (Nat is bounded) *)
(* max_vc: maximum number of view changes allowed *)
(* max_req: maximum number of op requests performed by each client *)
(* max_c: maximum number of replica crashes allowed *)
(***************************************************************************)
CONSTANTS Replicas, Clients, Quorums, Operations, f, max_req, max_vc, max_c
ASSUME IsFiniteSet(Replicas)
\* Every quorum is a set of replicas, and any two quorums share at least one
\* replica.
ASSUME QuorumAssumption ==
/\ Quorums \subseteq SUBSET Replicas
/\ \A Q1, Q2 \in Quorums: Q1 \cap Q2 # {}
(***************************************************************************)
(* The possible states of a replica and the possible states of a client. *)
(***************************************************************************)
\* Status values a replica can take (see rStatus).
ReplicaState == {"NORMAL", "FAILED", "RECOVERING", "VIEW-CHANGING"}
\* Status values a client can take (see cStatus).
ClientState == {"NORMAL", "FAILED"}
(***************************************************************************)
(* Definition of operation space *)
(***************************************************************************)
\* An operation is a record with fields op, c and s (used below as the
\* requested operation, the requesting client and the client's request
\* number), or the empty tuple <<>>, which stands for "no operation".
Operation == [op: Nat,
c: Nat,
s: Nat] \cup {<<>>}
(***************************************************************************)
(* Message is defined to be the set of all possible messages *)
(***************************************************************************)
\* TODO: Assumptions
\* Assume unique message ids
\* Assume no more than f replica failures
\* The union of all message record shapes, one per value of the type field:
\* client traffic (REQUEST, REPLY), normal operation (PREPARE, PREPARE-OK,
\* COMMIT), view change (START-VIEW-CHANGE, DO-VIEW-CHANGE, START-VIEW),
\* recovery (RECOVERY, RECOVERY-RESPONSE) and state transfer (STATE-REQUEST,
\* STATE-RESPONSE).  Logs (field l) are functions from 1..max_req to Operation.
Message ==
[type: {"REQUEST"},
op: Nat,
c: Clients,
s: Nat]
\cup [type: {"REPLY"},
v: Nat,
s: Nat,
x: Nat,
dst: Clients]
\cup
[type: {"PREPARE"},
v: Nat,
m: Operation,
n: Nat,
k: Nat]
\cup
[type: {"PREPARE-OK"},
v: Nat,
n: Nat,
src: Replicas]
\cup
[type: {"COMMIT"},
v: Nat,
k: Nat]
\cup
[type: {"START-VIEW-CHANGE"},
v: Nat,
src: Replicas]
\cup
[type: {"DO-VIEW-CHANGE"},
v: Nat,
l: [1..max_req -> Operation],
vp: Nat,
n: Nat,
k: Nat,
src: Replicas,
dst: Replicas]
\cup
[type: {"START-VIEW"},
v: Nat,
l: [1..max_req -> Operation],
n: Nat,
k: Nat,
src: Replicas]
\cup
[type: {"RECOVERY"},
x: Nat, \* nonce
src: Replicas]
\cup
[type: {"RECOVERY-RESPONSE"},
v: Nat,
x: Nat,
l: [1..max_req -> Operation],
n: Nat,
k: Nat,
dst: Replicas,
src: Replicas]
\cup
[type: {"STATE-REQUEST"},
v: Nat,
n: Nat,
src: Replicas]
\cup
[type: {"STATE-RESPONSE"},
v: Nat,
l: [1..max_req -> Operation],
n: Nat,
k: Nat,
src: Replicas]
-----------------------------------------------------------------------------
(***************************************************************************)
(* `^ \center \large{\textbf{Variables and State Predicates}} ^' *)
(***************************************************************************)
(***************************************************************************)
(* Variables: *)
(* 1. State at each replica: *)
(* rStatus = Denotes current replica state. Either: *)
(* - NORMAL (processing operations) *)
(* - VIEW-CHANGING (participating in recovery) *)
(* - RECOVERING (recovering replica) *)
(* rLog = operations log and their results *)
(* rViewNr = current view number *)
(* rOpNr = assigned to the most recently received request *)
(* rCommitNr = the OpNr of the most recently committed op *)
(* rClientTable = for each client, the number of its most recent*)
(* request *)
(* 2. State of communication medium: *)
(* sentMsg = sent (but not yet received) messages *)
(* 3. State at client: *)
(* cCrtOp = crt operation requested by the client *)
(* cReqNr = crt request number *)
(* *)
(***************************************************************************)
VARIABLES rStatus, rLog, rViewNr, rOpNr, rLastView, rReplies,
rClientTable, rCommitNr, rNonce, rCrtOp,
sentMsg,
cCrtOp, cReqNr, cStatus,
aSuccessful,
gViewChangesNo,
gCrashesNo
(***************************************************************************)
(* Defining these tuples makes it easier to express which variables remain *)
(* unchanged *)
(***************************************************************************)
\* Replica variables.
rVars == <<rStatus, rLog, rViewNr, rOpNr, rLastView,
rClientTable, rCommitNr, rReplies, rNonce, rCrtOp>>
\* Client variables.
cVars == <<cCrtOp, \* current operation at a client
cReqNr,
cStatus>>
\* Application variables
aVars == <<aSuccessful>> \* we'll use them to write invariants
\* Other variables: the message pool and the two global counters that bound
\* the number of view changes and crashes.
oVars == <<sentMsg, gViewChangesNo, gCrashesNo>>
\* All variables.
vars == <<rVars, cVars, aVars, oVars>>
\* Type invariant: the expected shape of every variable.  Entries of rReplies
\* are the replies a replica has collected; they mirror the corresponding
\* messages but use lower-case type tags and carry no dst field.
TypeOK ==
/\ rStatus \in [Replicas -> ReplicaState]
/\ rLog \in [Replicas -> [1..max_req -> Operation]]
/\ rViewNr \in [Replicas -> Nat]
/\ rOpNr \in [Replicas -> Nat]
/\ rCommitNr \in [Replicas -> Nat]
/\ rLastView \in [Replicas -> Nat]
/\ rReplies \in [Replicas -> SUBSET ([type: {"prepare-ok"},
v: Nat,
n: Nat,
src: Replicas]
\cup [type: {"start-view-change"},
v: Nat,
src: Replicas]
\cup [type: {"do-view-change"},
v: Nat,
l: [1..max_req -> Operation],
vp: Nat,
n: Nat,
k: Nat,
src: Replicas]
\cup [type: {"recovery-response"},
v: Nat,
x: Nat,
l: [1..max_req -> Operation],
n: Nat,
k: Nat,
src: Replicas])]
/\ rNonce \in [Replicas -> Nat]
/\ rClientTable \in [Replicas -> [Clients -> Nat]]
/\ rCrtOp \in [Replicas -> Operation]
/\ sentMsg \in SUBSET Message
/\ cCrtOp \in [Clients -> Operation]
/\ cReqNr \in [Clients -> Nat]
/\ cStatus \in [Clients -> ClientState]
/\ aSuccessful \in SUBSET Operation
/\ gViewChangesNo \in Nat
/\ gCrashesNo \in Nat
\* Initial state: every replica and client is NORMAL, all counters, view
\* numbers, op numbers and nonces are 0, logs contain only <<>>, and no
\* message has been sent and no operation has succeeded.
Init ==
/\ rStatus = [r \in Replicas |-> "NORMAL"]
/\ rLog = [r \in Replicas |-> [i \in 1..max_req |-> <<>>]]
/\ rViewNr = [r \in Replicas |-> 0]
/\ rLastView = [r \in Replicas |-> 0]
/\ rOpNr = [r \in Replicas |-> 0]
/\ rCommitNr = [r \in Replicas |-> 0]
/\ rReplies = [r \in Replicas |-> {}]
/\ rNonce = [r \in Replicas |-> 0]
/\ rClientTable = [r \in Replicas |-> [c \in Clients |-> 0]]
/\ rCrtOp = [r \in Replicas |-> <<>>]
/\ sentMsg = {}
/\ cCrtOp = [c \in Clients |-> <<>>]
/\ cReqNr = [c \in Clients |-> 0]
/\ cStatus = [c \in Clients |-> "NORMAL"]
/\ aSuccessful = {}
/\ gViewChangesNo = 0
/\ gCrashesNo = 0
-----------------------------------------------------------------------------
(***************************************************************************)
(* `^ \center \large{\textbf{Actions}} ^' *)
(***************************************************************************)
\* Sending adds m to the message pool; messages are never removed by Send.
Send(m) == sentMsg' = sentMsg \cup {m}
\* Replica r leads view v when r = (v mod number of replicas) + 1.  This
\* relies on replica ids being the numbers 1..Cardinality(Replicas).
AmLeader(r, v) == r = (v % Cardinality(Replicas)) + 1
\* Same predicate, named for use when talking about another replica.
IsLeader(r, v) == AmLeader(r, v)
\* The replica that leads view v.
LeaderOf(v) == CHOOSE x \in Replicas: IsLeader(x, v)
-----------------------------------------------------------------------------
(***************************************************************************)
(* `^ \center \large{\textbf{Client Actions}} ^' *)
(***************************************************************************)
\* Note: CHOOSE does not introduce nondeterminism (the same value is chosen
\* each time)
\* Client sends a request.
\* An idle client c (cCrtOp[c] = <<>>) picks an operation from Operations,
\* increments its request number, remembers the operation as its current one
\* and sends a REQUEST tagged with the new request number.
ClientRequest(c) ==
\E op \in Operations:
/\ cCrtOp[c] = <<>> \* the client is not waiting for a result
\* of another operation
/\ cReqNr' = [cReqNr EXCEPT ![c] = @ + 1]
/\ cCrtOp' = [cCrtOp EXCEPT ![c] = [op |-> op,
c |-> c,
s |-> cReqNr[c] + 1]]
/\ Send([type |-> "REQUEST",
op |-> op,
c |-> c,
s |-> cReqNr[c] + 1])
/\ UNCHANGED <<cStatus, rVars, aVars, gViewChangesNo, gCrashesNo>>
/\ cReqNr[c] < max_req \* BOUND the number of requests a client can make
\* Client received a reply.
\* Enabled when a REPLY addressed to c carries c's current request number
\* and c is still waiting for an operation (cCrtOp[c] /= <<>>).  The
\* finished operation is recorded in aSuccessful and c becomes idle again.
ClientReceiveReply(c) ==
  \E msg \in sentMsg:
    /\ msg.type = "REPLY"
    /\ msg.dst = c          \* only consume replies addressed to this client;
                            \* request numbers alone are not unique across
                            \* clients
    /\ msg.s = cReqNr[c]    \* reply to c's request for crt op
    /\ cCrtOp[c] /= <<>>
    /\ cCrtOp' = [cCrtOp EXCEPT ![c] = <<>>]
    /\ aSuccessful' = aSuccessful \cup {cCrtOp[c]}
    /\ UNCHANGED <<cReqNr, cStatus, rVars, oVars>>
-----------------------------------------------------------------------------
(***************************************************************************)
(* `^ \center \large{\textbf{Replica Actions}} ^' *)
(***************************************************************************)
(***************************************************************************)
(* Normal Operation protocol *)
(***************************************************************************)
\* Replica sends prepares.
\* The leader of its current view, with no operation in progress, takes a
\* REQUEST whose request number is newer than its client-table entry, stores
\* the operation at log position rOpNr[r] + 1, sends a PREPARE for it and
\* updates its client table and op number.
\* The first conjuncts are model-checking restrictions: only replica 1 may
\* take this step, and only once replica 3 is NORMAL in view 0 and replica 2
\* holds a do-view-change reply from it (replica ids are hard-coded).
ReplicaReceiveRequest(r) ==
/\ r = 1
/\ AmLeader(r, rViewNr[r])
/\ \A replica \in {3}: /\ rViewNr[replica] = 0 /\ rStatus[replica] = "NORMAL"\* PRUNE send request only after view-change
/\ \E reply \in rReplies[2]: reply.type="do-view-change" /\ reply.src = replica
/\ rCrtOp[r] = <<>>
/\ \E msg \in sentMsg:
/\ msg.type = "REQUEST"
/\ msg.s > rClientTable[r][msg.c]
/\ LET operation == [op |-> msg.op,
c |-> msg.c,
s |-> msg.s]
IN
/\ rCrtOp' = [rCrtOp EXCEPT ![r] = operation]
/\ rLog' = [rLog EXCEPT ![r][rOpNr[r] + 1] = operation]
/\ Send([type |-> "PREPARE",
v |-> rViewNr[r],
m |-> operation,
n |-> rOpNr[r] + 1,
k |-> rCommitNr[r]])
/\ rClientTable' = [rClientTable EXCEPT ![r][msg.c] = msg.s]
/\ rOpNr' = [rOpNr EXCEPT ![r] = @ + 1]
/\ UNCHANGED <<rStatus, rLastView, rReplies,
rNonce, rViewNr, rCommitNr, cVars, aVars, gViewChangesNo, gCrashesNo>>
\* Replica receives a prepare request from the leader.
\*  - If the PREPARE is from a higher view, or its op number skips ahead of
\*    rOpNr[r] + 1, r only asks for a state transfer (STATE-REQUEST).
\*  - If the PREPARE is for r's current view and carries the next (or the
\*    current) op number, r stores the operation in its log, updates its op
\*    number and client table, and answers with PREPARE-OK.
\* Both branches constrain every variable in vars; in particular gCrashesNo
\* stays unchanged in the state-transfer branch as well.
ReplicaReceivePrepare(r) ==
  /\ r \in {1,3} \* PRUNE
  /\ \E msg \in sentMsg:
       /\ msg.type = "PREPARE"
       /\ \/ \* Need state transfer (miss a prepare or in a lower view)
             /\ (msg.v > rViewNr[r] \/ msg.n > rOpNr[r] + 1)
             /\ Send([type |-> "STATE-REQUEST",
                      v |-> rViewNr[r],
                      n |-> rOpNr[r],
                      src |-> r])
             /\ UNCHANGED <<rVars, cVars, aVars, gViewChangesNo, gCrashesNo>>
          \/ /\ msg.v = rViewNr[r]
             /\ \/ msg.n = rOpNr[r] + 1
                \/ msg.n = rOpNr[r]
             /\ rLog' = [rLog EXCEPT ![r][msg.n] = msg.m]
             /\ rOpNr' = [rOpNr EXCEPT ![r] = msg.n]
             /\ rClientTable' = [rClientTable EXCEPT ![r][msg.m.c] = msg.m.s]
             /\ Send([type |-> "PREPARE-OK",
                      v |-> rViewNr[r],
                      n |-> msg.n,
                      src |-> r])
             /\ UNCHANGED <<rCrtOp, rStatus, rViewNr, rLastView,
                            rReplies, rCommitNr,
                            rNonce, cVars, aVars, gViewChangesNo, gCrashesNo>>
\* Replica r answers a STATE-REQUEST with its view number, log, op number and
\* commit number.  r answers when it is ahead of the requester: either r is
\* in a higher view, or r is in the same view with a larger op number.  These
\* are exactly the two cases ReplicaReceiveStateResponse accepts, so a
\* replica that missed a PREPARE in its current view can also catch up.
ReplicaReceiveStateRequest(r) ==
  \E msg \in sentMsg:
    /\ msg.type = "STATE-REQUEST"
    /\ \/ msg.v < rViewNr[r]
       \/ /\ msg.v = rViewNr[r]
          /\ msg.n < rOpNr[r]
    /\ Send([type |-> "STATE-RESPONSE",
             v |-> rViewNr[r],
             l |-> rLog[r],
             n |-> rOpNr[r],
             k |-> rCommitNr[r],
             src |-> r])
    /\ UNCHANGED <<rVars, cVars, aVars, gViewChangesNo, gCrashesNo>>
\* Replica r installs the state carried by a STATE-RESPONSE when the response
\* is ahead of r: same view with a larger op number, or a higher view.  r
\* adopts the response's view number, log and op number; its commit number
\* and client table are left unchanged.
ReplicaReceiveStateResponse(r) ==
\E msg \in sentMsg: \* TODO: some more cases to check to merge
\* these - for now don't need, just one operation req
/\ msg.type = "STATE-RESPONSE"
/\ \/ msg.v = rViewNr[r] /\ msg.n > rOpNr[r]
\/ msg.v > rViewNr[r]
/\ rViewNr' = [rViewNr EXCEPT ![r] = msg.v]
/\ rLog' = [rLog EXCEPT ![r] = msg.l]
/\ rOpNr' = [rOpNr EXCEPT ![r] = msg.n]
/\ UNCHANGED <<rClientTable, rCrtOp, rCommitNr,
rStatus, rLastView, rReplies,
rNonce, cVars, aVars, oVars>>
\* Leader receives a prepare ok.
\* A PREPARE-OK for r's current view and current op number is recorded in
\* rReplies[r], provided r is the leader of that view.  Nothing else changes.
ReplicaReceivePrepareOK(r) ==
/\ \E msg \in sentMsg:
/\ msg.type = "PREPARE-OK"
/\ msg.v = rViewNr[r]
/\ msg.n = rOpNr[r]
/\ AmLeader(r, msg.v)
/\ rReplies' = [rReplies EXCEPT ![r] = @ \cup
{[type |-> "prepare-ok",
v |-> msg.v,
n |-> msg.n,
src |-> msg.src
]}]
/\ UNCHANGED <<rClientTable, rCrtOp, rOpNr, rCommitNr,
rStatus, rLastView, rLog, rViewNr,
rNonce, cVars, aVars, oVars>>
\* Leader received enough replies.
\* When every member of some quorum has a prepare-ok recorded at the leader
\* for the current view and op number, the leader sends a REPLY to the client
\* of the operation stored at log position rOpNr[r].  The result field x is
\* the constant 1; no replica variable (including rCommitNr) is changed.
ReplicaSendReply(r) ==
/\ AmLeader(r, rViewNr[r])
/\ \E Q \in Quorums:
/\ \A rep \in Q: \E reply \in rReplies[r]:
/\ reply.src = rep
/\ reply.type = "prepare-ok"
/\ reply.v = rViewNr[r]
/\ reply.n = rOpNr[r]
/\ Send([type |-> "REPLY",
v |-> rViewNr[r],
s |-> rLog[r][rOpNr[r]].s,
x |-> 1,
dst |-> rLog[r][rOpNr[r]].c])
/\ UNCHANGED <<rVars, cVars, aVars, gViewChangesNo, gCrashesNo>>
(***************************************************************************)
(* View-change protocol *)
(***************************************************************************)
\* A replica starts a view change procedure.
\* r moves to the next view, announces it with START-VIEW-CHANGE and becomes
\* VIEW-CHANGING.  The global counter gViewChangesNo is incremented and
\* bounded by max_vc to keep the model finite.
ReplicaSendStartViewChange(r) ==
/\ r \in {2, 3} \* PRUNE just replicas 2 and 3 start a view change
/\ rViewNr' = [rViewNr EXCEPT ![r] = @ + 1]
/\ Send([type |-> "START-VIEW-CHANGE",
v |-> rViewNr[r] + 1,
src |-> r])
/\ rStatus' = [rStatus EXCEPT ![r] = "VIEW-CHANGING"]
/\ UNCHANGED <<rReplies, rNonce, rClientTable,
rCrtOp, rOpNr, rCommitNr,
rLastView, rLog, cVars, aVars, gCrashesNo>>
/\ gViewChangesNo < max_vc \* BOUND on number of view changes
/\ gViewChangesNo' = gViewChangesNo + 1
\* A replica received a message to start view change.
\* A NORMAL replica reacts to a START-VIEW-CHANGE for a higher view and
\* becomes VIEW-CHANGING; a replica that is already VIEW-CHANGING reacts to
\* one for its own or a higher view.  In both cases r records the message in
\* rReplies[r], adopts the message's view and sends its own
\* START-VIEW-CHANGE for that view.
ReplicaReceiveStartViewChange(r) ==
/\ r \in {2, 3} \* PRUNE just these participate in a view change
/\ \E r1 \in {2, 3}:
rStatus[r1] = "NORMAL" \* PRUNE
/\ \E msg \in sentMsg:
/\ msg.type = "START-VIEW-CHANGE"
/\ \/ /\ rStatus[r] = "NORMAL"
/\ msg.v > rViewNr[r]
/\ rStatus' = [rStatus EXCEPT ![r] = "VIEW-CHANGING"]
\/ /\ rStatus[r] = "VIEW-CHANGING"
/\ msg.v >= rViewNr[r]
/\ UNCHANGED <<rStatus>>
/\ rReplies' = [rReplies EXCEPT ![r] = @ \cup
{[type |-> "start-view-change",
v |-> msg.v,
src |-> msg.src]}]
/\ rViewNr' = [rViewNr EXCEPT ![r] = msg.v]
/\ Send([type |-> "START-VIEW-CHANGE",
v |-> msg.v,
src |-> r])
/\ UNCHANGED <<rClientTable, rCrtOp, rOpNr, rCommitNr,
rNonce, rLog, rLastView, cVars,
aVars, gViewChangesNo, gCrashesNo>>
\* We received enough view change replies to be able to
\* send a do-view-change.
\* Enabled when some quorum containing r has, for every member other than r,
\* a start-view-change reply for r's current view.  r then sends its log,
\* last normal view (vp), op number and commit number to the leader of the
\* new view.  No replica variable changes.
ReplicaSendDoViewChange(r) ==
/\ r \in {2,3} \* PRUNE just these participate in a view change
/\ \E Q \in Quorums:
/\ r \in Q
/\ \A rep \in Q: rep = r \/ \E reply \in rReplies[r]:
/\ reply.src = rep
/\ reply.type = "start-view-change"
/\ reply.v = rViewNr[r]
/\ Send([type |-> "DO-VIEW-CHANGE",
v |-> rViewNr[r],
l |-> rLog[r],
vp |-> rLastView[r],
n |-> rOpNr[r],
k |-> rCommitNr[r],
src |-> r,
dst |-> LeaderOf(rViewNr[r])])
/\ UNCHANGED <<rVars, cVars, aVars, gViewChangesNo, gCrashesNo>>
\* Replica received DO-VIEW-CHANGE message.
\* r accepts a DO-VIEW-CHANGE addressed to it for a higher view (or, if it is
\* already VIEW-CHANGING, for its own or a higher view), becomes
\* VIEW-CHANGING and stores the message in rReplies[r], replacing any older
\* do-view-change record from the same source.  If a record from that source
\* with the same or a higher view already exists, the step is disabled
\* (ELSE FALSE).  r's own view number is not changed by this step.
ReplicaReceiveDoViewChange(r) ==
/\ r = 2 \* PRUNE just the new leader switches view
\* /\ \A rr \in {2,3}: rStatus[rr] = "NORMAL" /\ rNonce[rr] > 0 /\ rViewNr[rr] = 0
/\ \E msg \in sentMsg:
/\ msg.type = "DO-VIEW-CHANGE"
/\ msg.dst = r
/\ \/ /\ rStatus[r] = "NORMAL"
/\ msg.v > rViewNr[r]
\/ /\ rStatus[r] = "VIEW-CHANGING"
/\ msg.v >= rViewNr[r]
/\ rStatus' = [rStatus EXCEPT ![r] = "VIEW-CHANGING"]
\* keep only the one with the higher view (v)
/\ LET
existingRecord == {x \in rReplies[r]:
/\ x.type = "do-view-change"
/\ x.src = msg.src} \* should only be one item in set
IN
IF \A x \in existingRecord: x.v < msg.v
THEN rReplies' = [rReplies EXCEPT ![r] =
(@ \ existingRecord) \cup
{[type |-> "do-view-change",
v |-> msg.v,
l |-> msg.l,
vp |-> msg.vp,
n |-> msg.n,
k |-> msg.k,
src |-> msg.src]}]
ELSE FALSE
/\ UNCHANGED <<cVars, rClientTable, rCrtOp, rOpNr, rCommitNr,
rViewNr, rLastView, rLog, rNonce, aVars, oVars>>
\* Note: Assume one reply for view change per replica in Q
\* (in ReplicaReceiveDoViewChange we keep only the most recent reply)
\*
\* The new leader starts the new view.  Enabled when some quorum containing r
\* has a do-view-change reply from every member, all for the same view, and r
\* leads that view.  r adopts the log, op number and commit number of the
\* reply with the largest vp (ties broken by the largest op number), sends
\* them in START-VIEW, rebuilds its client table from that log, clears its
\* current operation and collected replies, and returns to NORMAL.
ReplicaSendStartView(r) ==
/\ \E Q \in Quorums:
/\ r \in Q
/\ \A r1 \in Q:
/\ \A r2 \in Q: \E rr, pr \in rReplies[r]:
/\ rr.type = "do-view-change"
/\ pr.type = "do-view-change"
/\ rr.src = r1
/\ pr.src = r2
/\ rr.v = pr.v
\*/\ rr.v > rLastView[r]
/\ AmLeader(r, rr.v)
\* received at least a quorum of replies
/\ LET
A ==
\* set of all do-view-change replies from Q
{x \in rReplies[r]: /\ x.src \in Q
/\ x.type = "do-view-change"}
B ==
\* set of all do-view-change replies in A that have the biggest vp
{x \in A: \A rep \in A: rep.vp <= x.vp}
replyWithMostCompleteLog ==
\* if multiple replies in B, choose the one with the largest op number
CHOOSE x \in B: \A rep \in B: rep.n <= x.n
\* ops(c): the non-empty entries of the chosen log that were issued by client c
ops(c) ==
{x \in Operation:
\E i \in 1..max_req:
/\ replyWithMostCompleteLog.l[i] = x
/\ x /= <<>>
/\ x.c = c}
v_new ==
\* the one decided by quorum Q
CHOOSE v \in {x.v: x \in A}: TRUE
IN
/\ rLog' = [rLog EXCEPT ![r] = replyWithMostCompleteLog.l]
/\ Send([type |-> "START-VIEW",
v |-> v_new,
l |-> replyWithMostCompleteLog.l,
n |-> replyWithMostCompleteLog.n,
k |-> replyWithMostCompleteLog.k,
src |-> r])
/\ rOpNr' = [rOpNr EXCEPT ![r] = replyWithMostCompleteLog.n]
/\ rCommitNr' = [rCommitNr EXCEPT ![r] = replyWithMostCompleteLog.k]
/\ rViewNr' = [rViewNr EXCEPT ![r] = v_new]
/\ rLastView' = [rLastView EXCEPT ![r] = v_new]
\* Update client table based on the latest op in the log from each client
\* (the largest request number, or 0 if the client has no entry)
/\ rClientTable' = [rClientTable EXCEPT ![r] = [c \in Clients |->
IF ops(c) /= {}
THEN CHOOSE s \in {op.s: op \in ops(c)}:
\A op \in ops(c): op.s <= s
ELSE 0]]
/\ rCrtOp' = [rCrtOp EXCEPT ![r] = <<>>]
/\ rStatus' = [rStatus EXCEPT ![r] = "NORMAL"]
/\ rReplies' = [rReplies EXCEPT ![r] = {}]
/\ UNCHANGED <<rNonce, cVars, aVars, gViewChangesNo, gCrashesNo>>
\* A replica receives a start view message.
\* A NORMAL replica accepts a START-VIEW for a higher view; a VIEW-CHANGING
\* or RECOVERING replica accepts one for its own or a higher view.  The
\* replica installs the message's view and log, sets its op number and
\* commit number from the message (fields n and k) so they match the
\* installed log, rebuilds its client table from that log, clears its
\* collected replies and returns to NORMAL.
ReplicaReceiveStartView(r) ==
  /\ aSuccessful /= {} \* PRUNE possible paths
  /\ \E msg \in sentMsg:
       /\ msg.type = "START-VIEW"
       /\ \/ /\ rStatus[r] = "NORMAL"
             /\ msg.v > rViewNr[r]
          \/ /\ \/ rStatus[r] = "VIEW-CHANGING"
                \/ rStatus[r] = "RECOVERING"
             /\ msg.v >= rViewNr[r]
       /\ rViewNr' = [rViewNr EXCEPT ![r] = msg.v]
       /\ rLastView' = [rLastView EXCEPT ![r] = msg.v]
       /\ rLog' = [rLog EXCEPT ![r] = msg.l]
       \* keep the op/commit numbers consistent with the installed log
       /\ rOpNr' = [rOpNr EXCEPT ![r] = msg.n]
       /\ rCommitNr' = [rCommitNr EXCEPT ![r] = msg.k]
       \* ops(c): the non-empty entries of the received log issued by client c;
       \* the client table keeps the largest request number per client (or 0)
       /\ LET ops(c) ==
                {x \in Operation:
                   \E i \in 1..max_req:
                     /\ msg.l[i] = x
                     /\ x /= <<>>
                     /\ x.c = c}
          IN
            rClientTable' = [rClientTable EXCEPT ![r] = [c \in Clients |->
                               IF ops(c) /= {}
                               THEN CHOOSE s \in {op.s: op \in ops(c)}:
                                      \A op \in ops(c): op.s <= s
                               ELSE 0]]
       /\ rStatus' = [rStatus EXCEPT ![r] = "NORMAL"]
       /\ rReplies' = [rReplies EXCEPT ![r] = {}]
       /\ UNCHANGED <<rCrtOp, rNonce, cVars, aVars, oVars>>
(***************************************************************************)
(* Recovery protocol *)
(***************************************************************************)
\* A replica fails and loses everything.
\* r becomes FAILED and its view number, collected replies, current
\* operation, commit number, op number, client table and log are reset to
\* their initial values; rLastView and rNonce are kept.  The crash counter is
\* incremented and bounded by max_c.  The step is only enabled while fewer
\* than f replicas are currently FAILED or RECOVERING.
ReplicaFail(r) ==
/\ r \in {2,3} \* PRUNE just these are allowed to fail
/\ rNonce[r] < 3 \* BOUND
/\ gCrashesNo < max_c \* BOUND
/\ gCrashesNo' = gCrashesNo + 1
\*/\ \E msg \in sentMsg: msg.type = "START-VIEW-CHANGE" /\ msg.src = r \* PRUNE
/\ rStatus' = [rStatus EXCEPT ![r] = "FAILED"]
/\ rViewNr' = [rViewNr EXCEPT ![r] = 0]
/\ rReplies' = [rReplies EXCEPT ![r] = {}]
/\ rCrtOp' = [rCrtOp EXCEPT ![r] = <<>>]
/\ rCommitNr' = [rCommitNr EXCEPT ![r] = 0]
/\ rOpNr' = [rOpNr EXCEPT ![r] = 0]
/\ rClientTable' = [rClientTable EXCEPT![r] = [c \in Clients |-> 0]]
/\ rLog' = [rLog EXCEPT![r] = [i \in 1..max_req |-> <<>>]]
/\ UNCHANGED <<rLastView, rNonce, cVars, aSuccessful, sentMsg, gViewChangesNo>>
/\ \* We assume less than f replicas fail simultaneously
Cardinality({re \in Replicas:
\/ rStatus[re] = "FAILED"
\/ rStatus[re] = "RECOVERING"}) < f
\* Recovery protocol.
\* r starts recovering: it increments its nonce, sends a RECOVERY message
\* carrying the new nonce and becomes RECOVERING.
ReplicaRecover(r) ==
/\ Send([type |-> "RECOVERY",
x |-> rNonce[r] + 1,
src |-> r])
/\ rStatus' = [rStatus EXCEPT ![r] = "RECOVERING"]
/\ rNonce' = [rNonce EXCEPT ![r] = @ + 1]
/\ UNCHANGED <<rLog, rViewNr, rReplies,
rClientTable, rCrtOp,
rLastView, rCommitNr,
rOpNr,
cVars, aVars,
gViewChangesNo, gCrashesNo>>
\* Replica r answers a RECOVERY message with its view number, log, op number
\* and commit number, echoing the requester's nonce.  The guard
\* rViewNr[r] = 0 restricts responders to replicas that are still in view 0.
ReplicaSendRecoveryResponse(r) ==
/\ rViewNr[r] = 0
/\ \E msg \in sentMsg:
/\ msg.type = "RECOVERY"
\* TODO: send nil if not leader,
\* does not really matter for our purposes
\* right now
/\ Send([type |-> "RECOVERY-RESPONSE",
v |-> rViewNr[r],
x |-> msg.x,
l |-> rLog[r],
n |-> rOpNr[r],
k |-> rCommitNr[r],
src |-> r,
dst |-> msg.src])
/\ UNCHANGED <<rVars, cVars, aVars, gViewChangesNo, gCrashesNo>>
\* Replica r records a RECOVERY-RESPONSE that is addressed to it and carries
\* its current nonce.  Only rReplies[r] changes.
ReplicaReceiveRecoveryResponse(r) ==
\E msg \in sentMsg:
/\ msg.type = "RECOVERY-RESPONSE"
/\ msg.x = rNonce[r]
/\ msg.dst = r
/\ rReplies' = [rReplies EXCEPT ![r] = @ \cup
{[type |-> "recovery-response",
v |-> msg.v,
x |-> msg.x,
l |-> msg.l,
n |-> msg.n,
k |-> msg.k,
src |-> msg.src]}]
/\ UNCHANGED <<rStatus, rLastView, rLog, rViewNr,
rClientTable, rCrtOp, rOpNr, rCommitNr,
rNonce, cVars, aVars, oVars>>
\* Replica r completes recovery once every member of some quorum has sent a
\* recovery-response with r's current nonce.  Among the responses with the
\* highest view, r needs one from that view's leader; it adopts the leader's
\* log, op number and view, clears its collected replies and becomes NORMAL.
\* The client table, current operation and commit number are left unchanged.
ReplicaFinishRecovery(r) ==
/\ \E Q \in Quorums:
/\ \A rep \in Q: \E reply \in rReplies[r]:
/\ reply.type = "recovery-response"
/\ reply.src = rep
/\ reply.x = rNonce[r]
\* received at least a quorum of replies
/\ LET
A ==
\* set of all recovery-response replies from Q
{x \in rReplies[r]: /\ x.src \in Q
/\ x.type = "recovery-response"
/\ x.x = rNonce[r]}
B ==
\* set of all recovery-response replies in A from the biggest view
{x \in A: \A rep \in A: rep.v <= x.v}
leaderReply ==
\* reply from leader (<<>> when B holds no reply from the leader)
IF ~ \E x \in B: IsLeader(x.src, x.v)
THEN <<>>
ELSE CHOOSE x \in B: IsLeader(x.src, x.v)
IN
/\ leaderReply /= <<>>
/\ Assert(leaderReply.v = 0, "Assert that we recover just in view 0")
/\ rLog' = [rLog EXCEPT ![r] = leaderReply.l]
/\ rOpNr' = [rOpNr EXCEPT ![r] = leaderReply.n]
/\ rViewNr' = [rViewNr EXCEPT ![r] = leaderReply.v]
/\ rLastView' = [rLastView EXCEPT ![r] = leaderReply.v]
\* TODO: rClientTable
/\ rStatus' = [rStatus EXCEPT ![r] = "NORMAL"]
/\ rReplies' = [rReplies EXCEPT ![r] = {}]
/\ UNCHANGED <<rNonce, cVars, aVars, gViewChangesNo, gCrashesNo, sentMsg, rClientTable,
rCrtOp, rCommitNr>>
-----------------------------------------------------------------------------
(***************************************************************************)
(* `^ \center \large{\textbf{High-Level Actions}} ^' *)
(***************************************************************************)
\* The steps available to client c: while NORMAL it can issue a request or
\* consume a reply.
\* NOTE(review): with ClientRecover commented out, the "FAILED" disjunct
\* constrains no primed variable.  It looks unreachable while ClientFail is
\* disabled, but should get a real action (or be removed) before client
\* failures are enabled -- confirm.
ClientAction(c) ==
\/ /\ cStatus[c] = "NORMAL"
/\ \/ ClientRequest(c) \* some client tries to replicate commit an operation
\/ ClientReceiveReply(c) \* some client receives a reply from a replica
\*\/ ClientFail(c) \* some client fails
\/ /\ cStatus[c] = "FAILED"
\*/\ \/ ClientRecover(c)
\* The steps available to replica r, grouped by its current status:
\*  - NORMAL: normal operation, view change, recovery responses and state
\*    transfer (failing from NORMAL is currently commented out),
\*  - FAILED: start recovery,
\*  - RECOVERING: collect recovery responses and finish recovery,
\*  - VIEW-CHANGING: the view-change steps.
ReplicaAction(r) ==
\/ /\ rStatus[r] = "NORMAL"
/\ \/ ReplicaReceiveRequest(r)
\/ ReplicaSendReply(r)
\/ ReplicaReceivePrepare(r)
\/ ReplicaReceivePrepareOK(r)
\/ ReplicaSendStartViewChange(r)
\/ ReplicaReceiveStartViewChange(r)
\/ ReplicaSendDoViewChange(r)
\/ ReplicaReceiveDoViewChange(r)
\/ ReplicaSendStartView(r)
\/ ReplicaReceiveStartView(r)
\/ ReplicaSendRecoveryResponse(r)
\/ ReplicaReceiveStateRequest(r)
\/ ReplicaReceiveStateResponse(r)
\*\/ ReplicaFail(r) \* some replica fails
\/ /\ rStatus[r] = "FAILED"
/\ \/ ReplicaRecover(r) \* start recovery protocol
\/ /\ rStatus[r] = "RECOVERING"
/\ \/ ReplicaReceiveRecoveryResponse(r) \* Replica received a
\* recovery response
\/ ReplicaFinishRecovery(r)
\/ /\ rStatus[r] = "VIEW-CHANGING"
/\ \/ ReplicaReceiveStartViewChange(r)
\/ ReplicaSendDoViewChange(r)
\/ ReplicaReceiveDoViewChange(r)
\/ ReplicaSendStartView(r)
\/ ReplicaReceiveStartView(r)
\/ ReplicaFail(r)
\* Next-state relation: a step of some client, a step of some replica, or a
\* stuttering step once rLastView[i] = max_vc for every i in
\* 1..Cardinality(Replicas) (replica ids are assumed to be these numbers).
Next ==
\/ \E c \in Clients: ClientAction(c)
\/ \E r \in Replicas: ReplicaAction(r)
\/ \* Avoid deadlock by termination
(\A i \in 1..Cardinality(Replicas): rLastView[i] = max_vc) /\ UNCHANGED <<vars>>
\* The complete specification: start in Init, then take Next steps or stutter.
Spec == Init /\ [] [Next]_vars
\* Every operation reported as successful to a client must appear in the log
\* of at least one member of every quorum whose members are all NORMAL or
\* VIEW-CHANGING.
FaultTolerance ==
/\ \A successfulOp \in aSuccessful, Q \in Quorums:
(\A r \in Q: rStatus[r] = "NORMAL" \/ rStatus[r] = "VIEW-CHANGING")
=> (\E r \in Q: \E i \in 1..max_req: rLog[r][i] = successfulOp)
\* The invariant to check.
Inv == FaultTolerance
\* States that no operation has succeeded.  This is false as soon as a client
\* receives a reply; presumably checked on purpose to obtain such a trace --
\* confirm.
Inv2 == aSuccessful = {}
=============================================================================
\* Modification History
\* Last modified Sat Aug 01 13:57:28 PDT 2015 by aaasz
\* Created Fri Dec 12 17:42:14 PST 2014 by aaasz
Quorums <- {{1,2}, {1,3}, {2,3}}
Clients <- {1}
max_req <- 1
f <- 1
max_vc <- 3
Operations <- {1}
Replicas <- {1,2,3}
max_c <- 3
......@@ -9,8 +9,8 @@ PROTOS += $(addprefix $(d), \
LIB-request := $(o)request.o
OBJS-client := $(o)client.o \
$(LIB-message) $(LIB-configuration) $(LIB-transport) \
$(LIB-request)
$(LIB-message) $(LIB-configuration) \
$(LIB-transport) $(LIB-request)
OBJS-replica := $(o)replica.o $(o)log.o \
$(LIB-message) $(LIB-request) \
......
......@@ -36,7 +36,19 @@
#include <random>
namespace replication {
// Returns the symbolic name of `err`. Any value other than the two known
// error codes trips the assertion and yields an empty string.
std::string ErrorCodeToString(ErrorCode err) {
    if (err == ErrorCode::TIMEOUT) {
        return "TIMEOUT";
    }
    if (err == ErrorCode::MISMATCHED_CONSENSUS_VIEWS) {
        return "MISMATCHED_CONSENSUS_VIEWS";
    }
    // Not a known error code.
    Assert(false);
    return "";
}
Client::Client(const transport::Configuration &config, Transport *transport,
uint64_t clientid)
: config(config), transport(transport)
......@@ -61,7 +73,7 @@ Client::~Client()
{
}
void
Client::ReceiveMessage(const TransportAddress &remote,
const string &type, const string &data)
......@@ -69,5 +81,5 @@ Client::ReceiveMessage(const TransportAddress &remote,
Panic("Received unexpected message type: %s",
type.c_str());
}
} // namespace replication
......@@ -5,7 +5,7 @@
* interface to replication client stubs
*
* Copyright 2013-2015 Irene Zhang <iyzhang@cs.washington.edu>
* Naveen Kr. Sharma <nksharma@cs.washington.edu>
* Naveen Kr. Sharma <naveenks@cs.washington.edu>
* Dan R. K. Ports <drkp@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
......@@ -42,32 +42,54 @@
namespace replication {
// A client's request may fail for various reasons. For example, if enough
// replicas are down, a client's request may time out. An ErrorCode indicates
// the reason that a client's request failed. Use ErrorCodeToString() to get
// a printable name for a value.
enum class ErrorCode {
// For whatever reason (failed replicas, slow network), the request took
// too long and timed out.
TIMEOUT,
// For IR, if a client issues a consensus operation and receives a majority
// of replies and confirms in different views, then the operation fails.
MISMATCHED_CONSENSUS_VIEWS
};
std::string ErrorCodeToString(ErrorCode err);
class Client : public TransportReceiver
{
public:
typedef std::function<void (const string &, const string &)> continuation_t;
typedef std::function<void (const string &)> timeout_continuation_t;
using continuation_t =
std::function<void(const string &request, const string &reply)>;
using error_continuation_t =
std::function<void(const string &request, ErrorCode err)>;
static const uint32_t DEFAULT_UNLOGGED_OP_TIMEOUT = 1000; // milliseconds
Client(const transport::Configuration &config, Transport *transport,
uint64_t clientid = 0);
virtual ~Client();
virtual void Invoke(const string &request,
continuation_t continuation) = 0;
virtual void InvokeUnlogged(int replicaIdx,
const string &request,
continuation_t continuation,
timeout_continuation_t timeoutContinuation = nullptr,
uint32_t timeout = DEFAULT_UNLOGGED_OP_TIMEOUT) = 0;
virtual void Invoke(
const string &request,
continuation_t continuation,
error_continuation_t error_continuation = nullptr) = 0;
virtual void InvokeUnlogged(
int replicaIdx,
const string &request,
continuation_t continuation,
error_continuation_t error_continuation = nullptr,
uint32_t timeout = DEFAULT_UNLOGGED_OP_TIMEOUT) = 0;
virtual void ReceiveMessage(const TransportAddress &remote,
const string &type,
const string &data);
protected:
transport::Configuration config;
Transport *transport;
uint64_t clientid;
};
......
......@@ -5,7 +5,7 @@
* a replica's log of pending and committed operations
*
* Copyright 2013-2015 Irene Zhang <iyzhang@cs.washington.edu>
* Naveen Kr. Sharma <nksharma@cs.washington.edu>
* Naveen Kr. Sharma <naveenks@cs.washington.edu>
* Dan R. K. Ports <drkp@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
......
......@@ -6,7 +6,7 @@
* replicas and determining whether a quorum of responses has been met
*
* Copyright 2013-2015 Irene Zhang <iyzhang@cs.washington.edu>
* Naveen Kr. Sharma <nksharma@cs.washington.edu>
* Naveen Kr. Sharma <naveenks@cs.washington.edu>
* Dan R. K. Ports <drkp@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
......@@ -35,7 +35,7 @@
#define _COMMON_QUORUMSET_H_
namespace replication {
template <class IDTYPE, class MSGTYPE>
class QuorumSet
{
......@@ -43,9 +43,9 @@ public:
QuorumSet(int numRequired)
: numRequired(numRequired)
{
}
void
Clear()
{
......@@ -80,7 +80,20 @@ public:
return &vsmessages;
} else {
return NULL;
}
}
}
// Looks for any ID that currently has a quorum of messages.
//
// Walks the IDs in `messages` in iteration order and returns the message map
// of the first one for which CheckForQuorum(id) reports a quorum, or nullptr
// if no ID has reached a quorum yet.
const std::map<int, MSGTYPE> *
CheckForQuorum()
{
    for (const auto &entry : messages) {
        const std::map<int, MSGTYPE> *found = CheckForQuorum(entry.first);
        if (found) {
            return found;
        }
    }
    return nullptr;
}
const std::map<int, MSGTYPE> *
......@@ -98,7 +111,7 @@ public:
}
vsmessages[replicaIdx] = msg;
return CheckForQuorum(vs);
}
......@@ -107,7 +120,7 @@ public:
{
AddAndCheckForQuorum(vs, replicaIdx, msg);
}
public:
int numRequired;
private:
......
......@@ -6,7 +6,7 @@
* replication protocol
*
* Copyright 2013-2015 Irene Zhang <iyzhang@cs.washington.edu>
* Naveen Kr. Sharma <nksharma@cs.washington.edu>
* Naveen Kr. Sharma <naveenks@cs.washington.edu>
* Dan R. K. Ports <drkp@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
......@@ -70,27 +70,6 @@ Replica::ReplicaUpcall(opnum_t opnum, const string &op, string &res)
Debug("Upcall result: %s", res.c_str());
}
// Collects the operations with numbers in (to, current] from `log` and hands
// them to the application's RollbackUpcall, keyed by operation number.
void
Replica::Rollback(opnum_t current, opnum_t to, Log &log)
{
    Debug("Making rollback-upcall from " FMT_OPNUM " to " FMT_OPNUM,
          current, to);

    std::map<opnum_t, string> rolledBack;
    opnum_t opnum = current;
    while (opnum > to) {
        // Every opnum in the range is expected to be present in the log.
        rolledBack.insert(
            std::make_pair(opnum, log.Find(opnum)->request.op()));
        opnum--;
    }

    app->RollbackUpcall(current, to, rolledBack);
}
// Forwards a commit notification for operation `op` to the application via
// its CommitUpcall.
void
Replica::Commit(opnum_t op)
{
app->CommitUpcall(op);
}
void
Replica::UnloggedUpcall(const string &op, string &res)
{
......
......@@ -5,7 +5,7 @@
* interface to different vr protocols
*
* Copyright 2013-2015 Irene Zhang <iyzhang@cs.washington.edu>
* Naveen Kr. Sharma <nksharma@cs.washington.edu>
* Naveen Kr. Sharma <naveenks@cs.washington.edu>
* Dan R. K. Ports <drkp@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
......@@ -59,10 +59,6 @@ public:
// Default behavior: always ask for replication (replicate = true) and pass the
// input through unchanged (str2 = str1).
virtual void LeaderUpcall(opnum_t opnum, const string &str1, bool &replicate, string &str2) { replicate = true; str2 = str1; };
// Invoke callback on all replicas. Replica::ReplicaUpcall passes the
// operation as str1 and logs str2 as the upcall result. The default
// implementation does nothing.
virtual void ReplicaUpcall(opnum_t opnum, const string &str1, string &str2) { };
// Rollback callback on failed speculative operations. Replica::Rollback fills
// opMap with the operations numbered in (to, current]. The default
// implementation does nothing.
virtual void RollbackUpcall(opnum_t current, opnum_t to, const std::map<opnum_t, string> &opMap) { };
// Commit callback to commit speculative operations. Invoked by
// Replica::Commit with the operation number. The default implementation does
// nothing.
virtual void CommitUpcall(opnum_t) { };
// Invoke callback for unreplicated operations run on only one replica.
// Presumably str1 is the operation and str2 receives the result (verify
// against Replica::UnloggedUpcall). The default implementation does nothing.
virtual void UnloggedUpcall(const string &str1, string &str2) { };
};
......@@ -79,8 +75,6 @@ protected:
template<class MSG> void Execute(opnum_t opnum,
const Request & msg,
MSG &reply);
void Rollback(opnum_t current, opnum_t to, Log &log);
void Commit(opnum_t op);
void UnloggedUpcall(const string &op, string &res);
template<class MSG> void ExecuteUnlogged(const UnloggedRequest & msg,
MSG &reply);
......
syntax = "proto2";
package replication;
message Request {
......
# Directory of this makefile fragment, derived from the most recently
# included makefile.
d := $(dir $(lastword $(MAKEFILE_LIST)))
# C++ sources contributed by the IR replication module.
SRCS += $(addprefix $(d), \
record.cc client.cc replica.cc)
# Protocol buffer definitions contributed by the IR replication module.
PROTOS += $(addprefix $(d), \
ir-proto.proto)
# Objects needed to link an IR client. NOTE(review): $(o) is defined outside
# this fragment; presumably it is the object-file prefix for this directory.
OBJS-ir-client := $(o)ir-proto.o $(o)client.o \
$(OBJS-client) $(LIB-message) \
$(LIB-configuration)
# Objects needed to link an IR replica.
OBJS-ir-replica := $(o)record.o $(o)replica.o $(o)ir-proto.o \
$(OBJS-replica) $(LIB-message) \
$(LIB-configuration) $(LIB-persistent_register)
# Pull in the rules for this module's tests.
include $(d)tests/Rules.mk
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* ir/client.cc:
* Inconsistent replication client
*
* Copyright 2013-2015 Dan R. K. Ports <drkp@cs.washington.edu>
* Irene Zhang <iyzhang@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#include "replication/common/client.h"
#include "replication/common/request.pb.h"
#include "lib/assert.h"
#include "lib/message.h"
#include "lib/transport.h"
#include "replication/ir/client.h"
#include "replication/ir/ir-proto.pb.h"
#include <math.h>
namespace replication {
namespace ir {
using namespace std;
IRClient::IRClient(const transport::Configuration &config,
Transport *transport,
uint64_t clientid)
: Client(config, transport, clientid),
lastReqId(0)
{
}
// Frees every request that is still pending when the client goes away.
IRClient::~IRClient()
{
    for (const auto &entry : pendingReqs) {
        delete entry.second;
    }
}
// Generic Client entry point. IR treats a plain Invoke as an inconsistent
// operation, so all arguments are handed straight to InvokeInconsistent.
void
IRClient::Invoke(const string &request,
                 continuation_t continuation,
                 error_continuation_t error_continuation)
{
    InvokeInconsistent(request,
                       continuation,
                       error_continuation);
}
void
IRClient::InvokeInconsistent(const string &request,
continuation_t continuation,
error_continuation_t error_continuation)
{
// TODO: Use error_continuation.
(void) error_continuation;
// Bump the request ID
uint64_t reqId = ++lastReqId;
// Create new timer
auto timer = std::unique_ptr<Timeout>(new Timeout(
transport, 500, [this, reqId]() { ResendInconsistent(reqId); }));
PendingInconsistentRequest *req =
new PendingInconsistentRequest(request,
reqId,
continuation,
std::move(timer),
config.QuorumSize());
pendingReqs[reqId] = req;
SendInconsistent(req);
}
// Sends (or re-sends) the ProposeInconsistentMessage for `req` to all
// replicas.
//
// On success the request's timer is reset. On failure the request is removed
// from pendingReqs and deleted, so the caller must not touch `req` afterwards.
void
IRClient::SendInconsistent(const PendingInconsistentRequest *req)
{
    proto::ProposeInconsistentMessage reqMsg;
    auto *payload = reqMsg.mutable_req();
    payload->set_op(req->request);
    payload->set_clientid(clientid);
    payload->set_clientreqid(req->clientReqId);

    if (!transport->SendMessageToAll(this, reqMsg)) {
        Warning("Could not send inconsistent request to replicas");
        pendingReqs.erase(req->clientReqId);
        delete req;
        return;
    }

    req->timer->Reset();
}
// Starts a consensus operation.
//
// Allocates the next request id and registers a pending request with two
// timers: one that re-sends the proposal every 500 ms, and one that gives up
// on the fast path after 500 ms (TransitionToConsensusSlowPath).
//
//   request            - the operation to propose.
//   decide             - application callback used on the slow path to pick
//                        the final result from the replicas' results.
//   continuation       - called with the request and the decided result.
//   error_continuation - called if the operation fails (may be null).
void
IRClient::InvokeConsensus(const string &request,
                          decide_t decide,
                          continuation_t continuation,
                          error_continuation_t error_continuation)
{
    uint64_t reqId = ++lastReqId;

    auto timer = std::unique_ptr<Timeout>(new Timeout(
        transport, 500, [this, reqId]() { ResendConsensus(reqId); }));
    auto transition_to_slow_path_timer =
        std::unique_ptr<Timeout>(new Timeout(transport, 500, [this, reqId]() {
            TransitionToConsensusSlowPath(reqId);
        }));

    PendingConsensusRequest *req =
        new PendingConsensusRequest(request,
                                    reqId,
                                    continuation,
                                    std::move(timer),
                                    std::move(transition_to_slow_path_timer),
                                    config.QuorumSize(),
                                    config.FastQuorumSize(),
                                    decide,
                                    error_continuation);

    // The proposal message itself is built by SendConsensus, so no message is
    // constructed here.
    pendingReqs[reqId] = req;
    req->transition_to_slow_path_timer->Start();

    // SendConsensus removes and frees the request itself if sending fails, so
    // req must not be used after this call.
    SendConsensus(req);
}
// Sends (or re-sends) the ProposeConsensusMessage for `req` to all replicas.
//
// On success the request's timer is reset. On failure the request is removed
// from pendingReqs and deleted, so the caller must not touch `req` afterwards.
void
IRClient::SendConsensus(const PendingConsensusRequest *req)
{
    proto::ProposeConsensusMessage reqMsg;
    auto *payload = reqMsg.mutable_req();
    payload->set_op(req->request);
    payload->set_clientid(clientid);
    payload->set_clientreqid(req->clientReqId);

    if (!transport->SendMessageToAll(this, reqMsg)) {
        Warning("Could not send consensus request to replicas");
        pendingReqs.erase(req->clientReqId);
        delete req;
        return;
    }

    req->timer->Reset();
}
void
IRClient::InvokeUnlogged(int replicaIdx,
const string &request,
continuation_t continuation,
error_continuation_t error_continuation,
uint32_t timeout)
{
uint64_t reqId = ++lastReqId;
auto timer = std::unique_ptr<Timeout>(new Timeout(
transport, timeout,
[this, reqId]() { UnloggedRequestTimeoutCallback(reqId); }));
PendingUnloggedRequest *req =
new PendingUnloggedRequest(request,
reqId,
continuation,
error_continuation,
std::move(timer));
proto::UnloggedRequestMessage reqMsg;
reqMsg.mutable_req()->set_op(request);
reqMsg.mutable_req()->set_clientid(clientid);
reqMsg.mutable_req()->set_clientreqid(reqId);
if (transport->SendMessageToReplica(this, replicaIdx, reqMsg)) {
req->timer->Start();
pendingReqs[reqId] = req;
} else {
Warning("Could not send unlogged request to replica");
delete req;
}
}
void
IRClient::ResendInconsistent(const uint64_t reqId)
{
Warning("Client timeout; resending inconsistent request: %lu", reqId);
SendInconsistent((PendingInconsistentRequest *)pendingReqs[reqId]);
}
void
IRClient::ResendConsensus(const uint64_t reqId)
{
Warning("Client timeout; resending consensus request: %lu", reqId);
SendConsensus((PendingConsensusRequest *)pendingReqs[reqId]);
}
// Timer callback: gives up on the fast path for consensus request `reqId`.
//
// Marks the request as being on the slow path and discards the transition
// timer so this runs at most once per request. If a quorum of replies in a
// single view has already arrived, the slow path is started immediately.
//
// Nothing further is done when the finalize phase has already begun
// (sent_confirms): the result has been decided and sent to the replicas, and
// re-running decide could overwrite it.
void
IRClient::TransitionToConsensusSlowPath(const uint64_t reqId)
{
    Debug("Client timeout; taking consensus slow path: reqId=%lu", reqId);

    // Use find() so a stale timeout cannot insert a null entry.
    auto it = pendingReqs.find(reqId);
    if (it == pendingReqs.end()) {
        Debug("Received slow-path transition timeout when no request was "
              "pending");
        return;
    }

    PendingConsensusRequest *req =
        dynamic_cast<PendingConsensusRequest *>(it->second);
    ASSERT(req != NULL);
    req->on_slow_path = true;

    // We've already transitioned into the slow path, so don't transition into
    // the slow-path again.
    ASSERT(req->transition_to_slow_path_timer);
    req->transition_to_slow_path_timer.reset();

    // The fast path (or a finalized reply) already started the finalize
    // phase; keep its result. HandleConsensusReply applies the same guard.
    if (req->sent_confirms) {
        return;
    }

    // It's possible that we already have a quorum of responses (but not a
    // super quorum).
    const std::map<int, proto::ReplyConsensusMessage> *quorum =
        req->consensusReplyQuorum.CheckForQuorum();
    if (quorum != nullptr) {
        HandleSlowPathConsensus(reqId, *quorum, false, req);
    }
}
// Starts the finalize phase of a consensus request on the slow path.
//
//   reqid                  - client request id of the operation.
//   msgs                   - replies collected for one view.
//   finalized_result_found - true if the caller already stored a finalized
//                            result in req; otherwise `decide` is run over
//                            the results in msgs.
//   req                    - the pending request. It is removed and deleted
//                            here if the finalize message cannot be sent.
void IRClient::HandleSlowPathConsensus(
    const uint64_t reqid,
    const std::map<int, proto::ReplyConsensusMessage> &msgs,
    const bool finalized_result_found,
    PendingConsensusRequest *req)
{
    ASSERT(finalized_result_found || msgs.size() >= req->quorumSize);
    Debug("Handling slow path for request %lu.", reqid);

    if (!finalized_result_found) {
        // No finalized result yet: count how often each result was reported
        // and let the application decide.
        uint64_t commonView = 0;
        std::map<string, std::size_t> tally;
        for (auto it = msgs.begin(); it != msgs.end(); ++it) {
            const proto::ReplyConsensusMessage &reply = it->second;
            tally[reply.result()] += 1;

            // All messages should have the same view.
            if (commonView == 0) {
                commonView = reply.view();
            }
            ASSERT(reply.view() == commonView);
        }

        // The decided result is kept on the request so that later retries
        // re-send the same value.
        ASSERT(req->decide != NULL);
        req->decideResult = req->decide(tally);
        req->reply_consensus_view = commonView;
    }

    // Replace the proposal timer with one that re-sends the finalize message.
    req->timer = std::unique_ptr<Timeout>(
        new Timeout(transport, 500, [this, reqid]() {
            ResendConfirmation(reqid, true);
        }));

    proto::FinalizeConsensusMessage finalizeMsg;
    auto *opid = finalizeMsg.mutable_opid();
    opid->set_clientid(clientid);
    opid->set_clientreqid(reqid);
    finalizeMsg.set_result(req->decideResult);

    if (!transport->SendMessageToAll(this, finalizeMsg)) {
        Warning("Could not send finalize message to replicas");
        pendingReqs.erase(reqid);
        delete req;
        return;
    }

    Debug("FinalizeConsensusMessages sent for request %lu.", reqid);
    req->sent_confirms = true;
    req->timer->Start();
}
// Handles a super quorum of consensus replies received on the fast path.
//
// If a super quorum of the replies carry the same result, that result is
// adopted, the finalize message is sent to all replicas, and the
// application's continuation is invoked right away. Otherwise the request
// falls back to the slow path.
//
//   reqid - client request id of the operation.
//   msgs  - replies collected for one view; at least superQuorumSize of them.
//   req   - the pending request. It may be deleted by this call (when a
//           finalize message cannot be sent), so callers must not use it
//           afterwards.
void IRClient::HandleFastPathConsensus(
    const uint64_t reqid,
    const std::map<int, proto::ReplyConsensusMessage> &msgs,
    PendingConsensusRequest *req)
{
    ASSERT(msgs.size() >= req->superQuorumSize);
    Debug("Handling fast path for request %lu.", reqid);

    // We've received a super quorum of responses. Now, we have to check to see
    // if we have a super quorum of _matching_ responses.
    map<string, std::size_t> results;
    for (const auto &m : msgs) {
        const std::string &result = m.second.result();
        results[result]++;
    }

    for (const auto &result : results) {
        if (result.second < req->superQuorumSize) {
            continue;
        }

        // A super quorum of matching requests was found!
        Debug("A super quorum of matching requests was found for request %lu.",
              reqid);
        req->decideResult = result.first;

        // Set up a new timeout for the finalize phase.
        req->timer = std::unique_ptr<Timeout>(new Timeout(
            transport, 500,
            [this, reqid]() { ResendConfirmation(reqid, true); }));

        // Asynchronously send the finalize message.
        proto::FinalizeConsensusMessage response;
        response.mutable_opid()->set_clientid(clientid);
        response.mutable_opid()->set_clientreqid(reqid);
        response.set_result(result.first);
        const bool sent = transport->SendMessageToAll(this, response);
        if (sent) {
            Debug("FinalizeConsensusMessages sent for request %lu.", reqid);
            req->sent_confirms = true;
            req->timer->Start();
        } else {
            Warning("Could not send finalize message to replicas");
        }

        // Return to the client. This must happen before req is freed below.
        if (!req->continuationInvoked) {
            req->continuation(req->request, req->decideResult);
            req->continuationInvoked = true;
        }

        // Give up on the request only after the last use of req; freeing it
        // any earlier would make the continuation read freed memory.
        if (!sent) {
            pendingReqs.erase(reqid);
            delete req;
        }
        return;
    }

    // There was not a super quorum of matching results, so we transition into
    // the slow path.
    Debug("A super quorum of matching requests was NOT found for request %lu.",
          reqid);
    req->on_slow_path = true;
    if (req->transition_to_slow_path_timer) {
        req->transition_to_slow_path_timer.reset();
    }
    HandleSlowPathConsensus(reqid, msgs, false, req);
}
void
IRClient::ResendConfirmation(const uint64_t reqId, bool isConsensus)
{
if (pendingReqs.find(reqId) == pendingReqs.end()) {
Debug("Received resend request when no request was pending");
return;
}
if (isConsensus) {
PendingConsensusRequest *req = static_cast<PendingConsensusRequest *>(pendingReqs[reqId]);
ASSERT(req != NULL);
proto::FinalizeConsensusMessage response;
response.mutable_opid()->set_clientid(clientid);
response.mutable_opid()->set_clientreqid(req->clientReqId);
response.set_result(req->decideResult);
if(transport->SendMessageToAll(this, response)) {
req->timer->Reset();
} else {
Warning("Could not send finalize message to replicas");
// give up and clean up
pendingReqs.erase(reqId);
delete req;
}
} else {
PendingInconsistentRequest *req = static_cast<PendingInconsistentRequest *>(pendingReqs[reqId]);
ASSERT(req != NULL);
proto::FinalizeInconsistentMessage response;
response.mutable_opid()->set_clientid(clientid);
response.mutable_opid()->set_clientreqid(req->clientReqId);
if (transport->SendMessageToAll(this, response)) {
req->timer->Reset();
} else {
Warning("Could not send finalize message to replicas");
pendingReqs.erase(reqId);
delete req;
}
}
}
// Transport entry point: dispatches an incoming message by its protobuf type
// name to the matching handler. Types that IR does not recognise are passed
// on to Client::ReceiveMessage.
//
// NOTE(review): the result of ParseFromString is not checked.
void
IRClient::ReceiveMessage(const TransportAddress &remote,
                         const string &type,
                         const string &data)
{
    proto::ReplyInconsistentMessage replyInconsistent;
    if (type == replyInconsistent.GetTypeName()) {
        replyInconsistent.ParseFromString(data);
        HandleInconsistentReply(remote, replyInconsistent);
        return;
    }

    proto::ReplyConsensusMessage replyConsensus;
    if (type == replyConsensus.GetTypeName()) {
        replyConsensus.ParseFromString(data);
        HandleConsensusReply(remote, replyConsensus);
        return;
    }

    proto::ConfirmMessage confirm;
    if (type == confirm.GetTypeName()) {
        confirm.ParseFromString(data);
        HandleConfirm(remote, confirm);
        return;
    }

    proto::UnloggedReplyMessage unloggedReply;
    if (type == unloggedReply.GetTypeName()) {
        unloggedReply.ParseFromString(data);
        HandleUnloggedReply(remote, unloggedReply);
        return;
    }

    Client::ReceiveMessage(remote, type, data);
}
// Handles a replica's reply to an inconsistent proposal.
//
// Replies are grouped by (view, request id). Once a quorum is reached for
// the first time, the finalize message is sent to all replicas, the timer is
// switched to re-sending that finalize message, and the application's
// continuation is invoked with an empty result. The request stays pending
// until the confirmations are handled.
void
IRClient::HandleInconsistentReply(const TransportAddress &remote,
                                  const proto::ReplyInconsistentMessage &msg)
{
    const uint64_t reqId = msg.opid().clientreqid();

    auto pendingIt = pendingReqs.find(reqId);
    if (pendingIt == pendingReqs.end()) {
        Debug("Received reply when no request was pending");
        return;
    }

    PendingInconsistentRequest *pending =
        dynamic_cast<PendingInconsistentRequest *>(pendingIt->second);
    // Make sure the dynamic cast worked
    ASSERT(pending != NULL);

    Debug("Client received reply: %lu %i", reqId,
          pending->inconsistentReplyQuorum.NumRequired());

    // Record the reply; nothing more to do until a quorum is reached.
    viewstamp_t vs = { msg.view(), reqId };
    if (!pending->inconsistentReplyQuorum.AddAndCheckForQuorum(
            vs, msg.replicaidx(), msg)) {
        return;
    }

    // TODO: Some of the ReplyInconsistentMessages might already be
    // finalized. If this is the case, then we don't have to send finalize
    // messages to them. It's not incorrect to send them anyway (which this
    // code does) but it's less efficient.

    // The finalize phase is started only once per request.
    if (pending->continuationInvoked) {
        return;
    }

    pending->timer = std::unique_ptr<Timeout>(new Timeout(
        transport, 500,
        [this, reqId]() { ResendConfirmation(reqId, false); }));

    // asynchronously send the finalize message
    proto::FinalizeInconsistentMessage finalizeMsg;
    *(finalizeMsg.mutable_opid()) = msg.opid();
    if (transport->SendMessageToAll(this, finalizeMsg)) {
        pending->timer->Start();
    } else {
        Warning("Could not send finalize message to replicas");
    }

    // Return to the client regardless of whether the send succeeded.
    pending->continuation(pending->request, "");
    pending->continuationInvoked = true;
}
// Handles a replica's reply to a consensus proposal.
//
// Replies are grouped by view. A finalized reply moves the request onto the
// slow path immediately, using the finalized result. Otherwise, once enough
// replies from the reply's view have arrived (a quorum on the slow path, a
// super quorum on the fast path), the matching handler is invoked. Replies
// that arrive after the finalize phase has begun are ignored.
void
IRClient::HandleConsensusReply(const TransportAddress &remote,
                               const proto::ReplyConsensusMessage &msg)
{
    const uint64_t reqId = msg.opid().clientreqid();
    Debug(
        "Client received ReplyConsensusMessage from replica %i in view %lu for "
        "request %lu.",
        msg.replicaidx(), msg.view(), reqId);

    auto pendingIt = pendingReqs.find(reqId);
    if (pendingIt == pendingReqs.end()) {
        Debug(
            "Client was not expecting a ReplyConsensusMessage for request %lu, "
            "so it is ignoring the request.",
            reqId);
        return;
    }

    PendingConsensusRequest *pending =
        dynamic_cast<PendingConsensusRequest *>(pendingIt->second);
    ASSERT(pending != nullptr);

    if (pending->sent_confirms) {
        Debug(
            "Client has already received a quorum or super quorum of "
            "HandleConsensusReply for request %lu and has already sent out "
            "ConfirmMessages.",
            reqId);
        return;
    }

    pending->consensusReplyQuorum.Add(msg.view(), msg.replicaidx(), msg);
    const std::map<int, proto::ReplyConsensusMessage> &viewReplies =
        pending->consensusReplyQuorum.GetMessages(msg.view());

    if (msg.finalized()) {
        Debug("The HandleConsensusReply for request %lu was finalized.", reqId);
        // If we receive a finalized message, then we immediately transition
        // into the slow path.
        pending->on_slow_path = true;
        if (pending->transition_to_slow_path_timer) {
            pending->transition_to_slow_path_timer.reset();
        }

        pending->decideResult = msg.result();
        pending->reply_consensus_view = msg.view();
        HandleSlowPathConsensus(reqId, viewReplies, true, pending);
        return;
    }

    // The number of same-view replies needed depends on the current path.
    const std::size_t needed = pending->on_slow_path
                                   ? pending->quorumSize
                                   : pending->superQuorumSize;
    if (viewReplies.size() < needed) {
        return;
    }

    if (pending->on_slow_path) {
        HandleSlowPathConsensus(reqId, viewReplies, false, pending);
    } else {
        HandleFastPathConsensus(reqId, viewReplies, pending);
    }
}
// Handles a replica's ConfirmMessage for a finalized operation.
//
// Confirmations are grouped by (view, request id). When a quorum is reached
// the request is complete: its timer is stopped and it is removed from
// pendingReqs and deleted. If the application has not been called yet, it is
// called now, with the decided result when the confirmations' view matches
// the view of the consensus replies, and with an error otherwise.
void
IRClient::HandleConfirm(const TransportAddress &remote,
                        const proto::ConfirmMessage &msg)
{
    const uint64_t reqId = msg.opid().clientreqid();

    auto pendingIt = pendingReqs.find(reqId);
    if (pendingIt == pendingReqs.end()) {
        Debug(
            "We received a ConfirmMessage for operation %lu, but we weren't "
            "waiting for any ConfirmMessages. We are ignoring the message.",
            reqId);
        return;
    }

    PendingRequest *pending = pendingIt->second;

    viewstamp_t vs = { msg.view(), reqId };
    if (!pending->confirmQuorum.AddAndCheckForQuorum(vs, msg.replicaidx(),
                                                     msg)) {
        return;
    }

    pending->timer->Stop();
    pendingReqs.erase(pendingIt);

    if (!pending->continuationInvoked) {
        // Return to the client. ConfirmMessages are sent by replicas in
        // response to FinalizeInconsistentMessages and
        // FinalizeConsensusMessage, but inconsistent operations are
        // invoked before FinalizeInconsistentMessages are ever sent. Thus,
        // continuationInvoked can only be false if the request is a
        // PendingConsensusRequest, so it's safe to cast it here.
        PendingConsensusRequest *consensusReq =
            dynamic_cast<PendingConsensusRequest *>(pending);
        ASSERT(consensusReq != nullptr);

        if (vs.view != consensusReq->reply_consensus_view) {
            Debug(
                "We received a majority of ConfirmMessages for request %lu "
                "with view %lu, but the view from ReplyConsensusMessages "
                "was %lu.",
                reqId, vs.view, consensusReq->reply_consensus_view);
            if (consensusReq->error_continuation) {
                consensusReq->error_continuation(
                    consensusReq->request,
                    ErrorCode::MISMATCHED_CONSENSUS_VIEWS);
            }
        } else {
            consensusReq->continuation(consensusReq->request,
                                       consensusReq->decideResult);
        }
    }

    delete pending;
}
// Handles the reply to an unlogged request: stops its timer, removes it from
// pendingReqs, passes the reply to the application's continuation, and frees
// the request. Replies for requests that are no longer pending are ignored.
void
IRClient::HandleUnloggedReply(const TransportAddress &remote,
                              const proto::UnloggedReplyMessage &msg)
{
    const uint64_t reqId = msg.clientreqid();

    auto pendingIt = pendingReqs.find(reqId);
    if (pendingIt == pendingReqs.end()) {
        Debug("Received reply when no request was pending");
        return;
    }
    PendingRequest *pending = pendingIt->second;

    // The request is finished: cancel the timeout and forget the request
    // before calling back into the application.
    pending->timer->Stop();
    pendingReqs.erase(pendingIt);

    pending->continuation(pending->request, msg.reply());
    delete pending;
}
void
IRClient::UnloggedRequestTimeoutCallback(const uint64_t reqId)
{
auto it = pendingReqs.find(reqId);
if (it == pendingReqs.end()) {
Debug("Received timeout when no request was pending");
return;
}
PendingUnloggedRequest *req = static_cast<PendingUnloggedRequest *>(it->second);
ASSERT(req != NULL);
Warning("Unlogged request timed out");
// delete timer event
req->timer->Stop();
// remove from pending list
pendingReqs.erase(it);
// invoke application callback
if (req->error_continuation) {
req->error_continuation(req->request, ErrorCode::TIMEOUT);
}
delete req;
}
} // namespace ir
} // namespace replication
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* replication/ir/client.h:
* Inconsistent replication client
*
* Copyright 2013-2015 Dan R. K. Ports <drkp@cs.washington.edu>
* Irene Zhang <iyzhang@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#ifndef _IR_CLIENT_H_
#define _IR_CLIENT_H_
#include "replication/common/client.h"
#include "replication/common/quorumset.h"
#include "lib/configuration.h"
#include "replication/ir/ir-proto.pb.h"
#include <functional>
#include <map>
#include <memory>
#include <set>
#include <unordered_map>
namespace replication {
namespace ir {
// Client for inconsistent replication (IR).
//
// Issues inconsistent, consensus and unlogged operations. Every in-flight
// operation is tracked in pendingReqs, keyed by its client request id, until
// it completes or is abandoned.
class IRClient : public Client
{
public:
// Tally of consensus results: maps a result string to the number of replies
// that carried it.
using result_set_t = std::map<string, std::size_t>;
// Application callback that picks the final result of a consensus operation
// from a tally of the replicas' results.
using decide_t = std::function<string(const result_set_t &)>;
IRClient(const transport::Configuration &config,
Transport *transport,
uint64_t clientid = 0);
virtual ~IRClient();
// Generic entry point; forwards to InvokeInconsistent.
virtual void Invoke(
const string &request,
continuation_t continuation,
error_continuation_t error_continuation = nullptr) override;
// Sends `request` to the single replica `replicaIdx`. `timeout` is in
// milliseconds; on timeout, error_continuation (if set) is called with
// ErrorCode::TIMEOUT.
virtual void InvokeUnlogged(
int replicaIdx,
const string &request,
continuation_t continuation,
error_continuation_t error_continuation = nullptr,
uint32_t timeout = DEFAULT_UNLOGGED_OP_TIMEOUT) override;
// Dispatches an incoming message to the matching Handle* method by its type
// name.
virtual void ReceiveMessage(
const TransportAddress &remote,
const string &type,
const string &data) override;
// Starts an inconsistent operation. error_continuation is currently unused.
virtual void InvokeInconsistent(
const string &request,
continuation_t continuation,
error_continuation_t error_continuation = nullptr);
// Starts a consensus operation. `decide` is used on the slow path to choose
// the final result.
virtual void InvokeConsensus(
const string &request,
decide_t decide,
continuation_t continuation,
error_continuation_t error_continuation = nullptr);
protected:
// State shared by all kinds of in-flight requests.
struct PendingRequest {
// The operation as passed in by the application.
string request;
// Client-local request id; the key of this request in pendingReqs.
uint64_t clientReqId;
// Application callback invoked with (request, reply).
continuation_t continuation;
// True once `continuation` has been called for this request.
bool continuationInvoked = false;
// The currently active retry/timeout timer for this request.
std::unique_ptr<Timeout> timer;
// ConfirmMessages received so far, grouped by viewstamp.
QuorumSet<viewstamp_t, proto::ConfirmMessage> confirmQuorum;
inline PendingRequest(string request, uint64_t clientReqId,
continuation_t continuation,
std::unique_ptr<Timeout> timer, int quorumSize)
: request(request),
clientReqId(clientReqId),
continuation(continuation),
timer(std::move(timer)),
confirmQuorum(quorumSize){};
// Virtual so that requests can be deleted (and dynamic_cast) through a
// PendingRequest pointer.
virtual ~PendingRequest(){};
};
// An in-flight unlogged request. Its confirm quorum size is fixed at 1.
struct PendingUnloggedRequest : public PendingRequest {
// Called with ErrorCode::TIMEOUT if the request times out (may be null).
error_continuation_t error_continuation;
inline PendingUnloggedRequest(
string request, uint64_t clientReqId, continuation_t continuation,
error_continuation_t error_continuation,
std::unique_ptr<Timeout> timer)
: PendingRequest(request, clientReqId, continuation,
std::move(timer), 1),
error_continuation(error_continuation){};
};
// An in-flight inconsistent request.
struct PendingInconsistentRequest : public PendingRequest {
// ReplyInconsistentMessages received so far, grouped by viewstamp.
QuorumSet<viewstamp_t, proto::ReplyInconsistentMessage>
inconsistentReplyQuorum;
inline PendingInconsistentRequest(string request, uint64_t clientReqId,
continuation_t continuation,
std::unique_ptr<Timeout> timer,
int quorumSize)
: PendingRequest(request, clientReqId, continuation,
std::move(timer), quorumSize),
inconsistentReplyQuorum(quorumSize){};
};
// An in-flight consensus request.
struct PendingConsensusRequest : public PendingRequest {
// ReplyConsensusMessages received so far, grouped by view.
QuorumSet<opnum_t, proto::ReplyConsensusMessage> consensusReplyQuorum;
// Application callback that chooses the result on the slow path.
decide_t decide;
// The result chosen for this operation; sent in finalize messages and
// passed to the continuation.
string decideResult;
// Number of same-view replies needed on the slow path.
const std::size_t quorumSize;
// Number of same-view replies needed on the fast path.
const std::size_t superQuorumSize;
// True once the request has given up on the fast path.
bool on_slow_path;
// Called when the operation fails, e.g. on mismatched views (may be null).
error_continuation_t error_continuation;
// The timer to give up on the fast path and transition to the slow
// path. After this timer is run for the first time, it is nulled.
std::unique_ptr<Timeout> transition_to_slow_path_timer;
// The view for which a majority result (or finalized result) was
// found. The view of a majority of confirms must match this view.
uint64_t reply_consensus_view = 0;
// True when a consensus request has already received a quorum or super
// quorum of replies and has already transitioned into the confirm
// phase.
bool sent_confirms = false;
inline PendingConsensusRequest(
string request, uint64_t clientReqId, continuation_t continuation,
std::unique_ptr<Timeout> timer,
std::unique_ptr<Timeout> transition_to_slow_path_timer,
int quorumSize, int superQuorum, decide_t decide,
error_continuation_t error_continuation)
: PendingRequest(request, clientReqId, continuation,
std::move(timer), quorumSize),
consensusReplyQuorum(quorumSize),
decide(decide),
quorumSize(quorumSize),
superQuorumSize(superQuorum),
on_slow_path(false),
error_continuation(error_continuation),
transition_to_slow_path_timer(
std::move(transition_to_slow_path_timer)){};
};
// Id of the most recently issued request; incremented for each new request.
uint64_t lastReqId;
// All in-flight requests, keyed by client request id. The client owns the
// pointed-to requests and deletes them when they finish or on destruction.
std::unordered_map<uint64_t, PendingRequest *> pendingReqs;
// Send*/Resend* (re)transmit the proposal of a pending request to all
// replicas. Send* removes and deletes the request if sending fails.
void SendInconsistent(const PendingInconsistentRequest *req);
void ResendInconsistent(const uint64_t reqId);
void SendConsensus(const PendingConsensusRequest *req);
void ResendConsensus(const uint64_t reqId);
// `TransitionToConsensusSlowPath` is called after a timeout to end the
// possibility of taking the fast path and transition into taking the slow
// path.
void TransitionToConsensusSlowPath(const uint64_t reqId);
// HandleSlowPathConsensus is called in one of two scenarios:
//
// 1. A finalized ReplyConsensusMessage was received. In this case, we
// immediately enter the slow path and use the finalized result. If
// finalized is true, req has already been populated with the
// finalized result.
// 2. We're in the slow path and receive a majority of
// ReplyConsensusMessages in the same view. In this case, we call
// decide to determine the final result.
//
// In either case, HandleSlowPathConsensus initiates the finalize phase of
// a consensus request.
void HandleSlowPathConsensus(
const uint64_t reqid,
const std::map<int, proto::ReplyConsensusMessage> &msgs,
const bool finalized_result_found,
PendingConsensusRequest *req);
// HandleFastPathConsensus is called when we're on the fast path and
// receive a super quorum of responses from the same view.
// HandleFastPathConsensus will check to see if there is a superquorum of
// matching responses. If there is, it will return to the user and
// asynchronously initiate the finalize phase of a consensus request.
// Otherwise, it transitions into the slow path which will also initiate
// the finalize phase of a consensus request, but not yet return to the
// user.
void HandleFastPathConsensus(
const uint64_t reqid,
const std::map<int, proto::ReplyConsensusMessage> &msgs,
PendingConsensusRequest *req);
// Re-sends the finalize message of a pending request; isConsensus selects
// the consensus or the inconsistent variant of the message.
void ResendConfirmation(const uint64_t reqId, bool isConsensus);
// Handlers for the message types dispatched by ReceiveMessage.
void HandleInconsistentReply(const TransportAddress &remote,
const proto::ReplyInconsistentMessage &msg);
void HandleConsensusReply(const TransportAddress &remote,
const proto::ReplyConsensusMessage &msg);
void HandleConfirm(const TransportAddress &remote,
const proto::ConfirmMessage &msg);
void HandleUnloggedReply(const TransportAddress &remote,
const proto::UnloggedReplyMessage &msg);
// Timer callback for an unlogged request that received no reply in time.
void UnloggedRequestTimeoutCallback(const uint64_t reqId);
};
} // namespace replication::ir
} // namespace replication
#endif /* _IR_CLIENT_H_ */
syntax = "proto2";
import "replication/common/request.proto";
package replication.ir.proto;
// Identifies an operation by the issuing client and that client's request id.
message OpID {
required uint64 clientid = 1;
required uint64 clientreqid = 2;
}
// For the view change and recovery protocol, a replica stores two things on
// disk: (1) its current view and (2) the latest view during which it was
// NORMAL. Replicas pack this information into this proto buf and serialize it
// to disk.
message PersistedViewInfo {
required uint64 view = 1;
required uint64 latest_normal_view = 2;
}
// Whether a record entry's result is still tentative or has been finalized.
enum RecordEntryState {
RECORD_STATE_TENTATIVE = 0;
RECORD_STATE_FINALIZED = 1;
}
// Whether a record entry belongs to an inconsistent or a consensus operation.
enum RecordEntryType {
RECORD_TYPE_INCONSISTENT = 0;
RECORD_TYPE_CONSENSUS = 1;
}
// Serialized form of a single entry in a replica's record.
message RecordEntryProto {
required uint64 view = 1;
required OpID opid = 2;
required RecordEntryState state = 3;
required RecordEntryType type = 4;
required bytes op = 5;
required bytes result = 6;
}
// TODO: Currently, replicas send entire records to one another. Figure out if
// there is a more efficient way to exchange records.
message RecordProto {
repeated RecordEntryProto entry = 1;
}
// Client -> all replicas: proposes an inconsistent operation.
message ProposeInconsistentMessage {
required replication.Request req = 1;
}
// Replica -> client: reply to a ProposeInconsistentMessage. The client groups
// replies by (view, client request id) when checking for a quorum.
message ReplyInconsistentMessage {
required uint64 view = 1;
required uint32 replicaIdx = 2;
required OpID opid = 3;
required bool finalized = 4;
}
// Client -> all replicas: sent once a quorum of ReplyInconsistentMessages has
// been received.
message FinalizeInconsistentMessage {
required OpID opid = 1;
}
// Replica -> client: sent in response to FinalizeInconsistentMessage and
// FinalizeConsensusMessage.
message ConfirmMessage {
required uint64 view = 1;
required uint32 replicaIdx = 2;
required OpID opid = 3;
}
// Client -> all replicas: proposes a consensus operation.
message ProposeConsensusMessage {
required replication.Request req = 1;
}
// Replica -> client: reply to a ProposeConsensusMessage, carrying the
// replica's result. The client groups replies by view; a reply with
// finalized set makes the client adopt its result directly.
message ReplyConsensusMessage {
required uint64 view = 1;
required uint32 replicaIdx = 2;
required OpID opid = 3;
required bytes result = 4;
required bool finalized = 5;
}
// Client -> all replicas: announces the result chosen for a consensus
// operation.
message FinalizeConsensusMessage {
required OpID opid = 1;
required bytes result = 2;
}
// View change message. NOTE(review): sender and receiver roles are not
// visible in this file beyond the comment on `record` below.
message DoViewChangeMessage {
required uint32 replicaIdx = 1;
// record is optional because a replica only sends its record to the
// leader of the new view.
optional RecordProto record = 2;
required uint64 new_view = 3;
required uint64 latest_normal_view = 4;
}
// Carries a record together with the number of the new view.
message StartViewMessage {
required RecordProto record = 1;
required uint64 new_view = 2;
}
// Client -> one replica: an unlogged request.
message UnloggedRequestMessage {
required replication.UnloggedRequest req = 1;
}
// Replica -> client: reply to an UnloggedRequestMessage. The client matches
// it to the pending request by clientreqid.
message UnloggedReplyMessage {
required bytes reply = 1;
required uint64 clientreqid = 2;
}
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* record.cc:
* a replica's record of tentative and finalized operations
*
* Copyright 2013 Dan R. K. Ports <drkp@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#include "replication/ir/record.h"
#include <utility>
#include "lib/assert.h"
namespace replication {
namespace ir {
// Builds a record from its protobuf form by adding one entry for every
// RecordEntryProto in record_proto, in the order they appear.
Record::Record(const proto::RecordProto &record_proto) {
    for (const auto &serialized : record_proto.entry()) {
        const auto &id = serialized.opid();
        const view_t entry_view = serialized.view();
        const opid_t entry_opid =
            std::make_pair(id.clientid(), id.clientreqid());

        // The proto stores only the op payload and the opid, so the Request
        // is reassembled from those pieces.
        Request rebuilt;
        rebuilt.set_op(serialized.op());
        rebuilt.set_clientid(id.clientid());
        rebuilt.set_clientreqid(id.clientreqid());

        Add(entry_view, entry_opid, rebuilt, serialized.state(),
            serialized.type(), serialized.result());
    }
}
// Stores a copy of `entry` in the record, keyed by entry.opid, and returns a
// reference to the stored copy. The opid must not already be present in the
// record (checked with ASSERT).
RecordEntry &
Record::Add(const RecordEntry& entry) {
    // Make sure this isn't a duplicate
    ASSERT(entries.count(entry.opid) == 0);
    // Index the map once and reuse the reference, rather than looking the key
    // up a second time just to produce the return value.
    RecordEntry &stored = entries[entry.opid];
    stored = entry;
    return stored;
}
// Convenience overload: builds a RecordEntry from the individual fields,
// passing an empty string as the constructor's last argument, and adds it via
// Add(const RecordEntry &). Returns a reference to the stored entry.
RecordEntry &
Record::Add(view_t view, opid_t opid, const Request &request,
            proto::RecordEntryState state, proto::RecordEntryType type)
{
    const RecordEntry fresh(view, opid, state, type, request, "");
    return Add(fresh);
}
// Adds an entry built from the individual fields and then sets its result to
// `result`. Returns a reference to the stored entry.
RecordEntry &
Record::Add(view_t view, opid_t opid, const Request &request,
            proto::RecordEntryState state, proto::RecordEntryType type,
            const string &result)
{
    RecordEntry &entry = Add(view, opid, request, state, type);
    entry.result = result;
    // `entry` already refers to the element stored in the map, so return it
    // directly instead of performing another entries[opid] lookup.
    return entry;
}
// Looks up the entry stored under `opid`. Returns a pointer to the entry held
// in the record, or NULL when no entry with that opid exists.
//
// This really ought to be const
RecordEntry *
Record::Find(opid_t opid)
{
    // A single find() replaces the empty()/count() check followed by
    // operator[]: one lookup instead of two, and no call that could insert a
    // default-constructed entry.
    const auto it = entries.find(opid);
    if (it == entries.end()) {
        return NULL;
    }
    RecordEntry *entry = &it->second;
    ASSERT(entry->opid == opid);
    return entry;
}
// Sets the state of the entry stored under `op`. Returns true if the entry
// exists and was updated, false if there is no such entry.
bool
Record::SetStatus(opid_t op, proto::RecordEntryState state)
{
    if (RecordEntry *found = Find(op)) {
        found->state = state;
        return true;
    }
    return false;
}
// Sets the result of the entry stored under `op`. Returns true if the entry
// exists and was updated, false if there is no such entry.
bool
Record::SetResult(opid_t op, const string &result)
{
    if (RecordEntry *found = Find(op)) {
        found->result = result;
        return true;
    }
    return false;
}
// Replaces the request of the entry stored under `op`. Returns true if the
// entry exists and was updated, false if there is no such entry.
bool
Record::SetRequest(opid_t op, const Request &req)
{
    if (RecordEntry *found = Find(op)) {
        found->request = req;
        return true;
    }
    return false;
}
// Removes the entry stored under `opid`, if any. Does nothing when the record
// has no entry with that opid.
void
Record::Remove(opid_t opid)
{
    const auto it = entries.find(opid);
    if (it != entries.end()) {
        entries.erase(it);
    }
}
// Returns true when the record holds no entries.
bool
Record::Empty() const
{
    return entries.size() == 0;
}
// Serializes the record by appending one RecordEntryProto to *proto for every
// entry, in the map's iteration order. Entries already present in *proto are
// left in place; only add_entry() is called on it.
void
Record::ToProto(proto::RecordProto *proto) const
{
    for (const auto &kv : entries) {
        const RecordEntry &src = kv.second;
        auto *dst = proto->add_entry();

        // Operation id: (client id, client request id).
        auto *id = dst->mutable_opid();
        id->set_clientid(src.opid.first);
        id->set_clientreqid(src.opid.second);

        dst->set_view(src.view);
        dst->set_state(src.state);
        dst->set_type(src.type);
        // Only the op payload of the request is serialized.
        dst->set_op(src.request.op());
        dst->set_result(src.result);
    }
}
// Read-only access to the underlying opid -> entry map.
const std::map<opid_t, RecordEntry> &Record::Entries() const {
    const std::map<opid_t, RecordEntry> &all_entries = entries;
    return all_entries;
}
} // namespace ir
} // namespace replication