Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • syslab/tapir
  • aaasz/tapir
  • ashmrtnz/tapir
3 results
Show changes
Showing
with 1773 additions and 520 deletions
......@@ -30,66 +30,9 @@
#include "lockserver/server.h"
int
main(int argc, char **argv)
{
int index = -1;
const char *configPath = NULL;
// Parse arguments
int opt;
char *strtolPtr;
while ((opt = getopt(argc, argv, "c:i:")) != -1) {
switch (opt) {
case 'c':
configPath = optarg;
break;
case 'i':
index = strtol(optarg, &strtolPtr, 10);
if ((*optarg == '\0') || (*strtolPtr != '\0') || (index < 0)) {
fprintf(stderr, "option -i requires a numeric arg\n");
}
break;
default:
fprintf(stderr, "Unknown argument %s\n", argv[optind]);
}
}
if (!configPath) {
fprintf(stderr, "option -c is required\n");
return EXIT_FAILURE;
}
if (index == -1) {
fprintf(stderr, "option -i is required\n");
return EXIT_FAILURE;
}
// Load configuration
std::ifstream configStream(configPath);
if (configStream.fail()) {
fprintf(stderr, "unable to read configuration file: %s\n", configPath);
return EXIT_FAILURE;
}
transport::Configuration config(configStream);
if (index >= config.n) {
fprintf(stderr, "replica index %d is out of bounds; "
"only %d replicas defined\n", index, config.n);
return EXIT_FAILURE;
}
UDPTransport transport(0.0, 0.0, 0);
lockserver::LockServer server;
replication::ir::IRReplica replica(config, index, &transport, &server);
transport.Run();
return EXIT_SUCCESS;
}
#include <algorithm>
#include <iterator>
#include <unordered_set>
namespace lockserver {
......@@ -173,4 +116,131 @@ LockServer::UnloggedUpcall(const string &str1, string &str2)
Debug("Unlogged: %s\n", str1.c_str());
}
void
LockServer::Sync(const std::map<opid_t, RecordEntry>& record) {
locks.clear();
struct KeyLockInfo {
std::unordered_set<uint64_t> locked;
std::unordered_set<uint64_t> unlocked;
};
std::unordered_map<std::string, KeyLockInfo> key_lock_info;
for (const std::pair<const opid_t, RecordEntry> &p : record) {
const opid_t &opid = p.first;
const RecordEntry &entry = p.second;
Request request;
request.ParseFromString(entry.request.op());
Reply reply;
reply.ParseFromString(entry.result);
KeyLockInfo &info = key_lock_info[request.key()];
Debug("Sync opid=(%lu, %lu), clientid=%lu, key=%s, type=%d, status=%d.",
opid.first, opid.second, request.clientid(),
request.key().c_str(), request.type(), reply.status());
if (request.type() && reply.status() == 0) {
// Lock.
info.locked.insert(request.clientid());
} else if (!request.type() && reply.status() == 0) {
// Unlock.
info.unlocked.insert(request.clientid());
}
}
for (const std::pair<const std::string, KeyLockInfo> &p : key_lock_info) {
const std::string &key = p.first;
const KeyLockInfo &info = p.second;
std::unordered_set<uint64_t> diff;
std::set_difference(std::begin(info.locked), std::end(info.locked),
std::begin(info.unlocked), std::end(info.unlocked),
std::inserter(diff, diff.begin()));
ASSERT(diff.size() == 0 || diff.size() == 1);
if (diff.size() == 1) {
uint64_t client_id = *std::begin(diff);
Debug("Assigning lock %lu: %s", client_id, key.c_str());
locks[key] = client_id;
}
}
}
std::map<opid_t, std::string>
LockServer::Merge(const std::map<opid_t, std::vector<RecordEntry>> &d,
const std::map<opid_t, std::vector<RecordEntry>> &u,
const std::map<opid_t, std::string> &majority_results_in_d) {
// First note that d and u only contain consensus operations, and lock
// requests are the only consensus operations (unlock is an inconsistent
// operation), so d and u only contain lock requests. To merge, we grant
// any majority successful lock request in d if it does not conflict with a
// currently held lock. We do not grant any other lock request.
std::map<opid_t, std::string> results;
using EntryVec = std::vector<RecordEntry>;
for (const std::pair<const opid_t, EntryVec>& p: d) {
const opid_t &opid = p.first;
const EntryVec &entries = p.second;
// Get the request and reply.
const RecordEntry &entry = *std::begin(entries);
Request request;
request.ParseFromString(entry.request.op());
Reply reply;
auto iter = majority_results_in_d.find(opid);
ASSERT(iter != std::end(majority_results_in_d));
reply.ParseFromString(iter->second);
// Form the final result.
const bool operation_successful = reply.status() == 0;
if (operation_successful) {
// If the lock was successful, then we acquire the lock so long as
// it is not already held.
const std::string &key = reply.key();
if (locks.count(key) == 0) {
Debug("Assigning lock %lu: %s", request.clientid(),
key.c_str());
locks[key] = request.clientid();
results[opid] = iter->second;
} else {
Debug("Rejecting lock %lu: %s", request.clientid(),
key.c_str());
reply.set_status(-1);
std::string s;
reply.SerializeToString(&s);
results[opid] = s;
}
} else {
// If the lock was not successful, then we maintain this as the
// majority result.
results[opid] = iter->second;
}
}
// We reject all lock requests in u. TODO: We could acquire a lock if
// it is free, but it's simplest to just reject them unilaterally.
for (const std::pair<const opid_t, EntryVec>& p: u) {
const opid_t &opid = p.first;
const EntryVec &entries = p.second;
const RecordEntry &entry = *std::begin(entries);
Request request;
request.ParseFromString(entry.request.op());
Debug("Rejecting lock %lu: %s", request.clientid(),
request.key().c_str());
Reply reply;
reply.set_key(request.key());
reply.set_status(-1);
std::string s;
reply.SerializeToString(&s);
results[opid] = s;
}
return results;
}
} // namespace lockserver
......@@ -31,15 +31,18 @@
#ifndef _IR_LOCK_SERVER_H_
#define _IR_LOCK_SERVER_H_
#include "lib/udptransport.h"
#include "replication/ir/replica.h"
#include "lockserver/locks-proto.pb.h"
#include <string>
#include <unordered_map>
#include "lib/transport.h"
#include "replication/ir/replica.h"
#include "lockserver/locks-proto.pb.h"
namespace lockserver {
using opid_t = replication::ir::opid_t;
using RecordEntry = replication::ir::RecordEntry;
class LockServer : public replication::ir::IRAppReplica
{
public:
......@@ -47,13 +50,22 @@ public:
~LockServer();
// Invoke inconsistent operation, no return value
void ExecInconsistentUpcall(const string &str1);
void ExecInconsistentUpcall(const string &str1) override;
// Invoke consensus operation
void ExecConsensusUpcall(const string &str1, string &str2);
void ExecConsensusUpcall(const string &str1, string &str2) override;
// Invoke unreplicated operation
void UnloggedUpcall(const string &str1, string &str2);
void UnloggedUpcall(const string &str1, string &str2) override;
// Sync
void Sync(const std::map<opid_t, RecordEntry>& record) override;
// Merge
std::map<opid_t, std::string> Merge(
const std::map<opid_t, std::vector<RecordEntry>> &d,
const std::map<opid_t, std::vector<RecordEntry>> &u,
const std::map<opid_t, std::string> &majority_results_in_d) override;
private:
std::unordered_map<std::string, uint64_t> locks;
......
d := $(dir $(lastword $(MAKEFILE_LIST)))
GTEST_SRCS += $(addprefix $(d), lockserver-test.cc)
$(d)lockserver-test: $(o)lockserver-test.o \
$(o)../locks-proto.o \
$(o)../server.o \
$(o)../client.o \
$(OBJS-ir-replica) \
$(OBJS-ir-client) \
$(LIB-configuration) \
$(LIB-repltransport) \
$(LIB-store-common) \
$(GTEST_MAIN)
TEST_BINS += $(d)lockserver-test
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* lockserver_test.cc:
* test cases for lock server
*
* Copyright 2013 Dan R. K. Ports <drkp@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#include <fstream>
#include <memory>
#include <thread>
#include <gtest/gtest.h>
#include "lib/configuration.h"
#include "lib/repltransport.h"
#include "lockserver/client.h"
#include "lockserver/server.h"
#include "replication/ir/replica.h"
class LockServerTest : public testing::Test {
protected:
std::vector<transport::ReplicaAddress> replica_addrs_;
std::unique_ptr<transport::Configuration> config_;
ReplTransport transport_;
std::vector<std::unique_ptr<lockserver::LockClient>> clients_;
std::vector<std::unique_ptr<lockserver::LockServer>> servers_;
std::vector<std::unique_ptr<replication::ir::IRReplica>> replicas_;
LockServerTest() {
replica_addrs_ = {{"replica", "0"},
{"replica", "1"},
{"replica", "2"},
{"replica", "3"},
{"replica", "4"}};
config_ = std::unique_ptr<transport::Configuration>(
new transport::Configuration(5, 2, replica_addrs_));
RemovePersistedFiles();
for (std::size_t i = 0; i < 3; ++i) {
auto client = std::unique_ptr<lockserver::LockClient>(
new lockserver::LockClient(&transport_, *config_));
client->lock_async(std::to_string(i));
clients_.push_back(std::move(client));
}
for (std::size_t i = 0; i < replica_addrs_.size(); ++i) {
auto server = std::unique_ptr<lockserver::LockServer>(
new lockserver::LockServer());
servers_.push_back(std::move(server));
auto replica = std::unique_ptr<replication::ir::IRReplica>(
new replication::ir::IRReplica(*config_, i, &transport_,
servers_[i].get()));
replicas_.push_back(std::move(replica));
}
}
virtual void TearDown() {
RemovePersistedFiles();
}
virtual void RemovePersistedFiles() {
for (std::size_t i = 0; i < replica_addrs_.size(); ++i) {
const transport::ReplicaAddress &addr = replica_addrs_[i];
const std::string filename =
addr.host + ":" + addr.port + "_" + std::to_string(i) + ".bin";
std::ifstream f(filename);
if (f.good()) {
int success = std::remove(filename.c_str());
ASSERT(success == 0);
}
}
}
};
// Note that these tests are all white box smoke tests. They depend on the
// low-level details of knowing exactly which timeouts are registered and which
// messages are sent. If an implementation detail is changed to make some of
// these tests fail, you should cal transport_.Run() and walk through the
// execution to trigger the desired behavior. Also, they only check to make
// sure that nothing crashes, though you can read through the Debug prints to
// make sure everything looks right.
//
// TODO: Use a ReplTransport for tests like the ones in ir-test.cc to assert
// that the correct messages are being sent.
TEST_F(LockServerTest, SuccessfulFastPathLock) {
// Send client 0's lock request.
transport_.TriggerTimer(1);
// Deliver lock request to replicas.
for (const auto &addr : replica_addrs_) {
transport_.DeliverMessage({addr.host, addr.port}, 0);
}
// Deliver lock reply to client.
for (std::size_t i = 0; i < replica_addrs_.size(); ++i) {
transport_.DeliverMessage({"client", "0"}, i);
}
// Deliver finalize to replicas.
for (const auto &addr : replica_addrs_) {
transport_.DeliverMessage({addr.host, addr.port}, 1);
}
// Deliver confirm to client.
int j = replica_addrs_.size();
for (std::size_t i = j; i < j + replica_addrs_.size(); ++i) {
transport_.DeliverMessage({"client", "0"}, i);
}
}
TEST_F(LockServerTest, SuccessfulSlowPathLock) {
// Send client 0's lock request.
transport_.TriggerTimer(1);
// Transition to slow path.
transport_.TriggerTimer(clients_.size() + replica_addrs_.size() + 1);
// Deliver lock request to replicas.
for (const auto &addr : replica_addrs_) {
transport_.DeliverMessage({addr.host, addr.port}, 0);
}
// Deliver lock reply to client.
for (std::size_t i = 0; i < replica_addrs_.size(); ++i) {
transport_.DeliverMessage({"client", "0"}, i);
}
// Deliver finalize to replicas.
for (const auto &addr : replica_addrs_) {
transport_.DeliverMessage({addr.host, addr.port}, 1);
}
// Deliver confirm to client.
int j = replica_addrs_.size();
for (std::size_t i = j; i < j + replica_addrs_.size(); ++i) {
transport_.DeliverMessage({"client", "0"}, i);
}
}
TEST_F(LockServerTest, SuccessfulViewChange) {
// Send client 0's lock request.
transport_.TriggerTimer(1);
// Deliver lock request to replicas.
for (const auto &addr : replica_addrs_) {
transport_.DeliverMessage({addr.host, addr.port}, 0);
}
// Initiate view changes on all replicas.
const std::size_t nclients = clients_.size();
const std::size_t nreplicas = replica_addrs_.size();
for (std::size_t i = nclients + 1; i < nclients + nreplicas + 1; ++i) {
transport_.TriggerTimer(i);
}
// Deliver DoViewChangeMessages to new primary.
const transport::ReplicaAddress& primary = replica_addrs_[1];
for (std::size_t i = 1; i < 1 + nreplicas - 1; ++i) {
transport_.DeliverMessage({primary.host, primary.port}, i);
}
// Deliver StartViewMessage to all replicas.
for (std::size_t i = 0; i < nreplicas; ++i) {
if (i == 1) {
continue;
}
const transport::ReplicaAddress& addr = replica_addrs_[i];
transport_.DeliverMessage({addr.host, addr.port}, nreplicas);
}
}
TEST_F(LockServerTest, SuccessfulViewChangeNonemptyRdu) {
const std::size_t nclients = clients_.size();
const std::size_t nreplicas = replica_addrs_.size();
ASSERT_GE(nclients, 3);
ASSERT_GE(nreplicas, 3);
// Send client 0's lock request.
transport_.TriggerTimer(1);
// Deliver lock request to replicas.
for (const auto &addr : replica_addrs_) {
transport_.DeliverMessage({addr.host, addr.port}, 0);
}
// Deliver lock reply to client.
for (std::size_t i = 0; i < replica_addrs_.size(); ++i) {
transport_.DeliverMessage({"client", "0"}, i);
}
// Deliver finalize to replicas.
for (const auto &addr : replica_addrs_) {
transport_.DeliverMessage({addr.host, addr.port}, 1);
}
// Send client 1's lock request.
transport_.TriggerTimer(2);
// Deliver lock request to first three replicas.
for (std::size_t i = 0; i < 3; ++i) {
const transport::ReplicaAddress &addr = replica_addrs_[i];
transport_.DeliverMessage({addr.host, addr.port}, 2);
}
// Send client 2's lock request.
transport_.TriggerTimer(3);
// Deliver lock request to first replica.
const transport::ReplicaAddress &addr = replica_addrs_[0];
transport_.DeliverMessage({addr.host, addr.port}, 3);
// View change first three replicas.
for (std::size_t i = nclients + 1; i < nclients + 1 + 3; ++i) {
transport_.TriggerTimer(i);
}
// Deliver DoViewChangeMessages to new primary.
const transport::ReplicaAddress& primary = replica_addrs_[1];
for (std::size_t i = 4; i < 4 + 2; ++i) {
transport_.DeliverMessage({primary.host, primary.port}, i);
}
// Deliver StartViewMessage to replica 0 and 2.
const transport::ReplicaAddress& addr0 = replica_addrs_[0];
const transport::ReplicaAddress& addr2 = replica_addrs_[2];
transport_.DeliverMessage({addr0.host, addr0.port}, 6);
transport_.DeliverMessage({addr2.host, addr2.port}, 6);
}
TEST_F(LockServerTest, FinalizeConsensusReply) {
const std::size_t nclients = clients_.size();
const std::size_t nreplicas = replica_addrs_.size();
// Send client 0's lock request.
transport_.TriggerTimer(1);
// Deliver lock request to replicas.
for (const auto &addr : replica_addrs_) {
transport_.DeliverMessage({addr.host, addr.port}, 0);
}
// Trigger view change.
for (std::size_t i = nclients + 1; i < nclients + 1 + nreplicas; ++i) {
transport_.TriggerTimer(i);
}
// Deliver DoViewChangeMessages to new primary.
const transport::ReplicaAddress& primary = replica_addrs_[1];
for (std::size_t i = 1; i < 1 + nreplicas - 1; ++i) {
transport_.DeliverMessage({primary.host, primary.port}, i);
}
// Deliver StartViewMessage to all replicas.
for (std::size_t i = 0; i < nreplicas; ++i) {
if (i == 1) {
continue;
}
const transport::ReplicaAddress& addr = replica_addrs_[i];
transport_.DeliverMessage({addr.host, addr.port}, nreplicas);
}
// Deliver lock request to replicas.
for (const auto &addr : replica_addrs_) {
transport_.DeliverMessage({addr.host, addr.port}, 0);
}
// Deliver finalized reply to client.
transport_.DeliverMessage({"client", "0"}, nreplicas);
}
TEST_F(LockServerTest, MismatchedConsensus) {
const std::size_t nclients = clients_.size();
const std::size_t nreplicas = replica_addrs_.size();
// Send client 0's lock request.
transport_.TriggerTimer(1);
// Transition to slow path.
transport_.TriggerTimer(nclients + nreplicas + 1);
// Deliver lock request to replicas.
for (const auto &addr : replica_addrs_) {
transport_.DeliverMessage({addr.host, addr.port}, 0);
}
// Deliver lock reply to client.
for (std::size_t i = 0; i < replica_addrs_.size(); ++i) {
transport_.DeliverMessage({"client", "0"}, i);
}
// Trigger view change.
for (std::size_t i = nclients + 1; i < nclients + 1 + nreplicas; ++i) {
transport_.TriggerTimer(i);
}
// Deliver DoViewChangeMessages to new primary.
const transport::ReplicaAddress& primary = replica_addrs_[1];
for (std::size_t i = 2; i < 2 + nreplicas - 1; ++i) {
transport_.DeliverMessage({primary.host, primary.port}, i);
}
// Deliver StartViewMessage to all replicas.
for (std::size_t i = 0; i < nreplicas; ++i) {
if (i == 1) {
continue;
}
const transport::ReplicaAddress& addr = replica_addrs_[i];
transport_.DeliverMessage({addr.host, addr.port}, 2 + nreplicas - 1);
}
// Deliver FinalizeConsensusMessage to all replicas.
for (const auto &addr : replica_addrs_) {
transport_.DeliverMessage({addr.host, addr.port}, 1);
}
// Deliver ConfirmMessages to client 0.
for (std::size_t i = nreplicas; i < nreplicas + nreplicas; ++i) {
transport_.DeliverMessage({"client", "0"}, i);
}
}
......@@ -36,7 +36,19 @@
#include <random>
namespace replication {
std::string ErrorCodeToString(ErrorCode err) {
switch (err) {
case ErrorCode::TIMEOUT:
return "TIMEOUT";
case ErrorCode::MISMATCHED_CONSENSUS_VIEWS:
return "MISMATCHED_CONSENSUS_VIEWS";
default:
Assert(false);
return "";
}
}
Client::Client(const transport::Configuration &config, Transport *transport,
uint64_t clientid)
: config(config), transport(transport)
......@@ -61,7 +73,7 @@ Client::~Client()
{
}
void
Client::ReceiveMessage(const TransportAddress &remote,
const string &type, const string &data)
......@@ -69,5 +81,5 @@ Client::ReceiveMessage(const TransportAddress &remote,
Panic("Received unexpected message type: %s",
type.c_str());
}
} // namespace replication
......@@ -42,32 +42,54 @@
namespace replication {
// A client's request may fail for various reasons. For example, if enough
// replicas are down, a client's request may time out. An ErrorCode indicates
// the reason that a client's request failed.
enum class ErrorCode {
// For whatever reason (failed replicas, slow network), the request took
// too long and timed out.
TIMEOUT,
// For IR, if a client issues a consensus operation and receives a majority
// of replies and confirms in different views, then the operation fails.
MISMATCHED_CONSENSUS_VIEWS
};
std::string ErrorCodeToString(ErrorCode err);
class Client : public TransportReceiver
{
public:
typedef std::function<void (const string &, const string &)> continuation_t;
typedef std::function<void (const string &)> timeout_continuation_t;
using continuation_t =
std::function<void(const string &request, const string &reply)>;
using error_continuation_t =
std::function<void(const string &request, ErrorCode err)>;
static const uint32_t DEFAULT_UNLOGGED_OP_TIMEOUT = 1000; // milliseconds
Client(const transport::Configuration &config, Transport *transport,
uint64_t clientid = 0);
virtual ~Client();
virtual void Invoke(const string &request,
continuation_t continuation) = 0;
virtual void InvokeUnlogged(int replicaIdx,
const string &request,
continuation_t continuation,
timeout_continuation_t timeoutContinuation = nullptr,
uint32_t timeout = DEFAULT_UNLOGGED_OP_TIMEOUT) = 0;
virtual void Invoke(
const string &request,
continuation_t continuation,
error_continuation_t error_continuation = nullptr) = 0;
virtual void InvokeUnlogged(
int replicaIdx,
const string &request,
continuation_t continuation,
error_continuation_t error_continuation = nullptr,
uint32_t timeout = DEFAULT_UNLOGGED_OP_TIMEOUT) = 0;
virtual void ReceiveMessage(const TransportAddress &remote,
const string &type,
const string &data);
protected:
transport::Configuration config;
Transport *transport;
uint64_t clientid;
};
......
......@@ -35,7 +35,7 @@
#define _COMMON_QUORUMSET_H_
namespace replication {
template <class IDTYPE, class MSGTYPE>
class QuorumSet
{
......@@ -43,9 +43,9 @@ public:
QuorumSet(int numRequired)
: numRequired(numRequired)
{
}
void
Clear()
{
......@@ -80,7 +80,20 @@ public:
return &vsmessages;
} else {
return NULL;
}
}
}
const std::map<int, MSGTYPE> *
CheckForQuorum()
{
for (const auto &p : messages) {
const IDTYPE &vs = p.first;
const std::map<int, MSGTYPE> *quorum = CheckForQuorum(vs);
if (quorum != nullptr) {
return quorum;
}
}
return nullptr;
}
const std::map<int, MSGTYPE> *
......@@ -98,7 +111,7 @@ public:
}
vsmessages[replicaIdx] = msg;
return CheckForQuorum(vs);
}
......@@ -107,7 +120,7 @@ public:
{
AddAndCheckForQuorum(vs, replicaIdx, msg);
}
public:
int numRequired;
private:
......
......@@ -12,7 +12,7 @@ OBJS-ir-client := $(o)ir-proto.o $(o)client.o \
OBJS-ir-replica := $(o)record.o $(o)replica.o $(o)ir-proto.o \
$(OBJS-replica) $(LIB-message) \
$(LIB-configuration)
$(LIB-configuration) $(LIB-persistent_register)
include $(d)tests/Rules.mk
This diff is collapsed.
......@@ -38,6 +38,8 @@
#include "replication/ir/ir-proto.pb.h"
#include <functional>
#include <map>
#include <memory>
#include <set>
#include <unordered_map>
......@@ -47,104 +49,173 @@ namespace ir {
class IRClient : public Client
{
public:
typedef std::function<string (const std::set<string> &)> decide_t;
using result_set_t = std::map<string, std::size_t>;
using decide_t = std::function<string(const result_set_t &)>;
IRClient(const transport::Configuration &config,
Transport *transport,
uint64_t clientid = 0);
virtual ~IRClient();
virtual void Invoke(const string &request,
continuation_t continuation);
virtual void InvokeInconsistent(const string &request,
continuation_t continuation);
virtual void InvokeConsensus(const string &request,
decide_t decide,
continuation_t continuation);
virtual void InvokeUnlogged(int replicaIdx,
const string &request,
continuation_t continuation,
timeout_continuation_t timeoutContinuation = nullptr,
uint32_t timeout = DEFAULT_UNLOGGED_OP_TIMEOUT);
virtual void ReceiveMessage(const TransportAddress &remote,
const string &type, const string &data);
virtual void Invoke(
const string &request,
continuation_t continuation,
error_continuation_t error_continuation = nullptr) override;
virtual void InvokeUnlogged(
int replicaIdx,
const string &request,
continuation_t continuation,
error_continuation_t error_continuation = nullptr,
uint32_t timeout = DEFAULT_UNLOGGED_OP_TIMEOUT) override;
virtual void ReceiveMessage(
const TransportAddress &remote,
const string &type,
const string &data) override;
virtual void InvokeInconsistent(
const string &request,
continuation_t continuation,
error_continuation_t error_continuation = nullptr);
virtual void InvokeConsensus(
const string &request,
decide_t decide,
continuation_t continuation,
error_continuation_t error_continuation = nullptr);
protected:
struct PendingRequest
{
struct PendingRequest {
string request;
uint64_t clientReqId;
continuation_t continuation;
bool continuationInvoked = false;
Timeout *timer;
QuorumSet<viewstamp_t, proto::ConfirmMessage> confirmQuorum;
inline PendingRequest(string request,
uint64_t clientReqId,
continuation_t continuation,
Timeout *timer,
int quorumSize) :
request(request), clientReqId(clientReqId),
continuation(continuation), timer(timer),
confirmQuorum(quorumSize) { };
inline ~PendingRequest() { delete timer; };
};
continuation_t continuation;
bool continuationInvoked = false;
std::unique_ptr<Timeout> timer;
QuorumSet<viewstamp_t, proto::ConfirmMessage> confirmQuorum;
struct PendingUnloggedRequest : public PendingRequest
{
timeout_continuation_t timeoutContinuation;
inline PendingUnloggedRequest(string request,
uint64_t clientReqId,
continuation_t continuation,
timeout_continuation_t timeoutContinuation,
Timeout *timer) :
PendingRequest(request, clientReqId,
continuation, timer, 1),
timeoutContinuation(timeoutContinuation) { };
inline PendingRequest(string request, uint64_t clientReqId,
continuation_t continuation,
std::unique_ptr<Timeout> timer, int quorumSize)
: request(request),
clientReqId(clientReqId),
continuation(continuation),
timer(std::move(timer)),
confirmQuorum(quorumSize){};
virtual ~PendingRequest(){};
};
struct PendingInconsistentRequest : public PendingRequest
{
QuorumSet<viewstamp_t, proto::ReplyInconsistentMessage> inconsistentReplyQuorum;
inline PendingInconsistentRequest(string request,
uint64_t clientReqId,
continuation_t continuation,
Timeout *timer,
int quorumSize) :
PendingRequest(request, clientReqId,
continuation, timer, quorumSize),
inconsistentReplyQuorum(quorumSize) { };
struct PendingUnloggedRequest : public PendingRequest {
error_continuation_t error_continuation;
inline PendingUnloggedRequest(
string request, uint64_t clientReqId, continuation_t continuation,
error_continuation_t error_continuation,
std::unique_ptr<Timeout> timer)
: PendingRequest(request, clientReqId, continuation,
std::move(timer), 1),
error_continuation(error_continuation){};
};
struct PendingConsensusRequest : public PendingRequest
{
QuorumSet<viewstamp_t, proto::ReplyConsensusMessage> consensusReplyQuorum;
decide_t decide;
string decideResult;
inline PendingConsensusRequest(string request,
uint64_t clientReqId,
continuation_t continuation,
Timeout *timer,
int quorumSize,
int superQuorum,
decide_t decide) :
PendingRequest(request, clientReqId,
continuation, timer, quorumSize),
consensusReplyQuorum(superQuorum),
decide(decide) { };
struct PendingInconsistentRequest : public PendingRequest {
QuorumSet<viewstamp_t, proto::ReplyInconsistentMessage>
inconsistentReplyQuorum;
inline PendingInconsistentRequest(string request, uint64_t clientReqId,
continuation_t continuation,
std::unique_ptr<Timeout> timer,
int quorumSize)
: PendingRequest(request, clientReqId, continuation,
std::move(timer), quorumSize),
inconsistentReplyQuorum(quorumSize){};
};
struct PendingConsensusRequest : public PendingRequest {
QuorumSet<opnum_t, proto::ReplyConsensusMessage> consensusReplyQuorum;
decide_t decide;
string decideResult;
const std::size_t quorumSize;
const std::size_t superQuorumSize;
bool on_slow_path;
error_continuation_t error_continuation;
// The timer to give up on the fast path and transition to the slow
// path. After this timer is run for the first time, it is nulled.
std::unique_ptr<Timeout> transition_to_slow_path_timer;
// The view for which a majority result (or finalized result) was
// found. The view of a majority of confirms must match this view.
uint64_t reply_consensus_view = 0;
// True when a consensus request has already received a quorum or super
// quorum of replies and has already transitioned into the confirm
// phase.
bool sent_confirms = false;
inline PendingConsensusRequest(
string request, uint64_t clientReqId, continuation_t continuation,
std::unique_ptr<Timeout> timer,
std::unique_ptr<Timeout> transition_to_slow_path_timer,
int quorumSize, int superQuorum, decide_t decide,
error_continuation_t error_continuation)
: PendingRequest(request, clientReqId, continuation,
std::move(timer), quorumSize),
consensusReplyQuorum(quorumSize),
decide(decide),
quorumSize(quorumSize),
superQuorumSize(superQuorum),
on_slow_path(false),
error_continuation(error_continuation),
transition_to_slow_path_timer(
std::move(transition_to_slow_path_timer)){};
};
uint64_t view;
uint64_t lastReqId;
std::unordered_map<uint64_t, PendingRequest *> pendingReqs;
void SendInconsistent(const PendingInconsistentRequest *req);
void ResendInconsistent(const uint64_t reqId);
void ConsensusSlowPath(const uint64_t reqId);
void ResendInconsistent(const uint64_t reqId);
void SendConsensus(const PendingConsensusRequest *req);
void ResendConsensus(const uint64_t reqId);
// `TransitionToConsensusSlowPath` is called after a timeout to end the
// possibility of taking the fast path and transition into taking the slow
// path.
void TransitionToConsensusSlowPath(const uint64_t reqId);
// HandleSlowPathConsensus is called in one of two scenarios:
//
// 1. A finalized ReplyConsensusMessage was received. In this case, we
// immediately enter the slow path and use the finalized result. If
// finalized is true, req has already been populated with the
// finalized result.
// 2. We're in the slow path and receive a majority of
// ReplyConsensusMessages in the same view. In this case, we call
// decide to determine the final result.
//
// In either case, HandleSlowPathConsensus intitiates the finalize phase of
// a consensus request.
void HandleSlowPathConsensus(
const uint64_t reqid,
const std::map<int, proto::ReplyConsensusMessage> &msgs,
const bool finalized_result_found,
PendingConsensusRequest *req);
// HandleFastPathConsensus is called when we're on the fast path and
// receive a super quorum of responses from the same view.
// HandleFastPathConsensus will check to see if there is a superquorum of
// matching responses. If there is, it will return to the user and
// asynchronously intitiate the finalize phase of a consensus request.
// Otherwise, it transitions into the slow path which will also initiate
// the finalize phase of a consensus request, but not yet return to the
// user.
void HandleFastPathConsensus(
const uint64_t reqid,
const std::map<int, proto::ReplyConsensusMessage> &msgs,
PendingConsensusRequest *req);
void ResendConfirmation(const uint64_t reqId, bool isConsensus);
void HandleInconsistentReply(const TransportAddress &remote,
const proto::ReplyInconsistentMessage &msg);
void HandleConsensusReply(const TransportAddress &remote,
const proto::ReplyConsensusMessage &msg);
const proto::ReplyConsensusMessage &msg);
void HandleConfirm(const TransportAddress &remote,
const proto::ConfirmMessage &msg);
void HandleUnloggedReply(const TransportAddress &remote,
......
......@@ -9,6 +9,40 @@ message OpID {
required uint64 clientreqid = 2;
}
// For the view change and recovery protocol, a replica stores two things on
// disk: (1) its current view and (2) the latest view during which it was
// NORMAL. Replicas pack this information into this proto buf and serialize it
// to disk.
message PersistedViewInfo {
required uint64 view = 1;
required uint64 latest_normal_view = 2;
}
enum RecordEntryState {
RECORD_STATE_TENTATIVE = 0;
RECORD_STATE_FINALIZED = 1;
}
enum RecordEntryType {
RECORD_TYPE_INCONSISTENT = 0;
RECORD_TYPE_CONSENSUS = 1;
}
message RecordEntryProto {
required uint64 view = 1;
required OpID opid = 2;
required RecordEntryState state = 3;
required RecordEntryType type = 4;
required bytes op = 5;
required bytes result = 6;
}
// TODO: Currently, replicas send entire records to one another. Figure out if
// there is a more efficient way to exchange records.
message RecordProto {
repeated RecordEntryProto entry = 1;
}
message ProposeInconsistentMessage {
required replication.Request req = 1;
}
......@@ -17,6 +51,7 @@ message ReplyInconsistentMessage {
required uint64 view = 1;
required uint32 replicaIdx = 2;
required OpID opid = 3;
required bool finalized = 4;
}
message FinalizeInconsistentMessage {
......@@ -38,6 +73,7 @@ message ReplyConsensusMessage {
required uint32 replicaIdx = 2;
required OpID opid = 3;
required bytes result = 4;
required bool finalized = 5;
}
message FinalizeConsensusMessage {
......@@ -45,6 +81,19 @@ message FinalizeConsensusMessage {
required bytes result = 2;
}
message DoViewChangeMessage {
required uint32 replicaIdx = 1;
// record is optional because a replica only sends its record to the
// leader of the new view.
optional RecordProto record = 2;
required uint64 new_view = 3;
required uint64 latest_normal_view = 4;
}
message StartViewMessage {
required RecordProto record = 1;
required uint64 new_view = 2;
}
message UnloggedRequestMessage {
required replication.UnloggedRequest req = 1;
}
......
......@@ -29,35 +29,52 @@
**********************************************************************/
#include "replication/ir/record.h"
#include <utility>
#include "lib/assert.h"
namespace replication {
namespace ir {
Record::Record(const proto::RecordProto &record_proto) {
for (const proto::RecordEntryProto &entry_proto : record_proto.entry()) {
const view_t view = entry_proto.view();
const opid_t opid = std::make_pair(entry_proto.opid().clientid(),
entry_proto.opid().clientreqid());
Request request;
request.set_op(entry_proto.op());
request.set_clientid(entry_proto.opid().clientid());
request.set_clientreqid(entry_proto.opid().clientreqid());
proto::RecordEntryState state = entry_proto.state();
proto::RecordEntryType type = entry_proto.type();
const std::string& result = entry_proto.result();
Add(view, opid, request, state, type, result);
}
}
RecordEntry &
Record::Add(const RecordEntry& entry) {
// Make sure this isn't a duplicate
ASSERT(entries.count(entry.opid) == 0);
entries[entry.opid] = entry;
return entries[entry.opid];
}
RecordEntry &
Record::Add(view_t view, opid_t opid, const Request &request,
RecordEntryState state)
proto::RecordEntryState state, proto::RecordEntryType type)
{
RecordEntry entry;
entry.view = view;
entry.opid = opid;
entry.request = request;
entry.state = state;
// Make sure this isn't a duplicate
ASSERT(entries.count(opid) == 0);
entries[opid] = entry;
return entries[opid];
return Add(RecordEntry(view, opid, state, type, request, ""));
}
RecordEntry &
Record::Add(view_t view, opid_t opid, const Request &request,
RecordEntryState state, const string &result)
proto::RecordEntryState state, proto::RecordEntryType type,
const string &result)
{
RecordEntry entry = Add(view, opid, request, state);
RecordEntry &entry = Add(view, opid, request, state, type);
entry.result = result;
return entries[opid];
}
......@@ -76,7 +93,7 @@ Record::Find(opid_t opid)
bool
Record::SetStatus(opid_t op, RecordEntryState state)
Record::SetStatus(opid_t op, proto::RecordEntryState state)
{
RecordEntry *entry = Find(op);
if (entry == NULL) {
......@@ -116,12 +133,33 @@ Record::Remove(opid_t opid)
{
entries.erase(opid);
}
bool
Record::Empty() const
{
return entries.empty();
}
void
Record::ToProto(proto::RecordProto *proto) const
{
for (const std::pair<const opid_t, RecordEntry> &p : entries) {
const RecordEntry &entry = p.second;
proto::RecordEntryProto *entry_proto = proto->add_entry();
entry_proto->set_view(entry.view);
entry_proto->mutable_opid()->set_clientid(entry.opid.first);
entry_proto->mutable_opid()->set_clientreqid(entry.opid.second);
entry_proto->set_state(entry.state);
entry_proto->set_type(entry.type);
entry_proto->set_op(entry.request.op());
entry_proto->set_result(entry.result);
}
}
const std::map<opid_t, RecordEntry> &Record::Entries() const {
return entries;
}
} // namespace ir
} // namespace replication
......@@ -31,61 +31,89 @@
#ifndef _IR_RECORD_H_
#define _IR_RECORD_H_
#include "replication/common/request.pb.h"
#include <map>
#include <string>
#include <utility>
#include "lib/assert.h"
#include "lib/message.h"
#include "lib/transport.h"
#include "replication/common/request.pb.h"
#include "replication/common/viewstamp.h"
#include <map>
#include <string>
#include <utility>
#include "replication/ir/ir-proto.pb.h"
namespace replication {
namespace ir {
enum RecordEntryState {
RECORD_STATE_TENTATIVE,
RECORD_STATE_FINALIZED
};
typedef std::pair<uint64_t, uint64_t> opid_t;
struct RecordEntry
{
view_t view;
opid_t opid;
RecordEntryState state;
proto::RecordEntryState state;
proto::RecordEntryType type;
Request request;
std::string result;
RecordEntry() { result = ""; }
RecordEntry(const RecordEntry &x)
: view(x.view), opid(x.opid), state(x.state), request(x.request),
result(x.result) { }
RecordEntry(view_t view, opid_t opid, RecordEntryState state,
const Request &request, const std::string &result)
: view(view), opid(opid), state(state), request(request),
result(result) { }
virtual ~RecordEntry() { }
: view(x.view),
opid(x.opid),
state(x.state),
type(x.type),
request(x.request),
result(x.result) {}
RecordEntry(view_t view, opid_t opid, proto::RecordEntryState state,
proto::RecordEntryType type, const Request &request,
const std::string &result)
: view(view),
opid(opid),
state(state),
type(type),
request(request),
result(result) {}
virtual ~RecordEntry() {}
};
class Record
{
public:
Record() {};
RecordEntry & Add(view_t view, opid_t opid, const Request &request, RecordEntryState state);
RecordEntry & Add(view_t view, opid_t opid, const Request &request, RecordEntryState state, const std::string &result);
RecordEntry * Find(opid_t opid);
bool SetStatus(opid_t opid, RecordEntryState state);
// Use the copy-and-swap idiom to make Record moveable but not copyable
// [1]. We make it non-copyable to avoid unnecessary copies.
//
// [1]: https://stackoverflow.com/a/3279550/3187068
Record(){};
Record(const proto::RecordProto &record_proto);
Record(Record &&other) : Record() { swap(*this, other); }
Record(const Record &) = delete;
Record &operator=(const Record &) = delete;
Record &operator=(Record &&other) {
swap(*this, other);
return *this;
}
friend void swap(Record &x, Record &y) {
std::swap(x.entries, y.entries);
}
RecordEntry &Add(const RecordEntry& entry);
RecordEntry &Add(view_t view, opid_t opid, const Request &request,
proto::RecordEntryState state,
proto::RecordEntryType type);
RecordEntry &Add(view_t view, opid_t opid, const Request &request,
proto::RecordEntryState state, proto::RecordEntryType type,
const std::string &result);
RecordEntry *Find(opid_t opid);
bool SetStatus(opid_t opid, proto::RecordEntryState state);
bool SetResult(opid_t opid, const std::string &result);
bool SetRequest(opid_t opid, const Request &req);
void Remove(opid_t opid);
bool Empty() const;
void ToProto(proto::RecordProto *proto) const;
const std::map<opid_t, RecordEntry> &Entries() const;
private:
std::map<opid_t, RecordEntry> entries;
};
} // namespace ir
......
This diff is collapsed.
......@@ -9,17 +9,22 @@
#ifndef _IR_REPLICA_H_
#define _IR_REPLICA_H_
#include <memory>
#include "lib/assert.h"
#include "lib/configuration.h"
#include "lib/message.h"
#include "lib/persistent_register.h"
#include "lib/udptransport.h"
#include "lib/configuration.h"
#include "replication/ir/record.h"
#include "replication/common/quorumset.h"
#include "replication/common/replica.h"
#include "replication/ir/ir-proto.pb.h"
#include "replication/ir/record.h"
namespace replication {
namespace ir {
class IRAppReplica
{
public:
......@@ -31,32 +36,28 @@ public:
virtual void ExecConsensusUpcall(const string &str1, string &str2) { };
// Invoke unreplicated operation
virtual void UnloggedUpcall(const string &str1, string &str2) { };
// Sync
virtual void Sync(const std::map<opid_t, RecordEntry>& record) { };
// Merge
virtual std::map<opid_t, std::string> Merge(
const std::map<opid_t, std::vector<RecordEntry>> &d,
const std::map<opid_t, std::vector<RecordEntry>> &u,
const std::map<opid_t, std::string> &majority_results_in_d) {
return {};
};
};
class IRReplica : TransportReceiver
{
private:
view_t view;
// Index of 'this' replica, and handle to transport layer.
int myIdx;
Transport *transport;
IRAppReplica *app;
// record for this replica
Record record;
public:
IRReplica(transport::Configuration config, int myIdx,
Transport *transport, IRAppReplica *app);
~IRReplica();
// Message handlers.
void ReceiveMessage(const TransportAddress &remote,
const std::string &type, const std::string &data);
void HandleMessage(const TransportAddress &remote,
const std::string &type, const std::string &data);
void HandleProposeInconsistent(const TransportAddress &remote,
......@@ -67,9 +68,58 @@ public:
const proto::ProposeConsensusMessage &msg);
void HandleFinalizeConsensus(const TransportAddress &remote,
const proto::FinalizeConsensusMessage &msg);
void HandleDoViewChange(const TransportAddress &remote,
const proto::DoViewChangeMessage &msg);
void HandleStartView(const TransportAddress &remote,
const proto::StartViewMessage &msg);
void HandleUnlogged(const TransportAddress &remote,
const proto::UnloggedRequestMessage &msg);
// Timeout handlers.
void HandleViewChangeTimeout();
private:
// Persist `view` and `latest_normal_view` to disk using
// `persistent_view_info`.
void PersistViewInfo();
// Recover `view` and `latest_normal_view` from disk using
// `persistent_view_info`.
void RecoverViewInfo();
// Broadcast DO-VIEW-CHANGE messages to all other replicas with our record
// included only in the message to the leader.
void BroadcastDoViewChangeMessages();
// IrMergeRecords implements Figure 5 of the TAPIR paper.
Record IrMergeRecords(
const std::map<int, proto::DoViewChangeMessage> &records);
transport::Configuration config;
int myIdx; // Replica index into config.
Transport *transport;
IRAppReplica *app;
ReplicaStatus status;
// For the view change and recovery protocol, a replica stores its view and
// latest normal view to disk. We store this info in view and
// latest_normal_view and use persistent_view_info to persist it to disk.
view_t view;
view_t latest_normal_view;
PersistentRegister persistent_view_info;
Record record;
std::unique_ptr<Timeout> view_change_timeout;
// The leader of a view-change waits to receive a quorum of DO-VIEW-CHANGE
// messages before merging and syncing and sending out START-VIEW messages.
// do_view_change_quorum is used to wait for this quorum.
//
// TODO: Garbage collect old view change quorums. Once we've entered view
// v, we should be able to garbage collect all quorums for views less than
// v.
QuorumSet<view_t, proto::DoViewChangeMessage> do_view_change_quorum;
};
} // namespace ir
......
......@@ -39,16 +39,14 @@
#include "replication/ir/client.h"
#include "replication/ir/replica.h"
#include <cstdio>
#include <stdlib.h>
#include <stdio.h>
#include <gtest/gtest.h>
#include <vector>
#include <set>
#include <sstream>
static string replicaLastOp;
static string clientLastOp;
static string clientLastReply;
#include <memory>
using google::protobuf::Message;
using namespace replication;
......@@ -56,14 +54,15 @@ using namespace replication::ir;
using namespace replication::ir::proto;
class IRApp : public IRAppReplica {
std::vector<string> *iOps;
std::vector<string> *cOps;
std::vector<string> *unloggedOps;
public:
IRApp(std::vector<string> *i, std::vector<string> *c, std::vector<string> *u) : iOps(i), cOps(c), unloggedOps(u) { }
IRApp(std::vector<string> *i, std::vector<string> *c,
std::vector<string> *u)
: iOps(i), cOps(c), unloggedOps(u) {}
void ExecInconsistentUpcall(const string &req) {
iOps->push_back(req);
}
......@@ -72,51 +71,48 @@ public:
cOps->push_back(req);
reply = "1";
}
void UnloggedUpcall(const string &req, string &reply) {
unloggedOps->push_back(req);
reply = "unlreply: " + req;
}
};
class IRTest : public ::testing::Test
{
protected:
std::vector<IRReplica *> replicas;
IRClient *client;
SimulatedTransport *transport;
transport::Configuration *config;
std::vector<transport::ReplicaAddress> replicaAddrs;
std::unique_ptr<transport::Configuration> config;
SimulatedTransport transport;
std::vector<std::unique_ptr<IRApp>> apps;
std::vector<std::unique_ptr<IRReplica>> replicas;
std::unique_ptr<IRClient> client;
std::vector<std::vector<string> > iOps;
std::vector<std::vector<string> > cOps;
std::vector<std::vector<string> > unloggedOps;
int requestNum;
virtual void SetUp() {
std::vector<transport::ReplicaAddress> replicaAddrs =
{ { "localhost", "12345" },
{ "localhost", "12346" },
{ "localhost", "12347" }};
config = new transport::Configuration(3, 1, replicaAddrs);
transport = new SimulatedTransport();
IRTest() : requestNum(-1) {
replicaAddrs = {{"localhost", "12345"},
{"localhost", "12346"},
{"localhost", "12347"}};
config = std::unique_ptr<transport::Configuration>(
new transport::Configuration(3, 1, replicaAddrs));
iOps.resize(config->n);
cOps.resize(config->n);
unloggedOps.resize(config->n);
for (int i = 0; i < config->n; i++) {
replicas.push_back(new IRReplica(*config, i, transport,
new IRApp(&iOps[i], &cOps[i], &unloggedOps[i])));
auto ir_app = std::unique_ptr<IRApp>(
new IRApp(&iOps[i], &cOps[i], &unloggedOps[i]));
auto p = std::unique_ptr<IRReplica>(
new IRReplica(*config, i, &transport, ir_app.get()));
apps.push_back(std::move(ir_app));
replicas.push_back(std::move(p));
}
client = new IRClient(*config, transport);
requestNum = -1;
// Only let tests run for a simulated minute. This prevents
// infinite retry loops, etc.
// transport->Timer(60000, [&]() {
// transport->CancelAllTimers();
// });
client = std::unique_ptr<IRClient>(new IRClient(*config, &transport));
}
virtual string RequestOp(int n) {
......@@ -128,37 +124,41 @@ protected:
virtual string LastRequestOp() {
return RequestOp(requestNum);
}
virtual void ClientSendNextInconsistent(Client::continuation_t upcall) {
requestNum++;
client->InvokeInconsistent(LastRequestOp(), upcall);
}
virtual void ClientSendNextConsensus(Client::continuation_t upcall, IRClient::decide_t decide) {
virtual void ClientSendNextConsensus(Client::continuation_t upcall,
IRClient::decide_t decide) {
requestNum++;
client->InvokeConsensus(LastRequestOp(), decide, upcall);
}
virtual void ClientSendNextUnlogged(int idx, Client::continuation_t upcall,
Client::timeout_continuation_t timeoutContinuation = nullptr,
uint32_t timeout = Client::DEFAULT_UNLOGGED_OP_TIMEOUT) {
virtual void ClientSendNextUnlogged(
int idx, Client::continuation_t upcall,
Client::error_continuation_t error_continuation = nullptr,
uint32_t timeout = Client::DEFAULT_UNLOGGED_OP_TIMEOUT) {
requestNum++;
client->InvokeUnlogged(idx, LastRequestOp(), upcall, timeoutContinuation, timeout);
client->InvokeUnlogged(idx, LastRequestOp(), upcall,
error_continuation, timeout);
}
virtual void TearDown() {
for (auto x : replicas) {
delete x;
// Replicas store their view information in the following files:
// - localhost:12345_0.bin
// - localhost:12346_1.bin
// - localhost:12347_2.bin
// We have to make sure to delete them after every test. Otherwise,
// replicas run in recovery mode.
for (std::size_t i = 0; i < replicaAddrs.size(); ++i) {
const transport::ReplicaAddress &addr = replicaAddrs[i];
const std::string filename =
addr.host + ":" + addr.port + "_" + std::to_string(i) + ".bin";
int success = std::remove(filename.c_str());
ASSERT(success == 0);
}
replicas.clear();
iOps.clear();
cOps.clear();
unloggedOps.clear();
delete client;
delete transport;
delete config;
}
};
......@@ -169,12 +169,12 @@ TEST_F(IRTest, OneInconsistentOp)
// Inconsistent ops do not return a value
EXPECT_EQ(reply, "");
transport->CancelAllTimers();
transport.CancelAllTimers();
};
ClientSendNextInconsistent(upcall);
transport->Run();
transport.Run();
// By now, they all should have executed the last request.
for (int i = 0; i < config->n; i++) {
......@@ -189,10 +189,10 @@ TEST_F(IRTest, OneConsensusOp)
EXPECT_EQ(req, LastRequestOp());
EXPECT_EQ(reply, "1");
transport->CancelAllTimers();
transport.CancelAllTimers();
};
auto decide = [this](const std::set<string> &results) {
auto decide = [this](const std::map<string, std::size_t> &results) {
// shouldn't ever get called
EXPECT_FALSE(true);
......@@ -200,7 +200,7 @@ TEST_F(IRTest, OneConsensusOp)
};
ClientSendNextConsensus(upcall, decide);
transport->Run();
transport.Run();
// By now, they all should have executed the last request.
for (int i = 0; i < config->n; i++) {
......@@ -216,15 +216,15 @@ TEST_F(IRTest, Unlogged)
EXPECT_EQ(reply, "unlreply: "+LastRequestOp());
EXPECT_EQ(unloggedOps[1].back(), req);
transport->CancelAllTimers();
transport.CancelAllTimers();
};
int timeouts = 0;
auto timeout = [&](const string &req) {
auto timeout = [&](const string &req, ErrorCode) {
timeouts++;
};
ClientSendNextUnlogged(1, upcall, timeout);
transport->Run();
transport.Run();
for (unsigned int i = 0; i < iOps.size(); i++) {
EXPECT_EQ(0, iOps[i].size());
......@@ -237,15 +237,15 @@ TEST_F(IRTest, UnloggedTimeout)
{
auto upcall = [this](const string &req, const string &reply) {
FAIL();
transport->CancelAllTimers();
transport.CancelAllTimers();
};
int timeouts = 0;
auto timeout = [&](const string &req) {
auto timeout = [&](const string &req, ErrorCode) {
timeouts++;
};
// Drop messages to or from replica 1
transport->AddFilter(10, [](TransportReceiver *src, int srcIdx,
transport.AddFilter(10, [](TransportReceiver *src, int srcIdx,
TransportReceiver *dst, int dstIdx,
Message &m, uint64_t &delay) {
if ((srcIdx == 1) || (dstIdx == 1)) {
......@@ -255,12 +255,12 @@ TEST_F(IRTest, UnloggedTimeout)
});
// Run for 10 seconds
transport->Timer(10000, [&]() {
transport->CancelAllTimers();
transport.Timer(10000, [&]() {
transport.CancelAllTimers();
});
ClientSendNextUnlogged(1, upcall, timeout);
transport->Run();
transport.Run();
for (unsigned int i = 0; i < iOps.size(); i++) {
EXPECT_EQ(0, iOps[i].size());
......@@ -283,19 +283,18 @@ TEST_F(IRTest, UnloggedTimeout)
// if (requestNum < 9) {
// ClientSendNext(upcall);
// } else {
// transport->CancelAllTimers();
// transport.CancelAllTimers();
// }
// };
// ClientSendNext(upcall);
// transport->Run();
// transport.Run();
// // By now, they all should have executed the last request.
// for (int i = 0; i < config->n; i++) {
// EXPECT_EQ(10, ops[i].size());
// for (int j = 0; j < 10; j++) {
// EXPECT_EQ(RequestOp(j), ops[i][j]);
// EXPECT_EQ(RequestOp(j), ops[i][j]);
// }
// }
// }
......@@ -44,7 +44,7 @@ VRClient::VRClient(const transport::Configuration &config,
uint64_t clientid)
: Client(config, transport, clientid)
{
lastReqId = 0;
lastReqId = 0;
}
VRClient::~VRClient()
......@@ -56,8 +56,12 @@ VRClient::~VRClient()
void
VRClient::Invoke(const string &request,
continuation_t continuation)
continuation_t continuation,
error_continuation_t error_continuation)
{
// TODO: Currently, invocations never timeout and error_continuation is
// never called. It may make sense to set a timeout on the invocation.
(void) error_continuation;
uint64_t reqId = ++lastReqId;
Timeout *timer = new Timeout(transport, 500, [this, reqId]() {
......@@ -74,7 +78,7 @@ void
VRClient::InvokeUnlogged(int replicaIdx,
const string &request,
continuation_t continuation,
timeout_continuation_t timeoutContinuation,
error_continuation_t error_continuation,
uint32_t timeout)
{
uint64_t reqId = ++lastReqId;
......@@ -91,7 +95,7 @@ VRClient::InvokeUnlogged(int replicaIdx,
PendingUnloggedRequest *req =
new PendingUnloggedRequest(request, reqId,
continuation, timer,
timeoutContinuation);
error_continuation);
pendingReqs[reqId] = req;
req->timer->Start();
} else {
......@@ -107,10 +111,10 @@ VRClient::SendRequest(const PendingRequest *req)
reqMsg.mutable_req()->set_op(req->request);
reqMsg.mutable_req()->set_clientid(clientid);
reqMsg.mutable_req()->set_clientreqid(req->clientReqId);
//Debug("SENDING REQUEST: %lu %lu", clientid, pendingRequest->clientReqId);
// XXX Try sending only to (what we think is) the leader first
if (transport->SendMessageToAll(this, reqMsg)) {
if (transport->SendMessageToAll(this, reqMsg)) {
req->timer->Reset();
} else {
Warning("Could not send request to replicas.");
......@@ -139,7 +143,7 @@ VRClient::ReceiveMessage(const TransportAddress &remote,
{
proto::ReplyMessage reply;
proto::UnloggedReplyMessage unloggedReply;
if (type == reply.GetTypeName()) {
reply.ParseFromString(data);
HandleReply(remote, reply);
......@@ -175,14 +179,14 @@ VRClient::HandleUnloggedReply(const TransportAddress &remote,
const proto::UnloggedReplyMessage &msg)
{
uint64_t reqId = msg.clientreqid();
auto it = pendingReqs.find(reqId);
auto it = pendingReqs.find(reqId);
if (it == pendingReqs.end()) {
Debug("Received reply when no request was pending");
return;
}
PendingRequest *req = it->second;
Debug("Client received unloggedReply %lu", reqId);
req->timer->Stop();
pendingReqs.erase(it);
......@@ -193,7 +197,7 @@ VRClient::HandleUnloggedReply(const TransportAddress &remote,
void
VRClient::UnloggedRequestTimeoutCallback(const uint64_t reqId)
{
auto it = pendingReqs.find(reqId);
auto it = pendingReqs.find(reqId);
if (it == pendingReqs.end()) {
Debug("Received reply when no request was pending");
return;
......@@ -203,7 +207,9 @@ VRClient::UnloggedRequestTimeoutCallback(const uint64_t reqId)
static_cast<PendingUnloggedRequest *>(it->second);
req->timer->Stop();
pendingReqs.erase(it);
req->timeoutContinuation(req->request);
if (req->error_continuation) {
req->error_continuation(req->request, ErrorCode::TIMEOUT);
}
delete req;
}
......
......@@ -49,11 +49,12 @@ public:
uint64_t clientid = 0);
virtual ~VRClient();
virtual void Invoke(const string &request,
continuation_t continuation);
continuation_t continuation,
error_continuation_t error_continuation = nullptr);
virtual void InvokeUnlogged(int replicaIdx,
const string &request,
continuation_t continuation,
timeout_continuation_t timeoutContinuation = nullptr,
error_continuation_t error_continuation = nullptr,
uint32_t timeout = DEFAULT_UNLOGGED_OP_TIMEOUT);
virtual void ReceiveMessage(const TransportAddress &remote,
const string &type, const string &data);
......@@ -80,14 +81,14 @@ protected:
struct PendingUnloggedRequest : public PendingRequest
{
timeout_continuation_t timeoutContinuation;
error_continuation_t error_continuation;
inline PendingUnloggedRequest(string request,
uint64_t clientReqId,
continuation_t continuation,
Timeout *timer,
timeout_continuation_t timeoutContinuation)
error_continuation_t error_continuation)
: PendingRequest(request, clientReqId, continuation, timer),
timeoutContinuation(timeoutContinuation) { };
error_continuation(error_continuation) { };
};
std::unordered_map<uint64_t, PendingRequest *> pendingReqs;
......
This diff is collapsed.
This diff is collapsed.