From 061301e55f39b3cd7c766ff87a3d7fd18103d330 Mon Sep 17 00:00:00 2001 From: Irene Y Zhang <iyzhang@cs.washington.edu> Date: Tue, 23 Jun 2015 08:49:36 -0700 Subject: [PATCH] reorganizing and updating some VR code --- Makefile | 2 + lib/configuration.cc | 4 +- lib/configuration.h | 16 +- lib/latency-format.proto | 2 +- lib/latency.cc | 12 +- lib/latency.h | 4 +- lib/simtransport.cc | 6 +- lib/simtransport.h | 6 +- lib/tests/configuration-test.cc | 2 +- lib/tests/simtransport-test.cc | 6 +- lib/tests/simtransport-testmessage.proto | 2 +- lib/transport.h | 2 +- lib/transportcommon.h | 30 +- lib/udptransport.cc | 16 +- lib/udptransport.h | 14 +- replication/common/Rules.mk | 19 + replication/common/client.cc | 73 ++ replication/common/client.h | 76 ++ replication/common/log-impl.h | 87 ++ replication/common/log.cc | 222 +++++ replication/common/log.h | 119 +++ replication/common/quorumset.h | 119 +++ replication/common/replica-inl.h | 57 ++ replication/common/replica.cc | 98 ++ replication/common/replica.h | 100 +++ replication/common/request.proto | 13 + replication/common/tests/Rules.mk | 2 + replication/{vr => common}/viewstamp.h | 0 replication/vr/Rules.mk | 18 + replication/vr/client.cc | 242 +++++ replication/vr/client.h | 92 ++ replication/vr/replica.cc | 1039 ++++++++++++++++++++++ replication/vr/replica.h | 127 +++ replication/vr/tests/Rules.mk | 10 + replication/vr/tests/vr-test.cc | 568 ++++++++++++ replication/vr/vr-proto.proto | 94 ++ 36 files changed, 3238 insertions(+), 61 deletions(-) create mode 100644 replication/common/Rules.mk create mode 100644 replication/common/client.cc create mode 100644 replication/common/client.h create mode 100644 replication/common/log-impl.h create mode 100644 replication/common/log.cc create mode 100644 replication/common/log.h create mode 100644 replication/common/quorumset.h create mode 100644 replication/common/replica-inl.h create mode 100644 replication/common/replica.cc create mode 100644 replication/common/replica.h create mode 100644 replication/common/request.proto create mode 100644 replication/common/tests/Rules.mk rename replication/{vr => common}/viewstamp.h (100%) create mode 100644 replication/vr/Rules.mk create mode 100644 replication/vr/client.cc create mode 100644 replication/vr/client.h create mode 100644 replication/vr/replica.cc create mode 100644 replication/vr/replica.h create mode 100644 replication/vr/tests/Rules.mk create mode 100644 replication/vr/tests/vr-test.cc create mode 100644 replication/vr/vr-proto.proto diff --git a/Makefile b/Makefile index d0ccf0e..1a75342 100644 --- a/Makefile +++ b/Makefile @@ -122,6 +122,8 @@ $(foreach bin,$(1),$(eval LDFLAGS-$(bin) += $(2))) endef include lib/Rules.mk +include replication/common/Rules.mk +include replication/vr/Rules.mk ################################################################## # General rules diff --git a/lib/configuration.cc b/lib/configuration.cc index 08c7520..cd8edb7 100644 --- a/lib/configuration.cc +++ b/lib/configuration.cc @@ -38,7 +38,7 @@ #include <string> #include <string.h> -namespace specpaxos { +namespace transport { ReplicaAddress::ReplicaAddress(const string &host, const string &port) : host(host), port(port) @@ -213,4 +213,4 @@ Configuration::operator==(const Configuration &other) const return true; } -} // namespace specpaxos +} // namespace transport diff --git a/lib/configuration.h b/lib/configuration.h index c6b0055..6879dbc 100644 --- a/lib/configuration.h +++ b/lib/configuration.h @@ -32,7 +32,7 @@ #ifndef _LIB_CONFIGURATION_H_ #define _LIB_CONFIGURATION_H_ -#include "replication/vr/viewstamp.h" +#include "replication/common/viewstamp.h" #include <fstream> #include <stdbool.h> @@ -41,7 +41,7 @@ using std::string; -namespace specpaxos { +namespace transport { struct ReplicaAddress { @@ -82,12 +82,12 @@ private: bool hasMulticast; }; -} // namespace specpaxos +} // namespace replication namespace std { -template <> struct hash<specpaxos::ReplicaAddress> +template <> struct hash<transport::ReplicaAddress> { - size_t operator()(const specpaxos::ReplicaAddress & x) const + size_t operator()(const transport::ReplicaAddress & x) const { return hash<string>()(x.host) * 37 + hash<string>()(x.port); } @@ -95,15 +95,15 @@ template <> struct hash<specpaxos::ReplicaAddress> } namespace std { -template <> struct hash<specpaxos::Configuration> +template <> struct hash<transport::Configuration> { - size_t operator()(const specpaxos::Configuration & x) const + size_t operator()(const transport::Configuration & x) const { size_t out = 0; out = x.n * 37 + x.f; for (int i = 0; i < x.n; i++) { out *= 37; - out += hash<specpaxos::ReplicaAddress>()(x.replica(i)); + out += hash<transport::ReplicaAddress>()(x.replica(i)); } return out; } diff --git a/lib/latency-format.proto b/lib/latency-format.proto index 15bb74e..908e822 100644 --- a/lib/latency-format.proto +++ b/lib/latency-format.proto @@ -1,4 +1,4 @@ -package specpaxos.latency.format; +package transport.latency.format; message LatencyDist { diff --git a/lib/latency.cc b/lib/latency.cc index 1b6a533..823c666 100644 --- a/lib/latency.cc +++ b/lib/latency.cc @@ -328,10 +328,10 @@ Latency_FlushTo(const char *fname) { std::ofstream outfile(fname); Latency_t *l = latencyHead; - ::specpaxos::latency::format::LatencyFile out; + ::transport::latency::format::LatencyFile out; for (; l; l = l->next) { - ::specpaxos::latency::format::Latency lout; + ::transport::latency::format::Latency lout; Latency_Put(l, lout); *(out.add_latencies()) = lout; } @@ -356,14 +356,14 @@ Latency_Flush(void) } void -Latency_Put(Latency_t *l, ::specpaxos::latency::format::Latency &out) +Latency_Put(Latency_t *l, ::transport::latency::format::Latency &out) { out.Clear(); out.set_name(l->name); for (int i = 0; i < l->distPoolNext; ++i) { Latency_Dist_t *d = &l->distPool[i]; - ::specpaxos::latency::format::LatencyDist *outd = out.add_dists(); + ::transport::latency::format::LatencyDist *outd = out.add_dists(); outd->set_type(d->type); outd->set_min(d->min); outd->set_max(d->max); @@ -377,12 +377,12 @@ Latency_Put(Latency_t *l, ::specpaxos::latency::format::Latency &out) } bool -Latency_TryGet(const ::specpaxos::latency::format::Latency &in, Latency_t *l) +Latency_TryGet(const ::transport::latency::format::Latency &in, Latency_t *l) { LatencyInit(l, strdup(in.name().c_str())); // XXX Memory leak l->distPoolNext = in.dists_size(); for (int i = 0; i < l->distPoolNext; ++i) { - const ::specpaxos::latency::format::LatencyDist &ind = + const ::transport::latency::format::LatencyDist &ind = in.dists(i); Latency_Dist_t *d = &l->distPool[i]; d->type = ind.type(); diff --git a/lib/latency.h b/lib/latency.h index 4ffff98..af0a294 100644 --- a/lib/latency.h +++ b/lib/latency.h @@ -108,8 +108,8 @@ void Latency_FlushTo(const char *fname); void Latency_Flush(void); void Latency_Put(Latency_t *l, - ::specpaxos::latency::format::Latency &out); -bool Latency_TryGet(const ::specpaxos::latency::format::Latency &in, + ::transport::latency::format::Latency &out); +bool Latency_TryGet(const ::transport::latency::format::Latency &in, Latency_t *l); static inline void diff --git a/lib/simtransport.cc b/lib/simtransport.cc index 743dfa1..9cec2af 100644 --- a/lib/simtransport.cc +++ b/lib/simtransport.cc @@ -75,7 +75,7 @@ SimulatedTransport::~SimulatedTransport() void SimulatedTransport::Register(TransportReceiver *receiver, - const specpaxos::Configuration &config, + const transport::Configuration &config, int replicaIdx) { // Allocate an endpoint @@ -137,7 +137,7 @@ SimulatedTransport::SendMessageInternal(TransportReceiver *src, } SimulatedTransportAddress -SimulatedTransport::LookupAddress(const specpaxos::Configuration &cfg, +SimulatedTransport::LookupAddress(const transport::Configuration &cfg, int idx) { // Check every registered replica to see if its configuration and @@ -162,7 +162,7 @@ SimulatedTransport::LookupAddress(const specpaxos::Configuration &cfg, } const SimulatedTransportAddress * -SimulatedTransport::LookupMulticastAddress(const specpaxos::Configuration *cfg) +SimulatedTransport::LookupMulticastAddress(const transport::Configuration *cfg) { return NULL; } diff --git a/lib/simtransport.h b/lib/simtransport.h index 57e4ecf..e2ca6a5 100644 --- a/lib/simtransport.h +++ b/lib/simtransport.h @@ -67,7 +67,7 @@ public: SimulatedTransport(); ~SimulatedTransport(); void Register(TransportReceiver *receiver, - const specpaxos::Configuration &config, + const transport::Configuration &config, int replicaIdx); void Run(); void AddFilter(int id, filter_t filter); @@ -83,9 +83,9 @@ protected: bool multicast); SimulatedTransportAddress - LookupAddress(const specpaxos::Configuration &cfg, int idx); + LookupAddress(const transport::Configuration &cfg, int idx); const SimulatedTransportAddress * - LookupMulticastAddress(const specpaxos::Configuration *cfg); + LookupMulticastAddress(const transport::Configuration *cfg); private: struct QueuedMessage { diff --git a/lib/tests/configuration-test.cc b/lib/tests/configuration-test.cc index 5a9a18a..090372d 100644 --- a/lib/tests/configuration-test.cc +++ b/lib/tests/configuration-test.cc @@ -32,7 +32,7 @@ #include <gtest/gtest.h> -using namespace specpaxos; +using namespace transport; using std::vector; TEST(Configuration, Basic) diff --git a/lib/tests/simtransport-test.cc b/lib/tests/simtransport-test.cc index fbce3ad..83d852d 100644 --- a/lib/tests/simtransport-test.cc +++ b/lib/tests/simtransport-test.cc @@ -35,7 +35,7 @@ #include <gtest/gtest.h> -using namespace specpaxos::test; +using namespace transport::test; using ::google::protobuf::Message; @@ -67,11 +67,11 @@ TestReceiver::ReceiveMessage(const TransportAddress &src, class SimTransportTest : public testing::Test { protected: - std::vector<specpaxos::ReplicaAddress> replicaAddrs = + std::vector<transport::ReplicaAddress> replicaAddrs = { { "localhost", "12345" }, { "localhost", "12346" }, { "localhost", "12347" }}; - specpaxos::Configuration config{3, 1, replicaAddrs}; + transport::Configuration config{3, 1, replicaAddrs}; TestReceiver *receiver0; TestReceiver *receiver1; diff --git a/lib/tests/simtransport-testmessage.proto b/lib/tests/simtransport-testmessage.proto index e9820fc..0b0ddde 100644 --- a/lib/tests/simtransport-testmessage.proto +++ b/lib/tests/simtransport-testmessage.proto @@ -1,4 +1,4 @@ -package specpaxos.test; +package transport.test; message TestMessage { required string test = 1; diff --git a/lib/transport.h b/lib/transport.h index c725e57..32d769c 100644 --- a/lib/transport.h +++ b/lib/transport.h @@ -74,7 +74,7 @@ protected: public: virtual ~Transport() {} virtual void Register(TransportReceiver *receiver, - const specpaxos::Configuration &config, + const transport::Configuration &config, int replicaIdx) = 0; virtual bool SendMessage(TransportReceiver *src, const TransportAddress &dst, const Message &m) = 0; diff --git a/lib/transportcommon.h b/lib/transportcommon.h index 7d17590..e32440f 100644 --- a/lib/transportcommon.h +++ b/lib/transportcommon.h @@ -70,7 +70,7 @@ public: SendMessageToReplica(TransportReceiver *src, int replicaIdx, const Message &m) { - const specpaxos::Configuration *cfg = configurations[src]; + const transport::Configuration *cfg = configurations[src]; ASSERT(cfg != NULL); if (!replicaAddressesInitialized) { @@ -86,7 +86,7 @@ public: virtual bool SendMessageToAll(TransportReceiver *src, const Message &m) { - const specpaxos::Configuration *cfg = configurations[src]; + const transport::Configuration *cfg = configurations[src]; ASSERT(cfg != NULL); if (!replicaAddressesInitialized) { @@ -117,25 +117,25 @@ protected: const ADDR &dst, const Message &m, bool multicast = false) = 0; - virtual ADDR LookupAddress(const specpaxos::Configuration &cfg, + virtual ADDR LookupAddress(const transport::Configuration &cfg, int replicaIdx) = 0; virtual const ADDR * - LookupMulticastAddress(const specpaxos::Configuration *cfg) = 0; + LookupMulticastAddress(const transport::Configuration *cfg) = 0; - std::unordered_map<specpaxos::Configuration, - specpaxos::Configuration *> canonicalConfigs; + std::unordered_map<transport::Configuration, + transport::Configuration *> canonicalConfigs; std::map<TransportReceiver *, - specpaxos::Configuration *> configurations; - std::map<const specpaxos::Configuration *, + transport::Configuration *> configurations; + std::map<const transport::Configuration *, std::map<int, ADDR> > replicaAddresses; - std::map<const specpaxos::Configuration *, + std::map<const transport::Configuration *, std::map<int, TransportReceiver *> > replicaReceivers; - std::map<const specpaxos::Configuration *, ADDR> multicastAddresses; + std::map<const transport::Configuration *, ADDR> multicastAddresses; bool replicaAddressesInitialized; - virtual specpaxos::Configuration * + virtual transport::Configuration * RegisterConfiguration(TransportReceiver *receiver, - const specpaxos::Configuration &config, + const transport::Configuration &config, int replicaIdx) { ASSERT(receiver != NULL); @@ -144,10 +144,10 @@ protected: // pointer to the canonical copy; if not, create one. This // allows us to use that pointer as a key in various // structures. - specpaxos::Configuration *canonical + transport::Configuration *canonical = canonicalConfigs[config]; if (canonical == NULL) { - canonical = new specpaxos::Configuration(config); + canonical = new transport::Configuration(config); canonicalConfigs[config] = canonical; } @@ -177,7 +177,7 @@ protected: // For every configuration, look up all addresses and cache // them. for (auto &kv : canonicalConfigs) { - specpaxos::Configuration *cfg = kv.second; + transport::Configuration *cfg = kv.second; for (int i = 0; i < cfg->n; i++) { const ADDR addr = LookupAddress(*cfg, i); diff --git a/lib/udptransport.cc b/lib/udptransport.cc index dfc7384..144be69 100644 --- a/lib/udptransport.cc +++ b/lib/udptransport.cc @@ -83,7 +83,7 @@ bool operator<(const UDPTransportAddress &a, const UDPTransportAddress &b) } UDPTransportAddress -UDPTransport::LookupAddress(const specpaxos::ReplicaAddress &addr) +UDPTransport::LookupAddress(const replication::ReplicaAddress &addr) { int res; struct addrinfo hints; @@ -106,15 +106,15 @@ UDPTransport::LookupAddress(const specpaxos::ReplicaAddress &addr) } UDPTransportAddress -UDPTransport::LookupAddress(const specpaxos::Configuration &config, +UDPTransport::LookupAddress(const replication::Configuration &config, int idx) { - const specpaxos::ReplicaAddress &addr = config.replica(idx); + const replication::ReplicaAddress &addr = config.replica(idx); return LookupAddress(addr); } const UDPTransportAddress * -UDPTransport::LookupMulticastAddress(const specpaxos::Configuration +UDPTransport::LookupMulticastAddress(const replication::Configuration *config) { if (!config->multicast()) { @@ -232,7 +232,7 @@ UDPTransport::~UDPTransport() } void -UDPTransport::ListenOnMulticastPort(const specpaxos::Configuration +UDPTransport::ListenOnMulticastPort(const replication::Configuration *canonicalConfig) { if (!canonicalConfig->multicast()) { @@ -298,13 +298,13 @@ UDPTransport::ListenOnMulticastPort(const specpaxos::Configuration void UDPTransport::Register(TransportReceiver *receiver, - const specpaxos::Configuration &config, + const replication::Configuration &config, int replicaIdx) { ASSERT(replicaIdx < config.n); struct sockaddr_in sin; - const specpaxos::Configuration *canonicalConfig = + const replication::Configuration *canonicalConfig = RegisterConfiguration(receiver, config, replicaIdx); // Create socket @@ -624,7 +624,7 @@ UDPTransport::OnReadable(int fd) // If so, deliver the message to all replicas for that // config, *except* if that replica was the sender of the // message. - const specpaxos::Configuration *cfg = it->second; + const replication::Configuration *cfg = it->second; for (auto &kv : replicaReceivers[cfg]) { TransportReceiver *receiver = kv.second; const UDPTransportAddress &raddr = diff --git a/lib/udptransport.h b/lib/udptransport.h index a0b01db..6a52f56 100644 --- a/lib/udptransport.h +++ b/lib/udptransport.h @@ -69,7 +69,7 @@ public: int dscp = 0, event_base *evbase = nullptr); virtual ~UDPTransport(); void Register(TransportReceiver *receiver, - const specpaxos::Configuration &config, + const replication::Configuration &config, int replicaIdx); void Run(); void Stop(); @@ -106,8 +106,8 @@ private: std::vector<event *> signalEvents; std::map<int, TransportReceiver*> receivers; // fd -> receiver std::map<TransportReceiver*, int> fds; // receiver -> fd - std::map<const specpaxos::Configuration *, int> multicastFds; - std::map<int, const specpaxos::Configuration *> multicastConfigs; + std::map<const replication::Configuration *, int> multicastFds; + std::map<int, const replication::Configuration *> multicastConfigs; int lastTimerId; std::map<int, UDPTransportTimerInfo *> timers; uint64_t lastFragMsgId; @@ -122,13 +122,13 @@ private: const UDPTransportAddress &dst, const Message &m, bool multicast = false); UDPTransportAddress - LookupAddress(const specpaxos::ReplicaAddress &addr); + LookupAddress(const replication::ReplicaAddress &addr); UDPTransportAddress - LookupAddress(const specpaxos::Configuration &cfg, + LookupAddress(const replication::Configuration &cfg, int replicaIdx); const UDPTransportAddress * - LookupMulticastAddress(const specpaxos::Configuration *cfg); - void ListenOnMulticastPort(const specpaxos::Configuration + LookupMulticastAddress(const replication::Configuration *cfg); + void ListenOnMulticastPort(const replication::Configuration *canonicalConfig); void OnReadable(int fd); void OnTimer(UDPTransportTimerInfo *info); diff --git a/replication/common/Rules.mk b/replication/common/Rules.mk new file mode 100644 index 0000000..e5e7a93 --- /dev/null +++ b/replication/common/Rules.mk @@ -0,0 +1,19 @@ +d := $(dir $(lastword $(MAKEFILE_LIST))) + +SRCS += $(addprefix $(d), \ + client.cc replica.cc log.cc) + +PROTOS += $(addprefix $(d), \ + request.proto) + +LIB-request := $(o)request.o + +OBJS-client := $(o)client.o \ + $(LIB-message) $(LIB-configuration) $(LIB-transport) \ + $(LIB-request) + +OBJS-replica := $(o)replica.o $(o)log.o \ + $(LIB-message) $(LIB-request) \ + $(LIB-configuration) $(LIB-udptransport) + +include $(d)tests/Rules.mk diff --git a/replication/common/client.cc b/replication/common/client.cc new file mode 100644 index 0000000..76c3e36 --- /dev/null +++ b/replication/common/client.cc @@ -0,0 +1,73 @@ +// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*- +/*********************************************************************** + * + * client.cc: + * interface to replication client stubs + * + * Copyright 2013 Dan R. K. Ports <drkp@cs.washington.edu> + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + **********************************************************************/ + +#include "replication/common/client.h" +#include "replication/common/request.pb.h" +#include "lib/message.h" +#include "lib/transport.h" + +#include <random> + +namespace replication { + +Client::Client(const Configuration &config, Transport *transport, + uint64_t clientid) + : config(config), transport(transport) +{ + this->clientid = clientid; + + // Randomly generate a client ID + // This is surely not the fastest way to get a random 64-bit int, + // but it should be fine for this purpose. + while (this->clientid == 0) { + std::random_device rd; + std::mt19937_64 gen(rd()); + std::uniform_int_distribution<uint64_t> dis; + this->clientid = dis(gen); + Debug("VRClient ID: %lu", this->clientid); + } + + transport->Register(this, config, -1); +} + +Client::~Client() +{ + +} + +void +Client::ReceiveMessage(const TransportAddress &remote, + const string &type, const string &data) +{ + Panic("Received unexpected message type: %s", + type.c_str()); +} + +} // namespace replication diff --git a/replication/common/client.h b/replication/common/client.h new file mode 100644 index 0000000..9ff6cec --- /dev/null +++ b/replication/common/client.h @@ -0,0 +1,76 @@ +// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*- +/*********************************************************************** + * + * client.h: + * interface to replication client stubs + * + * Copyright 2013-2015 Irene Zhang <iyzhang@cs.washington.edu> + * Naveen Kr. Sharma <nksharma@cs.washington.edu> + * Dan R. K. Ports <drkp@cs.washington.edu> + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + **********************************************************************/ + +#ifndef _COMMON_CLIENT_H_ +#define _COMMON_CLIENT_H_ + +#include "lib/configuration.h" +#include "replication/common/request.pb.h" +#include "lib/transport.h" + + +#include <functional> + +namespace replication { + +class Client : public TransportReceiver +{ +public: + typedef std::function<void (const string &, const string &)> continuation_t; + typedef std::function<void (const string &)> timeout_continuation_t; + + static const uint32_t DEFAULT_UNLOGGED_OP_TIMEOUT = 1000; // milliseconds + + Client(const transport::Configuration &config, Transport *transport, + uint64_t clientid = 0); + virtual ~Client(); + virtual void Invoke(const string &request, + continuation_t continuation) = 0; + virtual void InvokeUnlogged(int replicaIdx, + const string &request, + continuation_t continuation, + timeout_continuation_t timeoutContinuation = nullptr, + uint32_t timeout = DEFAULT_UNLOGGED_OP_TIMEOUT) = 0; + virtual void ReceiveMessage(const TransportAddress &remote, + const string &type, + const string &data); + +protected: + transport::Configuration config; + Transport *transport; + + uint64_t clientid; +}; + +} // namespace replication + +#endif /* _COMMON_CLIENT_H_ */ diff --git a/replication/common/log-impl.h b/replication/common/log-impl.h new file mode 100644 index 0000000..c1d874c --- /dev/null +++ b/replication/common/log-impl.h @@ -0,0 +1,87 @@ +// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*- +/*********************************************************************** + * + * log.h: + * a replica's log of pending and committed operations + * + * Copyright 2013-2015 Irene Zhang <iyzhang@cs.washington.edu> + * Naveen Kr. Sharma <nksharma@cs.washington.edu> + * Dan R. K. Ports <drkp@cs.washington.edu> + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + **********************************************************************/ + +#ifndef _COMMON_LOG_IMPL_H_ +#define _COMMON_LOG_IMPL_H_ + +template <class T> void +Log::Dump(opnum_t from, T out) +{ + for (opnum_t i = std::max(from, start); + i <= LastOpnum(); i++) { + + const LogEntry *entry = Find(i); + ASSERT(entry != NULL); + + auto elem = out->Add(); + elem->set_view(entry->viewstamp.view); + elem->set_opnum(entry->viewstamp.opnum); + elem->set_state(entry->state); + elem->set_hash(entry->hash); + *(elem->mutable_request()) = entry->request; + } +} + +template <class iter> void +Log::Install(iter start, iter end) +{ + // Find the first divergence in the log + iter it = start; + for (it = start; it != end; it++) { + const LogEntry *oldEntry = Find(it->opnum()); + if (oldEntry == NULL) { + break; + } + if (it->view() != oldEntry->viewstamp.view) { + RemoveAfter(it->opnum()); + break; + } + } + + if (it == end) { + // We didn't find a divergence. This means that the logs + // should be identical. If the existing log is longer, + // something is wrong. +// it--; +// ASSERT(it->opnum() == lastViewstamp.opnum); +// ASSERT(it->view() == lastViewstamp.view); +// ASSERT(Find(it->opnum()+1) == NULL); + } + + // Install the new log entries + for (; it != end; it++) { + viewstamp_t vs = { it->view(), it->opnum() }; + Append(vs, it->request(), LOG_STATE_PREPARED); + } +} + +#endif /* _COMMON_LOG_IMPL_H_ */ diff --git a/replication/common/log.cc b/replication/common/log.cc new file mode 100644 index 0000000..70c4e2c --- /dev/null +++ b/replication/common/log.cc @@ -0,0 +1,222 @@ +// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*- +/*********************************************************************** + * + * log.h: + * a replica's log of pending and committed operations + * + * Copyright 2013 Dan R. K. Ports <drkp@cs.washington.edu> + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + **********************************************************************/ + +#include "replicationcommon/log.h" +#include "replication/common/request.pb.h" +#include "lib/assert.h" + +#include <openssl/sha.h> + +namespace replication { + +const string Log::EMPTY_HASH = string(SHA_DIGEST_LENGTH, '\0'); + +Log::Log(bool useHash, opnum_t start, string initialHash) + : useHash(useHash) +{ + this->initialHash = initialHash; + this->start = start; + if (start == 1) { + ASSERT(initialHash == EMPTY_HASH); + } +} + + +LogEntry & +Log::Append(viewstamp_t vs, const Request &req, LogEntryState state) +{ + if (entries.empty()) { + ASSERT(vs.opnum == start); + } else { + ASSERT(vs.opnum == LastOpnum()+1); + } + + LogEntry entry; + entry.viewstamp = vs; + entry.request = req; + entry.state = state; + if (useHash) { + entry.hash = ComputeHash(LastHash(), entry); + } + + entries.push_back(entry); + return *Find(vs.opnum); +} + +// This really ought to be const +LogEntry * +Log::Find(opnum_t opnum) +{ + if (entries.empty()) { + return NULL; + } + + if (opnum < start) { + return NULL; + } + + if (opnum-start > entries.size()-1) { + return NULL; + } + + LogEntry *entry = &entries[opnum-start]; + ASSERT(entry->viewstamp.opnum == opnum); + return entry; +} + + +bool +Log::SetStatus(opnum_t op, LogEntryState state) +{ + LogEntry *entry = Find(op); + if (entry == NULL) { + return false; + } + + entry->state = state; + return true; +} + +bool +Log::SetRequest(opnum_t op, const Request &req) +{ + if (useHash) { + Panic("Log::SetRequest on hashed log not supported."); + } + + LogEntry *entry = Find(op); + if (entry == NULL) { + return false; + } + + entry->request = req; + return true; +} + +void +Log::RemoveAfter(opnum_t op) +{ +#if PARANOID + // We'd better not be removing any committed entries. + for (opnum_t i = op; i <= LastOpnum(); i++) { + ASSERT(Find(i)->state != LOG_STATE_COMMITTED); + } +#endif + + if (op > LastOpnum()) { + return; + } + + Debug("Removing log entries after " FMT_OPNUM, op); + + ASSERT(op-start < entries.size()); + entries.resize(op-start); + + ASSERT(LastOpnum() == op-1); +} + +LogEntry * +Log::Last() +{ + if (entries.empty()) { + return NULL; + } + + return &entries.back(); +} + +viewstamp_t +Log::LastViewstamp() const +{ + if (entries.empty()) { + return viewstamp_t(0, start-1); + } else { + return entries.back().viewstamp; + } +} + +opnum_t +Log::LastOpnum() const +{ + if (entries.empty()) { + return start-1; + } else { + return entries.back().viewstamp.opnum; + } +} + +opnum_t +Log::FirstOpnum() const +{ + // XXX Not really sure what's appropriate to return here if the + // log is empty + return start; +} + +bool +Log::Empty() const +{ + return entries.empty(); +} + +const string & +Log::LastHash() const +{ + if (entries.empty()) { + return initialHash; + } else { + return entries.back().hash; + } +} + +string +Log::ComputeHash(string lastHash, const LogEntry &entry) +{ + SHA_CTX ctx; + unsigned char out[SHA_DIGEST_LENGTH]; + + SHA1_Init(&ctx); + + SHA1_Update(&ctx, lastHash.c_str(), lastHash.size()); + SHA1_Update(&ctx, &entry.viewstamp, sizeof(entry.viewstamp)); + uint64_t x; + x = entry.request.clientid(); + SHA1_Update(&ctx, &x, sizeof(x)); + x = entry.request.clientreqid(); + SHA1_Update(&ctx, &x, sizeof(x)); + SHA1_Update(&ctx, entry.request.op().c_str(), + entry.request.op().size()); + + SHA1_Final(out, &ctx); + + return string((char *)out, SHA_DIGEST_LENGTH); +} + +} // namespace replication diff --git a/replication/common/log.h b/replication/common/log.h new file mode 100644 index 0000000..7aaf059 --- /dev/null +++ b/replication/common/log.h @@ -0,0 +1,119 @@ +// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*- +/*********************************************************************** + * + * log.h: + * a replica's log of pending and committed operations + * + * Copyright 2013 Dan R. K. Ports <drkp@cs.washington.edu> + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + **********************************************************************/ + +#ifndef _COMMON_LOG_H_ +#define _COMMON_LOG_H_ + +#include "replication/common/request.pb.h" +#include "lib/assert.h" +#include "lib/message.h" +#include "lib/transport.h" +#include "replication/common/viewstamp.h" + +#include <map> +#include <google/protobuf/message.h> + +namespace replication { + +enum LogEntryState { + LOG_STATE_COMMITTED, + LOG_STATE_PREPARED, + LOG_STATE_SPECULATIVE, // specpaxos only + LOG_STATE_FASTPREPARED // fastpaxos only +}; + +struct LogEntry +{ + viewstamp_t viewstamp; + LogEntryState state; + Request request; + string hash; + // Speculative client table stuff + opnum_t prevClientReqOpnum; + ::google::protobuf::Message *replyMessage; + + LogEntry() { replyMessage = NULL; } + LogEntry(const LogEntry &x) + : viewstamp(x.viewstamp), state(x.state), request(x.request), + hash(x.hash), prevClientReqOpnum(x.prevClientReqOpnum) + { + if (x.replyMessage) { + replyMessage = x.replyMessage->New(); + replyMessage->CopyFrom(*x.replyMessage); + } else { + replyMessage = NULL; + } + } + LogEntry(viewstamp_t viewstamp, LogEntryState state, + const Request &request, const string &hash) + : viewstamp(viewstamp), state(state), request(request), + hash(hash), replyMessage(NULL) { } + virtual ~LogEntry() + { + if (replyMessage) { + delete replyMessage; + } + } +}; + +class Log +{ +public: + Log(bool useHash, opnum_t start = 1, string initialHash = EMPTY_HASH); + LogEntry & Append(viewstamp_t vs, const Request &req, LogEntryState state); + LogEntry * Find(opnum_t opnum); + bool SetStatus(opnum_t opnum, LogEntryState state); + bool SetRequest(opnum_t op, const Request &req); + void RemoveAfter(opnum_t opnum); + LogEntry * Last(); + viewstamp_t LastViewstamp() const; // deprecated + opnum_t LastOpnum() const; + opnum_t FirstOpnum() const; + bool Empty() const; + template <class T> void Dump(opnum_t from, T out); + template <class iter> void Install(iter start, iter end); + const string &LastHash() const; + + static string ComputeHash(string lastHash, const LogEntry &entry); + static const string EMPTY_HASH; + + +private: + std::vector<LogEntry> entries; + string initialHash; + opnum_t start; + bool useHash; +}; + +#include "replication/common/log-impl.h" + +} // namespace replication + +#endif /* _COMMON_LOG_H_ */ diff --git a/replication/common/quorumset.h b/replication/common/quorumset.h new file mode 100644 index 0000000..cd17d91 --- /dev/null +++ b/replication/common/quorumset.h @@ -0,0 +1,119 @@ +// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*- +/*********************************************************************** + * + * quorumset.h: + * utility type for tracking sets of messages received from other + * replicas and determining whether a quorum of responses has been met + * + * Copyright 2013-2015 Irene Zhang <iyzhang@cs.washington.edu> + * Naveen Kr. Sharma <nksharma@cs.washington.edu> + * Dan R. K. Ports <drkp@cs.washington.edu> + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + **********************************************************************/ + +#ifndef _COMMON_QUORUMSET_H_ +#define _COMMON_QUORUMSET_H_ + +namespace replication { + +template <class IDTYPE, class MSGTYPE> +class QuorumSet +{ +public: + QuorumSet(int numRequired) + : numRequired(numRequired) + { + + } + + void + Clear() + { + messages.clear(); + } + + void + Clear(IDTYPE vs) + { + std::map<int, MSGTYPE> &vsmessages = messages[vs]; + vsmessages.clear(); + } + + int + NumRequired() const + { + return numRequired; + } + + const std::map<int, MSGTYPE> & + GetMessages(IDTYPE vs) + { + return messages[vs]; + } + + const std::map<int, MSGTYPE> * + CheckForQuorum(IDTYPE vs) + { + std::map<int, MSGTYPE> &vsmessages = messages[vs]; + int count = vsmessages.size(); + if (count >= numRequired) { + return &vsmessages; + } else { + return NULL; + } + } + + const std::map<int, MSGTYPE> * + AddAndCheckForQuorum(IDTYPE vs, int replicaIdx, const MSGTYPE &msg) + { + std::map<int, MSGTYPE> &vsmessages = messages[vs]; + if (vsmessages.find(replicaIdx) != vsmessages.end()) { + // This is a duplicate message + + // But we'll ignore that, replace the old message from + // this replica, and proceed. + // + // XXX Is this the right thing to do? It is for + // speculative replies in SpecPaxos... + } + + vsmessages[replicaIdx] = msg; + + return CheckForQuorum(vs); + } + + void + Add(IDTYPE vs, int replicaIdx, const MSGTYPE &msg) + { + AddAndCheckForQuorum(vs, replicaIdx, msg); + } + +public: + int numRequired; +private: + std::map<IDTYPE, std::map<int, MSGTYPE> > messages; +}; + +} // namespace replication + +#endif // _COMMON_QUORUMSET_H_ diff --git a/replication/common/replica-inl.h b/replication/common/replica-inl.h new file mode 100644 index 0000000..430f18a --- /dev/null +++ b/replication/common/replica-inl.h @@ -0,0 +1,57 @@ +// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*- +/*********************************************************************** + * + * replica-inl.h: + * inline/template functions for common replica interface + * + * Copyright 2013 Dan R. K. Ports <drkp@cs.washington.edu> + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + **********************************************************************/ + +#ifndef _COMMON_REPLICA_INL_H_ +#define _COMMON_REPLICA_INL_H_ + +template<class MSG> +void +Replica::Execute(opnum_t opnum, + const Request &msg, + MSG &reply) +{ + string res; + ReplicaUpcall(opnum, msg.op(), res); + + reply.set_reply(res); +} + +template<class MSG> +void +Replica::ExecuteUnlogged(const UnloggedRequest &msg, + MSG &reply) +{ + string res; + UnloggedUpcall(msg.op(), res); + + reply.set_reply(res); +} + +#endif // _COMMON_REPLICA_INL_H_ diff --git a/replication/common/replica.cc b/replication/common/replica.cc new file mode 100644 index 0000000..281c31a --- /dev/null +++ b/replication/common/replica.cc @@ -0,0 +1,98 @@ +// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*- +/*********************************************************************** + * + * replica.cc: + * common functions for replica implementation regardless of + * replication protocol + * + * Copyright 2013-2015 Irene Zhang <iyzhang@cs.washington.edu> + * Naveen Kr. Sharma <nksharma@cs.washington.edu> + * Dan R. K. Ports <drkp@cs.washington.edu> + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + **********************************************************************/ + +#include "replication/common/log.h" +#include "replication/common/replica.h" + +#include "replication/lib/message.h" + +#include <stdlib.h> + +namespace replication { + +Replica::Replica(const transport::Configuration &configuration, int myIdx, + Transport *transport, AppReplica *app) + : configuration(configuration), myIdx(myIdx), + transport(transport), app(app) +{ + transport->Register(this, configuration, myIdx); +} + +Replica::~Replica() +{ + +} + +void +Replica::LeaderUpcall(opnum_t opnum, const string &op, bool &replicate, string &res) +{ + app->LeaderUpcall(opnum, op, replicate, res); +} + +void +Replica::ReplicaUpcall(opnum_t opnum, const string &op, string &res) +{ + Debug("Making upcall for operation %s", op.c_str()); + app->ReplicaUpcall(opnum, op, res); + + Debug("Upcall result: %s", res.c_str()); +} + +void +Replica::Rollback(opnum_t current, opnum_t to, Log &log) +{ + Debug("Making rollback-upcall from " FMT_OPNUM " to " FMT_OPNUM, + current, to); + + std::map<opnum_t, string> reqs; + for (opnum_t x = current; x > to; x--) { + reqs.insert(std::pair<opnum_t, string>(x, + log.Find(x)->request.op())); + } + + app->RollbackUpcall(current, to, reqs); +} + +void +Replica::Commit(opnum_t op) +{ + app->CommitUpcall(op); +} + +void +Replica::UnloggedUpcall(const string &op, string &res) +{ + app->UnloggedUpcall(op, res); +} + +} // namespace replication diff --git a/replication/common/replica.h b/replication/common/replica.h new file mode 100644 index 0000000..6d78e62 --- /dev/null +++ b/replication/common/replica.h @@ -0,0 +1,100 @@ +// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*- +/*********************************************************************** + * + * replica.h: + * interface to different vr protocols + * + * Copyright 2013-2015 Irene Zhang <iyzhang@cs.washington.edu> + * Naveen Kr. Sharma <nksharma@cs.washington.edu> + * Dan R. K. Ports <drkp@cs.washington.edu> + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + **********************************************************************/ + +#ifndef _COMMON_REPLICA_H_ +#define _COMMON_REPLICA_H_ + + +#include "lib/configuration.h" +#include "replication/common/log.h" +#include "replication/common/request.pb.h" +#include "lib/transport.h" +#include "replication/common/viewstamp.h" + +namespace replication { + +class Replica; + +enum ReplicaStatus { + STATUS_NORMAL, + STATUS_VIEW_CHANGE, + STATUS_RECOVERING +}; + +class AppReplica +{ +public: + AppReplica() { }; + virtual ~AppReplica() { }; + // Invoke callback on the leader, with the option to replicate on success + virtual void LeaderUpcall(opnum_t opnum, const string &str1, bool &replicate, string &str2) { replicate = true; str2 = str1; }; + // Invoke callback on all replicas + virtual void ReplicaUpcall(opnum_t opnum, const string &str1, string &str2) { }; + // Rollback callback on failed speculative operations + virtual void RollbackUpcall(opnum_t current, opnum_t to, const std::map<opnum_t, string> &opMap) { }; + // Commit callback to commit speculative operations + virtual void CommitUpcall(opnum_t) { }; + // Invoke call back for unreplicated operations run on only one replica + virtual void UnloggedUpcall(const string &str1, string &str2) { }; +}; + +class Replica : public TransportReceiver +{ +public: + Replica(const transport::Configuration &config, int myIdx, Transport *transport, AppReplica *app); + virtual ~Replica(); + +protected: + void LeaderUpcall(opnum_t opnum, const string &op, bool &replicate, string &res); + void ReplicaUpcall(opnum_t opnum, const string &op, string &res); + template<class MSG> void Execute(opnum_t opnum, + const Request & msg, + MSG &reply); + void Rollback(opnum_t current, opnum_t to, Log &log); + void Commit(opnum_t op); + void UnloggedUpcall(const string &op, string &res); + template<class MSG> void ExecuteUnlogged(const UnloggedRequest & msg, + MSG &reply); + +protected: + transport::Configuration configuration; + int myIdx; + Transport *transport; + AppReplica *app; + ReplicaStatus status; +}; + +#include "replica-inl.h" + +} // namespace replication + +#endif /* _COMMON_REPLICA_H */ diff --git a/replication/common/request.proto b/replication/common/request.proto new file mode 100644 index 0000000..9a76ec1 --- /dev/null +++ b/replication/common/request.proto @@ -0,0 +1,13 @@ +package replication; + +message Request { + required bytes op = 1; + required uint64 clientid = 2; + required uint64 clientreqid = 3; +} + +message UnloggedRequest { + required bytes op = 1; + required uint64 clientid = 2; + required uint64 clientreqid = 3; +} diff --git a/replication/common/tests/Rules.mk b/replication/common/tests/Rules.mk new file mode 100644 index 0000000..f58d1d4 --- /dev/null +++ b/replication/common/tests/Rules.mk @@ -0,0 +1,2 @@ +d := $(dir $(lastword $(MAKEFILE_LIST))) + diff --git a/replication/vr/viewstamp.h b/replication/common/viewstamp.h similarity index 100% rename from replication/vr/viewstamp.h rename to replication/common/viewstamp.h diff --git a/replication/vr/Rules.mk b/replication/vr/Rules.mk new file mode 100644 index 0000000..f9582d1 --- /dev/null +++ b/replication/vr/Rules.mk @@ -0,0 +1,18 @@ +d := $(dir $(lastword $(MAKEFILE_LIST))) + +SRCS += $(addprefix $(d), \ + replica.cc client.cc) + +PROTOS += $(addprefix $(d), \ + vr-proto.proto) + +OBJS-vr-client := $(o)client.o $(o)vr-proto.o \ + $(OBJS-client) $(LIB-message) \ + $(LIB-configuration) + +OBJS-vr-replica := $(o)replica.o $(o)vr-proto.o \ + $(OBJS-replica) $(LIB-message) \ + $(LIB-configuration) + +include $(d)tests/Rules.mk + diff --git a/replication/vr/client.cc b/replication/vr/client.cc new file mode 100644 index 0000000..8f68cf2 --- /dev/null +++ b/replication/vr/client.cc @@ -0,0 +1,242 @@ + // -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*- + /*********************************************************************** + * + * vr/client.cc: + * Viewstamped Replication clinet + * + * Copyright 2013 Dan R. K. Ports <drkp@cs.washington.edu> + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + **********************************************************************/ + +#include "replication/common/client.h" +#include "replication/common/request.pb.h" +#include "lib/assert.h" +#include "lib/message.h" +#include "lib/transport.h" +#include "replication/vr/client.h" +#include "replication/vr/vr-proto.pb.h" + +namespace replication { +namespace vr { + +VRClient::VRClient(const transport::Configuration &config, + Transport *transport, + uint64_t clientid) + : Client(config, transport, clientid) +{ + pendingRequest = NULL; + pendingUnloggedRequest = NULL; + lastReqId = 0; + + requestTimeout = new Timeout(transport, 500, [this]() { + ResendRequest(); + }); + unloggedRequestTimeout = new Timeout(transport, 500, [this]() { + UnloggedRequestTimeoutCallback(); + }); +} + +VRClient::~VRClient() +{ + if (pendingRequest) { + delete pendingRequest; + } + if (pendingUnloggedRequest) { + delete pendingUnloggedRequest; + } + delete requestTimeout; + delete unloggedRequestTimeout; +} + +void +VRClient::Invoke(const string &request, + continuation_t continuation) +{ + // XXX Can only handle one pending request for now + if (pendingRequest != NULL) { + Panic("Client only supports one pending request"); + } + + ++lastReqId; + uint64_t reqId = lastReqId; + pendingRequest = new PendingRequest(request, reqId, continuation); + + SendRequest(); +} + +void +VRClient::InvokeUnlogged(int replicaIdx, + const string &request, + continuation_t continuation, + timeout_continuation_t timeoutContinuation, + uint32_t timeout) +{ + // XXX Can only handle one pending request for now + if (pendingUnloggedRequest != NULL) { + Panic("Client only supports one pending request"); + } + + ++lastReqId; + uint64_t reqId = lastReqId; + + pendingUnloggedRequest = new PendingRequest(request, reqId, continuation); + pendingUnloggedRequest->timeoutContinuation = timeoutContinuation; + + proto::UnloggedRequestMessage reqMsg; + reqMsg.mutable_req()->set_op(pendingUnloggedRequest->request); + reqMsg.mutable_req()->set_clientid(clientid); + reqMsg.mutable_req()->set_clientreqid(pendingUnloggedRequest->clientReqId); + + ASSERT(!unloggedRequestTimeout->Active()); + unloggedRequestTimeout->SetTimeout(timeout); + unloggedRequestTimeout->Start(); + + transport->SendMessageToReplica(this, replicaIdx, reqMsg); +} + +void +VRClient::SendRequest() +{ + if (pendingRequest == NULL) { + return; + } + proto::RequestMessage reqMsg; + reqMsg.mutable_req()->set_op(pendingRequest->request); + reqMsg.mutable_req()->set_clientid(clientid); + reqMsg.mutable_req()->set_clientreqid(pendingRequest->clientReqId); + + //Debug("SENDING REQUEST: %lu %lu", clientid, pendingRequest->clientReqId); + // XXX Try sending only to (what we think is) the leader first + transport->SendMessageToAll(this, reqMsg); + + requestTimeout->Reset(); +} + +void +VRClient::ResendRequest() +{ + if (pendingRequest == NULL) { + requestTimeout->Stop(); + return; + } + + Warning("Client timeout; resending request: %lu", pendingRequest->clientReqId); + SendRequest(); +} + + +void +VRClient::ReceiveMessage(const TransportAddress &remote, + const string &type, + const string &data) +{ + static proto::ReplyMessage reply; + static proto::UnloggedReplyMessage unloggedReply; + + if (type == reply.GetTypeName()) { + reply.ParseFromString(data); + HandleReply(remote, reply); + } else if (type == unloggedReply.GetTypeName()) { + unloggedReply.ParseFromString(data); + HandleUnloggedReply(remote, unloggedReply); + } else { + Client::ReceiveMessage(remote, type, data); + } +} + +void +VRClient::HandleReply(const TransportAddress &remote, + const proto::ReplyMessage &msg) +{ + if (pendingRequest == NULL) { + Warning("Received reply when no request was pending"); + return; + } + + if (msg.clientreqid() != pendingRequest->clientReqId) { + Debug("Received reply for a different request"); + return; + } + + Debug("Client received reply: %lu", pendingRequest->clientReqId); + + requestTimeout->Stop(); + + PendingRequest *req = pendingRequest; + pendingRequest = NULL; + +#if CLIENT_NETWORK_DELAY + transport->Timer(CLIENT_NETWORK_DELAY, [=]() { + req->continuation(req->request, msg.reply()); + delete req; + }); +#else + req->continuation(req->request, msg.reply()); + delete req; +#endif + + +} + +void +VRClient::HandleUnloggedReply(const TransportAddress &remote, + const proto::UnloggedReplyMessage &msg) +{ + if (pendingUnloggedRequest == NULL) { + Warning("Received unloggedReply when no request was pending"); + return; + } + + Debug("Client received unloggedReply"); + + unloggedRequestTimeout->Stop(); + + PendingRequest *req = pendingUnloggedRequest; + pendingUnloggedRequest = NULL; + +#if READ_AT_LEADER + transport->Timer(CLIENT_NETWORK_DELAY, [=]() { + req->continuation(req->request, msg.reply()); + delete req; + }); +#else + req->continuation(req->request, msg.reply()); + delete req; +#endif +} + +void +VRClient::UnloggedRequestTimeoutCallback() +{ + PendingRequest *req = pendingUnloggedRequest; + pendingUnloggedRequest = NULL; + + Warning("Unlogged request timed out"); + + unloggedRequestTimeout->Stop(); + + req->timeoutContinuation(req->request); +} + +} // namespace vr +} // namespace replication diff --git a/replication/vr/client.h b/replication/vr/client.h new file mode 100644 index 0000000..db4df0f --- /dev/null +++ b/replication/vr/client.h @@ -0,0 +1,92 @@ +// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*- +/*********************************************************************** + * + * vr/client.h: + * dummy implementation of replication interface that just uses a + * single replica and passes commands directly to it + * + * Copyright 2013 Dan R. K. Ports <drkp@cs.washington.edu> + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + **********************************************************************/ + +#ifndef _VR_CLIENT_H_ +#define _VR_CLIENT_H_ + +#include "replication/common/client.h" +#include "lib/configuration.h" +#include "replication/vr/vr-proto.pb.h" + +namespace replication { +namespace vr { + +class VRClient : public Client +{ +public: + VRClient(const transport::Configuration &config, + Transport *transport, + uint64_t clientid = 0); + virtual ~VRClient(); + virtual void Invoke(const string &request, + continuation_t continuation); + virtual void InvokeUnlogged(int replicaIdx, + const string &request, + continuation_t continuation, + timeout_continuation_t timeoutContinuation = nullptr, + uint32_t timeout = DEFAULT_UNLOGGED_OP_TIMEOUT); + virtual void ReceiveMessage(const TransportAddress &remote, + const string &type, const string &data); + +protected: + int view; + int opnumber; + uint64_t lastReqId; + + struct PendingRequest + { + string request; + uint64_t clientReqId; + continuation_t continuation; + timeout_continuation_t timeoutContinuation; + inline PendingRequest(string request, uint64_t clientReqId, + continuation_t continuation) + : request(request), clientReqId(clientReqId), + continuation(continuation) { } + }; + PendingRequest *pendingRequest; + PendingRequest *pendingUnloggedRequest; + Timeout *requestTimeout; + Timeout *unloggedRequestTimeout; + + void SendRequest(); + void ResendRequest(); + void HandleReply(const TransportAddress &remote, + const proto::ReplyMessage &msg); + void HandleUnloggedReply(const TransportAddress &remote, + const proto::UnloggedReplyMessage &msg); + void UnloggedRequestTimeoutCallback(); +}; + +} // namespace replication::vr +} // namespace replication + +#endif /* _VR_CLIENT_H_ */ diff --git a/replication/vr/replica.cc b/replication/vr/replica.cc new file mode 100644 index 0000000..6315ac5 --- /dev/null +++ b/replication/vr/replica.cc @@ -0,0 +1,1039 @@ +// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*- +/*********************************************************************** + * + * vr/replica.cc: + * Viewstamped Replication protocol + * + * Copyright 2013 Dan R. K. Ports <drkp@cs.washington.edu> + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + **********************************************************************/ + +#include "replication/common/replica.h" +#include "replication/vr/replica.h" +#include "replication/vr/vr-proto.pb.h" + +#include "lib/assert.h" +#include "lib/configuration.h" +#include "lib/message.h" +#include "lib/transport.h" + +#include <algorithm> + +#define RDebug(fmt, ...) Debug("[%d] " fmt, myIdx, ##__VA_ARGS__) +#define RNotice(fmt, ...) Notice("[%d] " fmt, myIdx, ##__VA_ARGS__) +#define RWarning(fmt, ...) Warning("[%d] " fmt, myIdx, ##__VA_ARGS__) +#define RPanic(fmt, ...) Panic("[%d] " fmt, myIdx, ##__VA_ARGS__) + +namespace replication { +namespace vr { + +using namespace proto; + +VRReplica::VRReplica(transport::Configuration config, int myIdx, + Transport *transport, unsigned int batchSize, + AppReplica *app) + : Replica(config, myIdx, transport, app), + batchSize(batchSize), + log(false), + prepareOKQuorum(config.QuorumSize()-1), + startViewChangeQuorum(config.QuorumSize()-1), + doViewChangeQuorum(config.QuorumSize()-1) +{ + this->status = STATUS_NORMAL; + this->view = 0; + this->lastOp = 0; + this->lastCommitted = 0; + this->lastRequestStateTransferView = 0; + this->lastRequestStateTransferOpnum = 0; + lastBatchEnd = 0; + + if (batchSize > 1) { + Notice("Batching enabled; batch size %d", batchSize); + } + + this->viewChangeTimeout = new Timeout(transport, 5000, [this]() { + StartViewChange(view+1); + }); + this->nullCommitTimeout = new Timeout(transport, 1000, [this]() { + SendNullCommit(); + }); + this->stateTransferTimeout = new Timeout(transport, 1000, [this]() { + this->lastRequestStateTransferView = 0; + this->lastRequestStateTransferOpnum = 0; + }); + this->stateTransferTimeout->Start(); + this->resendPrepareTimeout = new Timeout(transport, 500, [this]() { + ResendPrepare(); + }); + this->closeBatchTimeout = new Timeout(transport, 300, [this]() { + CloseBatch(); + }); + + if (AmLeader()) { + nullCommitTimeout->Start(); + } else { + viewChangeTimeout->Start(); + } +} + +VRReplica::~VRReplica() +{ + delete viewChangeTimeout; + delete nullCommitTimeout; + delete stateTransferTimeout; + delete resendPrepareTimeout; + delete closeBatchTimeout; + + for (auto &kv : pendingPrepares) { + delete kv.first; + } +} + +bool +VRReplica::AmLeader() const +{ + return (configuration.GetLeaderIndex(view) == myIdx); +} + +void +VRReplica::CommitUpTo(opnum_t upto) +{ + while (lastCommitted < upto) { + lastCommitted++; + + /* Find operation in log */ + const LogEntry *entry = log.Find(lastCommitted); + if (!entry) { + RPanic("Did not find operation " FMT_OPNUM " in log", lastCommitted); + } + + /* Execute it */ + RDebug("Executing request " FMT_OPNUM, lastCommitted); + ReplyMessage reply; + Execute(lastCommitted, entry->request, reply); + + reply.set_view(entry->viewstamp.view); + reply.set_opnum(entry->viewstamp.opnum); + reply.set_clientreqid(entry->request.clientreqid()); + + /* Mark it as committed */ + log.SetStatus(lastCommitted, LOG_STATE_COMMITTED); + + // Store reply in the client table + ClientTableEntry &cte = + clientTable[entry->request.clientid()]; + if (cte.lastReqId <= entry->request.clientreqid()) { + cte.lastReqId = entry->request.clientreqid(); + cte.replied = true; + cte.reply = reply; + } else { + // We've subsequently prepared another operation from the + // same client. So this request must have been completed + // at the client, and there's no need to record the + // result. + } + + /* Send reply */ + auto iter = clientAddresses.find(entry->request.clientid()); + if (iter != clientAddresses.end()) { + transport->SendMessage(this, *iter->second, reply); + } + } +} + +void +VRReplica::SendPrepareOKs(opnum_t oldLastOp) +{ + /* Send PREPAREOKs for new uncommitted operations */ + for (opnum_t i = oldLastOp; i <= lastOp; i++) { + /* It has to be new *and* uncommitted */ + if (i <= lastCommitted) { + continue; + } + + const LogEntry *entry = log.Find(i); + if (!entry) { + RPanic("Did not find operation " FMT_OPNUM " in log", i); + } + ASSERT(entry->state == LOG_STATE_PREPARED); + UpdateClientTable(entry->request); + + PrepareOKMessage reply; + reply.set_view(view); + reply.set_opnum(i); + reply.set_replicaidx(myIdx); + + RDebug("Sending PREPAREOK " FMT_VIEWSTAMP " for new uncommitted operation", + reply.view(), reply.opnum()); + + if (!(transport->SendMessageToReplica(this, + configuration.GetLeaderIndex(view), + reply))) { + RWarning("Failed to send PrepareOK message to leader"); + } + } +} + +void +VRReplica::RequestStateTransfer() +{ + RequestStateTransferMessage m; + m.set_view(view); + m.set_opnum(lastCommitted); + + if ((lastRequestStateTransferOpnum != 0) && + (lastRequestStateTransferView == view) && + (lastRequestStateTransferOpnum == lastCommitted)) { + RDebug("Skipping state transfer request " FMT_VIEWSTAMP + " because we already requested it", view, lastCommitted); + return; + } + + RNotice("Requesting state transfer: " FMT_VIEWSTAMP, view, lastCommitted); + + this->lastRequestStateTransferView = view; + this->lastRequestStateTransferOpnum = lastCommitted; + + if (!transport->SendMessageToAll(this, m)) { + RWarning("Failed to send RequestStateTransfer message to all replicas"); + } +} + +void +VRReplica::EnterView(view_t newview) +{ + RNotice("Entering new view " FMT_VIEW, newview); + + view = newview; + status = STATUS_NORMAL; + lastBatchEnd = lastOp; + + if (AmLeader()) { + viewChangeTimeout->Stop(); + nullCommitTimeout->Start(); + } else { + viewChangeTimeout->Start(); + nullCommitTimeout->Stop(); + resendPrepareTimeout->Stop(); + closeBatchTimeout->Stop(); + } + + prepareOKQuorum.Clear(); + startViewChangeQuorum.Clear(); + doViewChangeQuorum.Clear(); +} + +void +VRReplica::StartViewChange(view_t newview) +{ + RNotice("Starting view change for view " FMT_VIEW, newview); + + view = newview; + status = STATUS_VIEW_CHANGE; + + viewChangeTimeout->Reset(); + nullCommitTimeout->Stop(); + resendPrepareTimeout->Stop(); + closeBatchTimeout->Stop(); + + StartViewChangeMessage m; + m.set_view(newview); + m.set_replicaidx(myIdx); + m.set_lastcommitted(lastCommitted); + + if (!transport->SendMessageToAll(this, m)) { + RWarning("Failed to send StartViewChange message to all replicas"); + } +} + +void +VRReplica::SendNullCommit() +{ + CommitMessage cm; + cm.set_view(this->view); + cm.set_opnum(this->lastCommitted); + + ASSERT(AmLeader()); + + if (!(transport->SendMessageToAll(this, cm))) { + RWarning("Failed to send null COMMIT message to all replicas"); + } +} + +void +VRReplica::UpdateClientTable(const Request &req) +{ + ClientTableEntry &entry = clientTable[req.clientid()]; + + ASSERT(entry.lastReqId <= req.clientreqid()); + + if (entry.lastReqId == req.clientreqid()) { + return; + } + + entry.lastReqId = req.clientreqid(); + entry.replied = false; + entry.reply.Clear(); +} + +void +VRReplica::ResendPrepare() +{ + ASSERT(AmLeader()); + if (lastOp == lastCommitted) { + return; + } + RNotice("Resending prepare"); + if (!(transport->SendMessageToAll(this, lastPrepare))) { + RWarning("Failed to ressend prepare message to all replicas"); + } +} + +void +VRReplica::CloseBatch() +{ + ASSERT(AmLeader()); + ASSERT(lastBatchEnd < lastOp); + + opnum_t batchStart = lastBatchEnd+1; + + RDebug("Sending batched prepare from " FMT_OPNUM + " to " FMT_OPNUM, + batchStart, lastOp); + /* Send prepare messages */ + PrepareMessage p; + p.set_view(view); + p.set_opnum(lastOp); + p.set_batchstart(batchStart); + + for (opnum_t i = batchStart; i <= lastOp; i++) { + Request *r = p.add_request(); + const LogEntry *entry = log.Find(i); + ASSERT(entry != NULL); + ASSERT(entry->viewstamp.view == view); + ASSERT(entry->viewstamp.opnum == i); + *r = entry->request; + } + lastPrepare = p; + + if (!(transport->SendMessageToAll(this, p))) { + RWarning("Failed to send prepare message to all replicas"); + } + lastBatchEnd = lastOp; + + resendPrepareTimeout->Reset(); + closeBatchTimeout->Stop(); +} + +void +VRReplica::ReceiveMessage(const TransportAddress &remote, + const string &type, const string &data) +{ + static RequestMessage request; + static UnloggedRequestMessage unloggedRequest; + static PrepareMessage prepare; + static PrepareOKMessage prepareOK; + static CommitMessage commit; + static RequestStateTransferMessage requestStateTransfer; + static StateTransferMessage stateTransfer; + static StartViewChangeMessage startViewChange; + static DoViewChangeMessage doViewChange; + static StartViewMessage startView; + + if (type == request.GetTypeName()) { + request.ParseFromString(data); + HandleRequest(remote, request); + } else if (type == unloggedRequest.GetTypeName()) { + unloggedRequest.ParseFromString(data); + HandleUnloggedRequest(remote, unloggedRequest); + } else if (type == prepare.GetTypeName()) { + prepare.ParseFromString(data); + HandlePrepare(remote, prepare); + } else if (type == prepareOK.GetTypeName()) { + prepareOK.ParseFromString(data); + HandlePrepareOK(remote, prepareOK); + } else if (type == commit.GetTypeName()) { + commit.ParseFromString(data); + HandleCommit(remote, commit); + } else if (type == requestStateTransfer.GetTypeName()) { + requestStateTransfer.ParseFromString(data); + HandleRequestStateTransfer(remote, requestStateTransfer); + } else if (type == stateTransfer.GetTypeName()) { + stateTransfer.ParseFromString(data); + HandleStateTransfer(remote, stateTransfer); + } else if (type == startViewChange.GetTypeName()) { + startViewChange.ParseFromString(data); + HandleStartViewChange(remote, startViewChange); + } else if (type == doViewChange.GetTypeName()) { + doViewChange.ParseFromString(data); + HandleDoViewChange(remote, doViewChange); + } else if (type == startView.GetTypeName()) { + startView.ParseFromString(data); + HandleStartView(remote, startView); + } else { + RPanic("Received unexpected message type in VR proto: %s", + type.c_str()); + } +} + +void +VRReplica::HandleRequest(const TransportAddress &remote, + const RequestMessage &msg) +{ + viewstamp_t v; + + if (status != STATUS_NORMAL) { + RNotice("Ignoring request due to abnormal status"); + return; + } + + if (!AmLeader()) { + RDebug("Ignoring request because I'm not the leader"); + return; + } + + // Save the client's address + clientAddresses.erase(msg.req().clientid()); + clientAddresses.insert( + std::pair<uint64_t, std::unique_ptr<TransportAddress> >( + msg.req().clientid(), + std::unique_ptr<TransportAddress>(remote.clone()))); + + // Check the client table to see if this is a duplicate request + auto kv = clientTable.find(msg.req().clientid()); + if (kv != clientTable.end()) { + const ClientTableEntry &entry = kv->second; + if (msg.req().clientreqid() < entry.lastReqId) { + RNotice("Ignoring stale request"); + return; + } + if (msg.req().clientreqid() == entry.lastReqId) { + // This is a duplicate request. Resend the reply if we + // have one. We might not have a reply to resend if we're + // waiting for the other replicas; in that case, just + // discard the request. + if (entry.replied) { + RNotice("Received duplicate request; resending reply"); + if (!(transport->SendMessage(this, remote, + entry.reply))) { + RWarning("Failed to resend reply to client"); + } + return; + } else { + RNotice("Received duplicate request but no reply available; ignoring"); + return; + } + } + } + + // Update the client table + UpdateClientTable(msg.req()); + + // Leader Upcall + bool replicate = false; + string res; + LeaderUpcall(lastCommitted, msg.req().op(), replicate, res); + ClientTableEntry &cte = + clientTable[msg.req().clientid()]; + + // Check whether this request should be committed to replicas + if (!replicate) { + RDebug("Executing request failed. Not committing to replicas"); + ReplyMessage reply; + + reply.set_reply(res); + reply.set_view(0); + reply.set_opnum(0); + reply.set_clientreqid(msg.req().clientreqid()); + cte.replied = true; + cte.reply = reply; + transport->SendMessage(this, remote, reply); + } else { + Request request; + request.set_op(res); + request.set_clientid(msg.req().clientid()); + request.set_clientreqid(msg.req().clientreqid()); + + /* Assign it an opnum */ + ++this->lastOp; + v.view = this->view; + v.opnum = this->lastOp; + + RDebug("Received REQUEST, assigning " FMT_VIEWSTAMP, VA_VIEWSTAMP(v)); + + /* Add the request to my log */ + log.Append(v, request, LOG_STATE_PREPARED); + + if (lastOp - lastBatchEnd+1 > batchSize) { + CloseBatch(); + } else { + RDebug("Keeping in batch"); + if (!closeBatchTimeout->Active()) { + closeBatchTimeout->Start(); + } + } + + nullCommitTimeout->Reset(); + } +} + +void +VRReplica::HandleUnloggedRequest(const TransportAddress &remote, + const UnloggedRequestMessage &msg) +{ + if (status != STATUS_NORMAL) { + // Not clear if we should ignore this or just let the request + // go ahead, but this seems reasonable. + RNotice("Ignoring unlogged request due to abnormal status"); + return; + } + + UnloggedReplyMessage reply; + + Debug("Received unlogged request %s", (char *)msg.req().op().c_str()); + + ExecuteUnlogged(msg.req(), reply); + + if (!(transport->SendMessage(this, remote, reply))) + Warning("Failed to send reply message"); +} + +void +VRReplica::HandlePrepare(const TransportAddress &remote, + const PrepareMessage &msg) +{ + RDebug("Received PREPARE <" FMT_VIEW "," FMT_OPNUM "-" FMT_OPNUM ">", + msg.view(), msg.batchstart(), msg.opnum()); + + if (this->status != STATUS_NORMAL) { + RDebug("Ignoring PREPARE due to abnormal status"); + return; + } + + if (msg.view() < this->view) { + RDebug("Ignoring PREPARE due to stale view"); + return; + } + + if (msg.view() > this->view) { + RequestStateTransfer(); + pendingPrepares.push_back(std::pair<TransportAddress *, PrepareMessage>(remote.clone(), msg)); + return; + } + + if (AmLeader()) { + RPanic("Unexpected PREPARE: I'm the leader of this view"); + } + + ASSERT(msg.batchstart() <= msg.opnum()); + ASSERT_EQ(msg.opnum()-msg.batchstart()+1, (unsigned int)msg.request_size()); + + viewChangeTimeout->Reset(); + + if (msg.opnum() <= this->lastOp) { + RDebug("Ignoring PREPARE; already prepared that operation"); + // Resend the prepareOK message + PrepareOKMessage reply; + reply.set_view(msg.view()); + reply.set_opnum(msg.opnum()); + reply.set_replicaidx(myIdx); + if (!(transport->SendMessageToReplica(this, + configuration.GetLeaderIndex(view), + reply))) { + RWarning("Failed to send PrepareOK message to leader"); + } + return; + } + + if (msg.batchstart() > this->lastOp+1) { + RequestStateTransfer(); + pendingPrepares.push_back(std::pair<TransportAddress *, PrepareMessage>(remote.clone(), msg)); + return; + } + + /* Add operations to the log */ + opnum_t op = msg.batchstart()-1; + for (auto &req : msg.request()) { + op++; + if (op <= lastOp) { + continue; + } + this->lastOp++; + log.Append(viewstamp_t(msg.view(), op), + req, LOG_STATE_PREPARED); + UpdateClientTable(req); + } + ASSERT(op == msg.opnum()); + + /* Build reply and send it to the leader */ + PrepareOKMessage reply; + reply.set_view(msg.view()); + reply.set_opnum(msg.opnum()); + reply.set_replicaidx(myIdx); + + if (!(transport->SendMessageToReplica(this, + configuration.GetLeaderIndex(view), + reply))) { + RWarning("Failed to send PrepareOK message to leader"); + } +} + +void +VRReplica::HandlePrepareOK(const TransportAddress &remote, + const PrepareOKMessage &msg) +{ + + RDebug("Received PREPAREOK <" FMT_VIEW ", " + FMT_OPNUM "> from replica %d", + msg.view(), msg.opnum(), msg.replicaidx()); + + if (this->status != STATUS_NORMAL) { + RDebug("Ignoring PREPAREOK due to abnormal status"); + return; + } + + if (msg.view() < this->view) { + RDebug("Ignoring PREPAREOK due to stale view"); + return; + } + + if (msg.view() > this->view) { + RequestStateTransfer(); + return; + } + + if (!AmLeader()) { + RWarning("Ignoring PREPAREOK because I'm not the leader"); + return; + } + + viewstamp_t vs = { msg.view(), msg.opnum() }; + if (auto msgs = + (prepareOKQuorum.AddAndCheckForQuorum(vs, msg.replicaidx(), msg))) { + /* + * We have a quorum of PrepareOK messages for this + * opnumber. Execute it and all previous operations. + * + * (Note that we might have already executed it. That's fine, + * we just won't do anything.) + * + * This also notifies the client of the result. + */ + CommitUpTo(msg.opnum()); + + if (msgs->size() >= (unsigned int)configuration.QuorumSize()) { + return; + } + + /* + * Send COMMIT message to the other replicas. + * + * This can be done asynchronously, so it really ought to be + * piggybacked on the next PREPARE or something. + */ + CommitMessage cm; + cm.set_view(this->view); + cm.set_opnum(this->lastCommitted); + + if (!(transport->SendMessageToAll(this, cm))) { + RWarning("Failed to send COMMIT message to all replicas"); + } + + nullCommitTimeout->Reset(); + } +} + +void +VRReplica::HandleCommit(const TransportAddress &remote, + const CommitMessage &msg) +{ + RDebug("Received COMMIT " FMT_VIEWSTAMP, msg.view(), msg.opnum()); + + if (this->status != STATUS_NORMAL) { + RDebug("Ignoring COMMIT due to abnormal status"); + return; + } + + if (msg.view() < this->view) { + RDebug("Ignoring COMMIT due to stale view"); + return; + } + + if (msg.view() > this->view) { + RequestStateTransfer(); + return; + } + + if (AmLeader()) { + RPanic("Unexpected COMMIT: I'm the leader of this view"); + } + + viewChangeTimeout->Reset(); + + if (msg.opnum() <= this->lastCommitted) { + RDebug("Ignoring COMMIT; already committed that operation"); + return; + } + + if (msg.opnum() > this->lastOp) { + RequestStateTransfer(); + return; + } + + CommitUpTo(msg.opnum()); +} + + +void +VRReplica::HandleRequestStateTransfer(const TransportAddress &remote, + const RequestStateTransferMessage &msg) +{ + RDebug("Received REQUESTSTATETRANSFER " FMT_VIEWSTAMP, + msg.view(), msg.opnum()); + + if (status != STATUS_NORMAL) { + RDebug("Ignoring REQUESTSTATETRANSFER due to abnormal status"); + return; + } + + if (msg.view() > view) { + RequestStateTransfer(); + return; + } + + RNotice("Sending state transfer from " FMT_VIEWSTAMP " to " + FMT_VIEWSTAMP, + msg.view(), msg.opnum(), view, lastCommitted); + + StateTransferMessage reply; + reply.set_view(view); + reply.set_opnum(lastCommitted); + + log.Dump(msg.opnum()+1, reply.mutable_entries()); + + transport->SendMessage(this, remote, reply); +} + +void +VRReplica::HandleStateTransfer(const TransportAddress &remote, + const StateTransferMessage &msg) +{ + RDebug("Received STATETRANSFER " FMT_VIEWSTAMP, msg.view(), msg.opnum()); + + if (msg.view() < view) { + RWarning("Ignoring state transfer for older view"); + return; + } + + opnum_t oldLastOp = lastOp; + + /* Install the new log entries */ + for (auto newEntry : msg.entries()) { + if (newEntry.opnum() <= lastCommitted) { + // Already committed this operation; nothing to be done. +#if PARANOID + const LogEntry *entry = log.Find(newEntry.opnum()); + ASSERT(entry->viewstamp.opnum == newEntry.opnum()); + ASSERT(entry->viewstamp.view == newEntry.view()); +// ASSERT(entry->request == newEntry.request()); +#endif + } else if (newEntry.opnum() <= lastOp) { + // We already have an entry with this opnum, but maybe + // it's from an older view? + const LogEntry *entry = log.Find(newEntry.opnum()); + ASSERT(entry->viewstamp.opnum == newEntry.opnum()); + ASSERT(entry->viewstamp.view <= newEntry.view()); + + if (entry->viewstamp.view == newEntry.view()) { + // We already have this operation in our log. + ASSERT(entry->state == LOG_STATE_PREPARED); +#if PARANOID +// ASSERT(entry->request == newEntry.request()); +#endif + } else { + // Our operation was from an older view, so obviously + // it didn't survive a view change. Throw out any + // later log entries and replace with this one. + ASSERT(entry->state != LOG_STATE_COMMITTED); + log.RemoveAfter(newEntry.opnum()); + lastOp = newEntry.opnum(); + oldLastOp = lastOp; + + viewstamp_t vs = { newEntry.view(), newEntry.opnum() }; + log.Append(vs, newEntry.request(), LOG_STATE_PREPARED); + } + } else { + // This is a new operation to us. Add it to the log. + ASSERT(newEntry.opnum() == lastOp+1); + + lastOp++; + viewstamp_t vs = { newEntry.view(), newEntry.opnum() }; + log.Append(vs, newEntry.request(), LOG_STATE_PREPARED); + } + } + + + if (msg.view() > view) { + EnterView(msg.view()); + } + + /* Execute committed operations */ + ASSERT(msg.opnum() <= lastOp); + CommitUpTo(msg.opnum()); + SendPrepareOKs(oldLastOp); + + // Process pending prepares + std::list<std::pair<TransportAddress *, PrepareMessage> >pending = pendingPrepares; + pendingPrepares.clear(); + for (auto & msgpair : pendingPrepares) { + RDebug("Processing pending prepare message"); + HandlePrepare(*msgpair.first, msgpair.second); + delete msgpair.first; + } +} + +void +VRReplica::HandleStartViewChange(const TransportAddress &remote, + const StartViewChangeMessage &msg) +{ + RDebug("Received STARTVIEWCHANGE " FMT_VIEW " from replica %d", + msg.view(), msg.replicaidx()); + + if (msg.view() < view) { + RDebug("Ignoring STARTVIEWCHANGE for older view"); + return; + } + + if ((msg.view() == view) && (status != STATUS_VIEW_CHANGE)) { + RDebug("Ignoring STARTVIEWCHANGE for current view"); + return; + } + + if ((status != STATUS_VIEW_CHANGE) || (msg.view() > view)) { + StartViewChange(msg.view()); + } + + ASSERT(msg.view() == view); + + if (auto msgs = + startViewChangeQuorum.AddAndCheckForQuorum(msg.view(), + msg.replicaidx(), + msg)) { + int leader = configuration.GetLeaderIndex(view); + // Don't try to send a DoViewChange message to ourselves + if (leader != myIdx) { + DoViewChangeMessage dvc; + dvc.set_view(view); + dvc.set_lastnormalview(log.LastViewstamp().view); + dvc.set_lastop(lastOp); + dvc.set_lastcommitted(lastCommitted); + dvc.set_replicaidx(myIdx); + + // Figure out how much of the log to include + opnum_t minCommitted = std::min_element( + msgs->begin(), msgs->end(), + [](decltype(*msgs->begin()) a, + decltype(*msgs->begin()) b) { + return a.second.lastcommitted() < b.second.lastcommitted(); + })->second.lastcommitted(); + minCommitted = std::min(minCommitted, lastCommitted); + + log.Dump(minCommitted, + dvc.mutable_entries()); + + if (!(transport->SendMessageToReplica(this, leader, dvc))) { + RWarning("Failed to send DoViewChange message to leader of new view"); + } + } + } +} + + +void +VRReplica::HandleDoViewChange(const TransportAddress &remote, + const DoViewChangeMessage &msg) +{ + RDebug("Received DOVIEWCHANGE " FMT_VIEW " from replica %d, " + "lastnormalview=" FMT_VIEW " op=" FMT_OPNUM " committed=" FMT_OPNUM, + msg.view(), msg.replicaidx(), + msg.lastnormalview(), msg.lastop(), msg.lastcommitted()); + + if (msg.view() < view) { + RDebug("Ignoring DOVIEWCHANGE for older view"); + return; + } + + if ((msg.view() == view) && (status != STATUS_VIEW_CHANGE)) { + RDebug("Ignoring DOVIEWCHANGE for current view"); + return; + } + + if ((status != STATUS_VIEW_CHANGE) || (msg.view() > view)) { + // It's superfluous to send the StartViewChange messages here, + // but harmless... + StartViewChange(msg.view()); + } + + ASSERT(configuration.GetLeaderIndex(msg.view()) == myIdx); + + auto msgs = doViewChangeQuorum.AddAndCheckForQuorum(msg.view(), + msg.replicaidx(), + msg); + if (msgs != NULL) { + // Find the response with the most up to date log, i.e. the + // one with the latest viewstamp + view_t latestView = log.LastViewstamp().view; + opnum_t latestOp = log.LastViewstamp().opnum; + DoViewChangeMessage *latestMsg = NULL; + + for (auto kv : *msgs) { + DoViewChangeMessage &x = kv.second; + if ((x.lastnormalview() > latestView) || + (((x.lastnormalview() == latestView) && + (x.lastop() > latestOp)))) { + latestView = x.lastnormalview(); + latestOp = x.lastop(); + latestMsg = &x; + } + } + + // Install the new log. We might not need to do this, if our + // log was the most current one. + if (latestMsg != NULL) { + RDebug("Selected log from replica %d with lastop=" FMT_OPNUM, + latestMsg->replicaidx(), latestMsg->lastop()); + if (latestMsg->entries_size() == 0) { + // There weren't actually any entries in the + // log. That should only happen in the corner case + // that everyone already had the entire log, maybe + // because it actually is empty. + ASSERT(lastCommitted == msg.lastcommitted()); + ASSERT(msg.lastop() == msg.lastcommitted()); + } else { + if (latestMsg->entries(0).opnum() > lastCommitted+1) { + RPanic("Received log that didn't include enough entries to install it"); + } + + log.RemoveAfter(latestMsg->lastop()+1); + log.Install(latestMsg->entries().begin(), + latestMsg->entries().end()); + } + } else { + RDebug("My log is most current, lastnormalview=" FMT_VIEW " lastop=" FMT_OPNUM, + log.LastViewstamp().view, lastOp); + } + + // How much of the log should we include when we send the + // STARTVIEW message? Start from the lowest committed opnum of + // any of the STARTVIEWCHANGE or DOVIEWCHANGE messages we got. + // + // We need to compute this before we enter the new view + // because the saved messages will go away. + auto svcs = startViewChangeQuorum.GetMessages(view); + opnum_t minCommittedSVC = std::min_element( + svcs.begin(), svcs.end(), + [](decltype(*svcs.begin()) a, + decltype(*svcs.begin()) b) { + return a.second.lastcommitted() < b.second.lastcommitted(); + })->second.lastcommitted(); + opnum_t minCommittedDVC = std::min_element( + msgs->begin(), msgs->end(), + [](decltype(*msgs->begin()) a, + decltype(*msgs->begin()) b) { + return a.second.lastcommitted() < b.second.lastcommitted(); + })->second.lastcommitted(); + opnum_t minCommitted = std::min(minCommittedSVC, minCommittedDVC); + minCommitted = std::min(minCommitted, lastCommitted); + + EnterView(msg.view()); + + ASSERT(AmLeader()); + + lastOp = latestOp; + if (latestMsg != NULL) { + CommitUpTo(latestMsg->lastcommitted()); + } + + // Send a STARTVIEW message with the new log + StartViewMessage sv; + sv.set_view(view); + sv.set_lastop(lastOp); + sv.set_lastcommitted(lastCommitted); + + log.Dump(minCommitted, sv.mutable_entries()); + + if (!(transport->SendMessageToAll(this, sv))) { + RWarning("Failed to send StartView message to all replicas"); + } + } +} + +void +VRReplica::HandleStartView(const TransportAddress &remote, + const StartViewMessage &msg) +{ + RDebug("Received STARTVIEW " FMT_VIEW + " op=" FMT_OPNUM " committed=" FMT_OPNUM " entries=%d", + msg.view(), msg.lastop(), msg.lastcommitted(), msg.entries_size()); + RDebug("Currently in view " FMT_VIEW " op " FMT_OPNUM " committed " FMT_OPNUM, + view, lastOp, lastCommitted); + + if (msg.view() < view) { + RWarning("Ignoring STARTVIEW for older view"); + return; + } + + if ((msg.view() == view) && (status != STATUS_VIEW_CHANGE)) { + RWarning("Ignoring STARTVIEW for current view"); + return; + } + + ASSERT(configuration.GetLeaderIndex(msg.view()) != myIdx); + + if (msg.entries_size() == 0) { + ASSERT(msg.lastcommitted() == lastCommitted); + ASSERT(msg.lastop() == msg.lastcommitted()); + } else { + if (msg.entries(0).opnum() > lastCommitted+1) { + RPanic("Not enough entries in STARTVIEW message to install new log"); + } + + // Install the new log + log.RemoveAfter(msg.lastop()+1); + log.Install(msg.entries().begin(), + msg.entries().end()); + } + + + EnterView(msg.view()); + opnum_t oldLastOp = lastOp; + lastOp = msg.lastop(); + + ASSERT(!AmLeader()); + + CommitUpTo(msg.lastcommitted()); + SendPrepareOKs(oldLastOp); +} + +} // namespace replication::vr +} // namespace replication diff --git a/replication/vr/replica.h b/replication/vr/replica.h new file mode 100644 index 0000000..335d573 --- /dev/null +++ b/replication/vr/replica.h @@ -0,0 +1,127 @@ +// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*- +/*********************************************************************** + * + * vr/replica.h: + * Viewstamped Replication protocol + * + * Copyright 2013 Dan R. K. Ports <drkp@cs.washington.edu> + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + **********************************************************************/ + +#ifndef _VR_REPLICA_H_ +#define _VR_REPLICA_H_ + +#include "lib/configuration.h" +#include "replication/common/log.h" +#include "replication/common/replica.h" +#include "replication/common/quorumset.h" +#include "replication/vr/vr-proto.pb.h" + +#include <map> +#include <memory> +#include <list> + +namespace replication { +namespace vr { + +class VRReplica : public Replica +{ +public: + VRReplica(transport::Configuration config, int myIdx, + Transport *transport, unsigned int batchSize, + AppReplica *app); + ~VRReplica(); + + void ReceiveMessage(const TransportAddress &remote, + const string &type, const string &data); + +private: + view_t view; + opnum_t lastCommitted; + opnum_t lastOp; + view_t lastRequestStateTransferView; + opnum_t lastRequestStateTransferOpnum; + std::list<std::pair<TransportAddress *, + proto::PrepareMessage> > pendingPrepares; + proto::PrepareMessage lastPrepare; + unsigned int batchSize; + opnum_t lastBatchEnd; + + Log log; + std::map<uint64_t, std::unique_ptr<TransportAddress> > clientAddresses; + struct ClientTableEntry + { + uint64_t lastReqId; + bool replied; + proto::ReplyMessage reply; + }; + std::map<uint64_t, ClientTableEntry> clientTable; + + QuorumSet<viewstamp_t, proto::PrepareOKMessage> prepareOKQuorum; + QuorumSet<view_t, proto::StartViewChangeMessage> startViewChangeQuorum; + QuorumSet<view_t, proto::DoViewChangeMessage> doViewChangeQuorum; + + Timeout *viewChangeTimeout; + Timeout *nullCommitTimeout; + Timeout *stateTransferTimeout; + Timeout *resendPrepareTimeout; + Timeout *closeBatchTimeout; + + bool AmLeader() const; + void CommitUpTo(opnum_t upto); + void SendPrepareOKs(opnum_t oldLastOp); + void RequestStateTransfer(); + void EnterView(view_t newview); + void StartViewChange(view_t newview); + void SendNullCommit(); + void UpdateClientTable(const Request &req); + void ResendPrepare(); + void CloseBatch(); + + void HandleRequest(const TransportAddress &remote, + const proto::RequestMessage &msg); + void HandleUnloggedRequest(const TransportAddress &remote, + const proto::UnloggedRequestMessage &msg); + + void HandlePrepare(const TransportAddress &remote, + const proto::PrepareMessage &msg); + void HandlePrepareOK(const TransportAddress &remote, + const proto::PrepareOKMessage &msg); + void HandleCommit(const TransportAddress &remote, + const proto::CommitMessage &msg); + void HandleRequestStateTransfer(const TransportAddress &remote, + const proto::RequestStateTransferMessage &msg); + void HandleStateTransfer(const TransportAddress &remote, + const proto::StateTransferMessage &msg); + void HandleStartViewChange(const TransportAddress &remote, + const proto::StartViewChangeMessage &msg); + void HandleDoViewChange(const TransportAddress &remote, + const proto::DoViewChangeMessage &msg); + void HandleStartView(const TransportAddress &remote, + const proto::StartViewMessage &msg); +}; + +} // namespace replication::vr +} // namespace replication + +#endif /* _VR_REPLICA_H_ */ diff --git a/replication/vr/tests/Rules.mk b/replication/vr/tests/Rules.mk new file mode 100644 index 0000000..fa29f87 --- /dev/null +++ b/replication/vr/tests/Rules.mk @@ -0,0 +1,10 @@ +d := $(dir $(lastword $(MAKEFILE_LIST))) + +GTEST_SRCS += $(d)vr-test.cc + +$(d)vr-test: $(o)vr-test.o \ + $(OBJS-vr-replica) $(OBJS-vr-client) \ + $(LIB-simtransport) \ + $(GTEST_MAIN) + +TEST_BINS += $(d)vr-test diff --git a/replication/vr/tests/vr-test.cc b/replication/vr/tests/vr-test.cc new file mode 100644 index 0000000..e61c8b6 --- /dev/null +++ b/replication/vr/tests/vr-test.cc @@ -0,0 +1,568 @@ +// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*- +/*********************************************************************** + * + * vr-test.cc: + * test cases for Viewstamped Replication protocol + * + * Copyright 2013 Dan R. K. Ports <drkp@cs.washington.edu> + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + **********************************************************************/ + +#include "lib/configuration.h" +#include "lib/message.h" +#include "lib/transport.h" +#include "lib/simtransport.h" + +#include "replication/common/client.h" +#include "replication/common/replica.h" +#include "replication/vr/client.h" +#include "replication/vr/replica.h" + +#include <stdlib.h> +#include <stdio.h> +#include <gtest/gtest.h> +#include <vector> +#include <sstream> + +static string replicaLastOp; +static string clientLastOp; +static string clientLastReply; + +using google::protobuf::Message; +using namespace replication; +using namespace replication::vr; +using namespace replication::vr::proto; + +class VRTest : public ::testing::TestWithParam<int> +{ +protected: + std::vector<VRReplica *> replicas; + VRClient *client; + SimulatedTransport *transport; + transport::Configuration *config; + std::vector<std::vector<string> > ops; + std::vector<std::vector<string> > unloggedOps; + int requestNum; + + virtual void SetUp() { + std::vector<transport::ReplicaAddress> replicaAddrs = + { { "localhost", "12345" }, + { "localhost", "12346" }, + { "localhost", "12347" }}; + config = new transport::Configuration(3, 1, replicaAddrs); + + transport = new SimulatedTransport(); + + ops.resize(config->n); + unloggedOps.resize(config->n); + + for (int i = 0; i < config->n; i++) { + replicas.push_back(new VRReplica(*config, i, transport, GetParam(), + [i,this](Replica *r, opnum_t opnum, const string &req, string &reply) { + ops[i].push_back(req); + reply = "reply: " + req; + }, + [i,this](Replica *r, const string &req, string &reply) { + unloggedOps[i].push_back(req); + reply = "unlreply: " + req; + })); + } + + client = new VRClient(*config, transport); + requestNum = -1; + + // Only let tests run for a simulated minute. This prevents + // infinite retry loops, etc. +// transport->Timer(60000, [&]() { +// transport->CancelAllTimers(); +// }); + } + + virtual string RequestOp(int n) { + std::ostringstream stream; + stream << "test: " << n; + return stream.str(); + } + + virtual string LastRequestOp() { + return RequestOp(requestNum); + } + + virtual void ClientSendNext(Client::continuation_t upcall) { + requestNum++; + client->Invoke(LastRequestOp(), upcall); + } + + virtual void ClientSendNextUnlogged(int idx, Client::continuation_t upcall, + Client::timeout_continuation_t timeoutContinuation = nullptr, + uint32_t timeout = Client::DEFAULT_UNLOGGED_OP_TIMEOUT) { + requestNum++; + client->InvokeUnlogged(idx, LastRequestOp(), upcall, timeoutContinuation, timeout); + } + + virtual void TearDown() { + for (auto x : replicas) { + delete x; + } + + replicas.clear(); + ops.clear(); + unloggedOps.clear(); + + delete client; + delete transport; + delete config; + } +}; + +TEST_P(VRTest, OneOp) +{ + auto upcall = [this](const string &req, const string &reply) { + EXPECT_EQ(req, LastRequestOp()); + EXPECT_EQ(reply, "reply: "+LastRequestOp()); + + // Not guaranteed that any replicas except the leader have + // executed this request. + EXPECT_EQ(ops[0].back(), req); + transport->CancelAllTimers(); + }; + + ClientSendNext(upcall); + transport->Run(); + + // By now, they all should have executed the last request. + for (int i = 0; i < config->n; i++) { + EXPECT_EQ(ops[i].size(), 1); + EXPECT_EQ(ops[i].back(), LastRequestOp()); + } +} + +TEST_P(VRTest, Unlogged) +{ + auto upcall = [this](const string &req, const string &reply) { + EXPECT_EQ(req, LastRequestOp()); + EXPECT_EQ(reply, "unlreply: "+LastRequestOp()); + + EXPECT_EQ(unloggedOps[1].back(), req); + transport->CancelAllTimers(); + }; + int timeouts = 0; + auto timeout = [&](const string &req) { + timeouts++; + }; + + ClientSendNextUnlogged(1, upcall, timeout); + transport->Run(); + + for (int i = 0; i < ops.size(); i++) { + EXPECT_EQ(0, ops[i].size()); + EXPECT_EQ((i == 1 ? 1 : 0), unloggedOps[i].size()); + } + EXPECT_EQ(0, timeouts); +} + +TEST_P(VRTest, UnloggedTimeout) +{ + auto upcall = [this](const string &req, const string &reply) { + FAIL(); + transport->CancelAllTimers(); + }; + int timeouts = 0; + auto timeout = [&](const string &req) { + timeouts++; + }; + + // Drop messages to or from replica 1 + transport->AddFilter(10, [](TransportReceiver *src, int srcIdx, + TransportReceiver *dst, int dstIdx, + Message &m, uint64_t &delay) { + if ((srcIdx == 1) || (dstIdx == 1)) { + return false; + } + return true; + }); + + // Run for 10 seconds + transport->Timer(10000, [&]() { + transport->CancelAllTimers(); + }); + + ClientSendNextUnlogged(1, upcall, timeout); + transport->Run(); + + for (int i = 0; i < ops.size(); i++) { + EXPECT_EQ(0, ops[i].size()); + EXPECT_EQ(0, unloggedOps[i].size()); + } + EXPECT_EQ(1, timeouts); +} + + +TEST_P(VRTest, ManyOps) +{ + Client::continuation_t upcall = [&](const string &req, const string &reply) { + EXPECT_EQ(req, LastRequestOp()); + EXPECT_EQ(reply, "reply: "+LastRequestOp()); + + // Not guaranteed that any replicas except the leader have + // executed this request. + EXPECT_EQ(ops[0].back(), req); + + if (requestNum < 9) { + ClientSendNext(upcall); + } else { + transport->CancelAllTimers(); + } + }; + + ClientSendNext(upcall); + transport->Run(); + + // By now, they all should have executed the last request. + for (int i = 0; i < config->n; i++) { + EXPECT_EQ(10, ops[i].size()); + for (int j = 0; j < 10; j++) { + EXPECT_EQ(RequestOp(j), ops[i][j]); + } + } +} + +TEST_P(VRTest, FailedReplica) +{ + Client::continuation_t upcall = [&](const string &req, const string &reply) { + EXPECT_EQ(req, LastRequestOp()); + EXPECT_EQ(reply, "reply: "+LastRequestOp()); + + // Not guaranteed that any replicas except the leader have + // executed this request. + EXPECT_EQ(ops[0].back(), req); + + if (requestNum < 9) { + ClientSendNext(upcall); + } else { + transport->CancelAllTimers(); + } + }; + + ClientSendNext(upcall); + + // Drop messages to or from replica 1 + transport->AddFilter(10, [](TransportReceiver *src, int srcIdx, + TransportReceiver *dst, int dstIdx, + Message &m, uint64_t &delay) { + if ((srcIdx == 1) || (dstIdx == 1)) { + return false; + } + return true; + }); + + transport->Run(); + + // By now, they all should have executed the last request. + for (int i = 0; i < config->n; i++) { + if (i == 1) { + continue; + } + EXPECT_EQ(10, ops[i].size()); + for (int j = 0; j < 10; j++) { + EXPECT_EQ(RequestOp(j), ops[i][j]); + } + } +} + +TEST_P(VRTest, StateTransfer) +{ + Client::continuation_t upcall = [&](const string &req, const string &reply) { + EXPECT_EQ(req, LastRequestOp()); + EXPECT_EQ(reply, "reply: "+LastRequestOp()); + + // Not guaranteed that any replicas except the leader have + // executed this request. + EXPECT_EQ(ops[0].back(), req); + + if (requestNum == 5) { + // Restore replica 1 + transport->RemoveFilter(10); + } + + if (requestNum < 9) { + ClientSendNext(upcall); + } else { + transport->CancelAllTimers(); + } + }; + + ClientSendNext(upcall); + + // Drop messages to or from replica 1 + transport->AddFilter(10, [](TransportReceiver *src, int srcIdx, + TransportReceiver *dst, int dstIdx, + Message &m, uint64_t &delay) { + if ((srcIdx == 1) || (dstIdx == 1)) { + return false; + } + return true; + }); + + transport->Run(); + + // By now, they all should have executed the last request. + for (int i = 0; i < config->n; i++) { + EXPECT_EQ(10, ops[i].size()); + for (int j = 0; j < 10; j++) { + EXPECT_EQ(RequestOp(j), ops[i][j]); + } + } +} + + +TEST_P(VRTest, FailedLeader) +{ + Client::continuation_t upcall = [&](const string &req, const string &reply) { + EXPECT_EQ(req, LastRequestOp()); + EXPECT_EQ(reply, "reply: "+LastRequestOp()); + + if (requestNum == 5) { + // Drop messages to or from replica 0 + transport->AddFilter(10, [](TransportReceiver *src, int srcIdx, + TransportReceiver *dst, int dstIdx, + Message &m, uint64_t &delay) { + if ((srcIdx == 0) || (dstIdx == 0)) { + return false; + } + return true; + }); + } + if (requestNum < 9) { + ClientSendNext(upcall); + } else { + transport->CancelAllTimers(); + } + }; + + ClientSendNext(upcall); + + transport->Run(); + + // By now, they all should have executed the last request. + for (int i = 0; i < config->n; i++) { + if (i == 0) { + continue; + } + EXPECT_EQ(10, ops[i].size()); + for (int j = 0; j < 10; j++) { + EXPECT_EQ(RequestOp(j), ops[i][j]); + } + } +} + +TEST_P(VRTest, DroppedReply) +{ + bool received = false; + Client::continuation_t upcall = [&](const string &req, const string &reply) { + EXPECT_EQ(req, LastRequestOp()); + EXPECT_EQ(reply, "reply: "+LastRequestOp()); + transport->CancelAllTimers(); + received = true; + }; + + // Drop the first ReplyMessage + bool dropped = false; + transport->AddFilter(10, [&dropped](TransportReceiver *src, int srcIdx, + TransportReceiver *dst, int dstIdx, + Message &m, uint64_t &delay) { + ReplyMessage r; + if (m.GetTypeName() == r.GetTypeName()) { + if (!dropped) { + dropped = true; + return false; + } + } + return true; + }); + ClientSendNext(upcall); + + transport->Run(); + + EXPECT_TRUE(received); + + // Each replica should have executed only one request + for (int i = 0; i < config->n; i++) { + EXPECT_EQ(1, ops[i].size()); + } +} + +TEST_P(VRTest, DroppedReplyThenFailedLeader) +{ + bool received = false; + Client::continuation_t upcall = [&](const string &req, const string &reply) { + EXPECT_EQ(req, LastRequestOp()); + EXPECT_EQ(reply, "reply: "+LastRequestOp()); + transport->CancelAllTimers(); + received = true; + }; + + // Drop the first ReplyMessage + bool dropped = false; + transport->AddFilter(10, [&dropped](TransportReceiver *src, int srcIdx, + TransportReceiver *dst, int dstIdx, + Message &m, uint64_t &delay) { + ReplyMessage r; + if (m.GetTypeName() == r.GetTypeName()) { + if (!dropped) { + dropped = true; + return false; + } + } + return true; + }); + + // ...and after we've done that, fail the leader altogether + transport->AddFilter(20, [&dropped](TransportReceiver *src, int srcIdx, + TransportReceiver *dst, int dstIdx, + Message &m, uint64_t &delay) { + if ((srcIdx == 0) || (dstIdx == 0)) { + return !dropped; + } + return true; + }); + + ClientSendNext(upcall); + + transport->Run(); + + EXPECT_TRUE(received); + + // Each replica should have executed only one request + // (and actually the faulty one should too, but don't check that) + for (int i = 0; i < config->n; i++) { + if (i != 0) { + EXPECT_EQ(1, ops[i].size()); + } + } +} + +TEST_P(VRTest, ManyClients) +{ + const int NUM_CLIENTS = 10; + const int MAX_REQS = 100; + + std::vector<VRClient *> clients; + std::vector<int> lastReq; + std::vector<Client::continuation_t> upcalls; + for (int i = 0; i < NUM_CLIENTS; i++) { + clients.push_back(new VRClient(*config, transport)); + lastReq.push_back(0); + upcalls.push_back([&, i](const string &req, const string &reply) { + EXPECT_EQ("reply: "+RequestOp(lastReq[i]), reply); + lastReq[i] += 1; + if (lastReq[i] < MAX_REQS) { + clients[i]->Invoke(RequestOp(lastReq[i]), upcalls[i]); + } + }); + clients[i]->Invoke(RequestOp(lastReq[i]), upcalls[i]); + } + + // This could take a while; simulate two hours + transport->Timer(7200000, [&]() { + transport->CancelAllTimers(); + }); + + transport->Run(); + + for (int i = 0; i < config->n; i++) { + ASSERT_EQ(NUM_CLIENTS * MAX_REQS, ops[i].size()); + } + + for (int i = 0; i < NUM_CLIENTS*MAX_REQS; i++) { + for (int j = 0; j < config->n; j++) { + ASSERT_EQ(ops[0][i], ops[j][i]); + } + } + + for (VRClient *c : clients) { + delete c; + } +} + +TEST_P(VRTest, Stress) +{ + const int NUM_CLIENTS = 10; + const int MAX_REQS = 100; + const int MAX_DELAY = 1; + const int DROP_PROBABILITY = 10; // 1/x + + std::vector<VRClient *> clients; + std::vector<int> lastReq; + std::vector<Client::continuation_t> upcalls; + for (int i = 0; i < NUM_CLIENTS; i++) { + clients.push_back(new VRClient(*config, transport)); + lastReq.push_back(0); + upcalls.push_back([&, i](const string &req, const string &reply) { + EXPECT_EQ("reply: "+RequestOp(lastReq[i]), reply); + lastReq[i] += 1; + if (lastReq[i] < MAX_REQS) { + clients[i]->Invoke(RequestOp(lastReq[i]), upcalls[i]); + } + }); + clients[i]->Invoke(RequestOp(lastReq[i]), upcalls[i]); + } + + srand(time(NULL)); + + // Delay messages from clients by a random amount, and drop some + // of them + transport->AddFilter(10, [=](TransportReceiver *src, int srcIdx, + TransportReceiver *dst, int dstIdx, + Message &m, uint64_t &delay) { + if (srcIdx == -1) { + delay = rand() % MAX_DELAY; + } + return ((rand() % DROP_PROBABILITY) != 0); + }); + + // This could take a while; simulate two hours + transport->Timer(7200000, [&]() { + transport->CancelAllTimers(); + }); + + transport->Run(); + + for (int i = 0; i < config->n; i++) { + ASSERT_EQ(NUM_CLIENTS * MAX_REQS, ops[i].size()); + } + + for (int i = 0; i < NUM_CLIENTS*MAX_REQS; i++) { + for (int j = 0; j < config->n; j++) { + ASSERT_EQ(ops[0][i], ops[j][i]); + } + } + + for (VRClient *c : clients) { + delete c; + } +} + +INSTANTIATE_TEST_CASE_P(Batching, + VRTest, + ::testing::Values(1, 8)); diff --git a/replication/vr/vr-proto.proto b/replication/vr/vr-proto.proto new file mode 100644 index 0000000..419f30e --- /dev/null +++ b/replication/vr/vr-proto.proto @@ -0,0 +1,94 @@ +import "replication/common/request.proto"; + +package replication.vr.proto; + +message RequestMessage { + required replication.Request req = 1; +} + +message ReplyMessage { + required uint64 view = 1; + required uint64 opnum = 2; + required uint64 clientreqid = 3; + required bytes reply = 4; +} + +message UnloggedRequestMessage { + required replication.UnloggedRequest req = 1; +} + +message UnloggedReplyMessage { + required bytes reply = 1; +} + +message PrepareMessage { + required uint64 view = 1; + required uint64 opnum = 2; + required uint64 batchstart = 3; + repeated Request request = 4; +} + +message PrepareOKMessage { + required uint64 view = 1; + required uint64 opnum = 2; + required uint32 replicaIdx = 3; +} + +message CommitMessage { + required uint64 view = 1; + required uint64 opnum = 2; +} + +message RequestStateTransferMessage { + required uint64 view = 1; + required uint64 opnum = 2; +} + +message StateTransferMessage { + message LogEntry { + required uint64 view = 1; + required uint64 opnum = 2; + required replication.Request request = 3; + optional uint32 state = 4; + optional bytes hash = 5; + } + required uint64 view = 1; + required uint64 opnum = 2; + repeated LogEntry entries = 3; +} + +message StartViewChangeMessage { + required uint64 view = 1; + required uint32 replicaIdx = 2; + required uint64 lastCommitted = 3; +} + +message DoViewChangeMessage { + message LogEntry { + required uint64 view = 1; + required uint64 opnum = 2; + required replication.Request request = 3; + optional uint32 state = 4; + optional bytes hash = 5; + } + required uint64 view = 1; + required uint64 lastNormalView = 2; + required uint64 lastOp = 3; + required uint64 lastCommitted = 4; + repeated LogEntry entries = 5; + required uint32 replicaIdx = 6; +} + +message StartViewMessage { + message LogEntry { + required uint64 view = 1; + required uint64 opnum = 2; + required replication.Request request = 3; + optional uint32 state = 4; + optional bytes hash = 5; + } + required uint64 view = 1; + required uint64 lastOp = 2; + required uint64 lastCommitted = 3; + repeated LogEntry entries = 4; +} -- GitLab