Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • syslab/tapir
  • aaasz/tapir
  • ashmrtnz/tapir
3 results
Show changes
Showing
with 3515 additions and 266 deletions
......@@ -33,49 +33,65 @@
#include "lib/assert.h"
#include "lib/message.h"
#include "lib/udptransport.h"
#include "lib/transport.h"
#include "replication/ir/client.h"
#include "store/common/promise.h"
#include "lockserver/locks-proto.pb.h"
#include <map>
#include <set>
#include <string>
#include <thread>
#include <random>
namespace lockserver {
// Client for the lock service. Exposes blocking lock/unlock calls and an
// async/wait pair for each, all keyed by a string.
//
// NOTE(review): several members below are declared twice (transport, Decide,
// waiting). This looks like the old and new lines of a diff shown together;
// confirm which declaration of each pair is current before relying on this
// listing.
class LockClient
{
public:
// Build a client from a configuration file path.
LockClient(const std::string &configPath);
// Build a client on top of a caller-supplied transport and configuration.
// NOTE(review): presumably the caller keeps ownership of `transport` -- confirm.
LockClient(Transport* transport, const transport::Configuration &config);
~LockClient();
// Synchronously lock and unlock. Calling lock (or unlock) will block until
// the lock (or unlock) request is fully processed.
// NOTE(review): the bool presumably reports whether the lock was acquired --
// confirm against the implementation.
bool lock(const std::string &key);
void unlock(const std::string &key);
// Asynchronously lock and unlock. Calling lock_async or unlock_async will
// not block. Calling lock_wait (or unlock_wait) will block for the
// previous invocation of lock_async (or unlock_async) to complete.
//
// All async calls must be followed by a corresponding wait call. It is an
// error to issue multiple async requests without waiting. It is also
// erroneous to wait for a request which was never issued.
void lock_async(const std::string &key);
bool lock_wait();
void unlock_async(const std::string &key);
void unlock_wait();
private:
/* Unique ID for this client. */
uint64_t client_id;
/* Transport layer and thread. */
// NOTE(review): a by-value UDPTransport and a Transport* share the name
// `transport` here; only one of them can exist in a compilable class.
UDPTransport transport;
std::thread *clientTransport;
Transport *transport;
/* Function to run the transport thread. */
void run_client();
/* Decide function for a lock server. */
// NOTE(review): two overloads are listed (set of results vs. map from result
// to a count); presumably only one is current -- confirm.
string Decide(const std::set<string> &results);
string Decide(const std::map<string, std::size_t> &results);
/* IR client proxy. */
replication::ir::IRClient *client;
/* Promise to wait for pending operation. */
// NOTE(review): declared twice; the second form default-initializes to nullptr.
Promise *waiting;
Promise *waiting = nullptr;
/* Callbacks for hearing back for an operation. */
void LockCallback(const std::string &, const std::string &);
void UnlockCallback(const std::string &, const std::string &);
void ErrorCallback(const std::string &, replication::ErrorCode);
};
} // namespace lockserver
......
syntax = "proto2";
package lockserver.proto;
message Request {
......
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* lockserver-repl.cc: Step-by-step lock server evaluation.
*
* Copyright 2013 Dan R. K. Ports <drkp@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#include <thread>
#include <memory>
#include "lib/configuration.h"
#include "lib/repltransport.h"
#include "lockserver/client.h"
#include "lockserver/server.h"
#include "replication/ir/replica.h"
int main() {
ReplTransport transport;
std::vector<transport::ReplicaAddress> replica_addrs = {
{"replica", "0"},
{"replica", "1"},
{"replica", "2"},
{"replica", "3"},
{"replica", "4"}};
transport::Configuration config(5 /* n */, 2 /* f */, replica_addrs);
// Clients.
lockserver::LockClient client_a(&transport, config);
lockserver::LockClient client_b(&transport, config);
lockserver::LockClient client_c(&transport, config);
client_a.lock_async("a");
client_b.lock_async("b");
client_c.lock_async("c");
// Servers.
std::vector<std::unique_ptr<lockserver::LockServer>> servers;
std::vector<std::unique_ptr<replication::ir::IRReplica>> replicas;
for (std::size_t i = 0; i < replica_addrs.size(); ++i) {
auto server = std::unique_ptr<lockserver::LockServer>(
new lockserver::LockServer());
servers.push_back(std::move(server));
auto replica = std::unique_ptr<replication::ir::IRReplica>(
new replication::ir::IRReplica(config, i, &transport,
servers[i].get()));
replicas.push_back(std::move(replica));
}
// Launch REPL.
transport.Run();
// Remove persisted files.
for (std::size_t i = 0; i < replica_addrs.size(); ++i) {
const transport::ReplicaAddress &addr = replica_addrs[i];
const std::string filename =
addr.host + ":" + addr.port + "_" + std::to_string(i) + ".bin";
int success = std::remove(filename.c_str());
ASSERT(success == 0);
}
}
#include "lockserver/server.h"
// Entry point for one lock-server replica process.
//
// Command line:
//   -c <path>   replica configuration file (required)
//   -i <index>  non-negative index of this replica (required); must be
//               smaller than the number of replicas in the configuration
//
// Returns EXIT_FAILURE when an argument is missing or malformed, when the
// configuration cannot be opened, or when the index is out of bounds.
// Otherwise builds the LockServer/IRReplica pair, runs the transport, and
// returns EXIT_SUCCESS once transport.Run() returns.
int
main(int argc, char **argv)
{
    int index = -1;
    const char *configPath = nullptr;

    // Parse arguments
    int opt;
    char *strtolPtr;
    while ((opt = getopt(argc, argv, "c:i:")) != -1) {
        switch (opt) {
        case 'c':
            configPath = optarg;
            break;

        case 'i': {
            const long parsed = strtol(optarg, &strtolPtr, 10);
            index = static_cast<int>(parsed);
            // Reject empty input, trailing junk, negatives, and values that
            // do not survive the long -> int round trip. Bail out right away:
            // continuing would start the replica under a bogus index
            // (e.g. "-i abc" would silently run as replica 0).
            if ((*optarg == '\0') || (*strtolPtr != '\0') || (parsed < 0) ||
                (static_cast<long>(index) != parsed)) {
                fprintf(stderr, "option -i requires a numeric arg\n");
                return EXIT_FAILURE;
            }
            break;
        }

        default:
            // Report the offending option character via optopt. argv[optind]
            // is the *next* argument and may be the terminating null pointer,
            // which must not be passed to %s.
            fprintf(stderr, "Unknown argument -%c\n", optopt);
            break;
        }
    }

    if (!configPath) {
        fprintf(stderr, "option -c is required\n");
        return EXIT_FAILURE;
    }

    if (index == -1) {
        fprintf(stderr, "option -i is required\n");
        return EXIT_FAILURE;
    }

    // Load configuration
    std::ifstream configStream(configPath);
    if (configStream.fail()) {
        fprintf(stderr, "unable to read configuration file: %s\n", configPath);
        return EXIT_FAILURE;
    }
    transport::Configuration config(configStream);

    if (index >= config.n) {
        fprintf(stderr, "replica index %d is out of bounds; "
                "only %d replicas defined\n", index, config.n);
        return EXIT_FAILURE;
    }

    UDPTransport transport(0.0, 0.0, 0);

    lockserver::LockServer server;
    replication::ir::IRReplica replica(config, index, &transport, &server);

    transport.Run();

    return EXIT_SUCCESS;
}
......@@ -30,66 +30,9 @@
#include "lockserver/server.h"
// Entry point for one lock-server replica process.
//
// Command line:
//   -c <path>   replica configuration file (required)
//   -i <index>  non-negative index of this replica (required); must be
//               smaller than the number of replicas in the configuration
//
// Returns EXIT_FAILURE when an argument is missing or malformed, when the
// configuration cannot be opened, or when the index is out of bounds.
// Otherwise builds the LockServer/IRReplica pair, runs the transport, and
// returns EXIT_SUCCESS once transport.Run() returns.
int
main(int argc, char **argv)
{
    int index = -1;
    const char *configPath = nullptr;

    // Parse arguments
    int opt;
    char *strtolPtr;
    while ((opt = getopt(argc, argv, "c:i:")) != -1) {
        switch (opt) {
        case 'c':
            configPath = optarg;
            break;

        case 'i': {
            const long parsed = strtol(optarg, &strtolPtr, 10);
            index = static_cast<int>(parsed);
            // Reject empty input, trailing junk, negatives, and values that
            // do not survive the long -> int round trip. Bail out right away:
            // continuing would start the replica under a bogus index
            // (e.g. "-i abc" would silently run as replica 0).
            if ((*optarg == '\0') || (*strtolPtr != '\0') || (parsed < 0) ||
                (static_cast<long>(index) != parsed)) {
                fprintf(stderr, "option -i requires a numeric arg\n");
                return EXIT_FAILURE;
            }
            break;
        }

        default:
            // Report the offending option character via optopt. argv[optind]
            // is the *next* argument and may be the terminating null pointer,
            // which must not be passed to %s.
            fprintf(stderr, "Unknown argument -%c\n", optopt);
            break;
        }
    }

    if (!configPath) {
        fprintf(stderr, "option -c is required\n");
        return EXIT_FAILURE;
    }

    if (index == -1) {
        fprintf(stderr, "option -i is required\n");
        return EXIT_FAILURE;
    }

    // Load configuration
    std::ifstream configStream(configPath);
    if (configStream.fail()) {
        fprintf(stderr, "unable to read configuration file: %s\n", configPath);
        return EXIT_FAILURE;
    }
    transport::Configuration config(configStream);

    if (index >= config.n) {
        fprintf(stderr, "replica index %d is out of bounds; "
                "only %d replicas defined\n", index, config.n);
        return EXIT_FAILURE;
    }

    UDPTransport transport(0.0, 0.0, 0);

    lockserver::LockServer server;
    replication::ir::IRReplica replica(config, index, &transport, &server);

    transport.Run();

    return EXIT_SUCCESS;
}
#include <algorithm>
#include <iterator>
#include <unordered_set>
namespace lockserver {
......@@ -173,4 +116,131 @@ LockServer::UnloggedUpcall(const string &str1, string &str2)
Debug("Unlogged: %s\n", str1.c_str());
}
// Rebuild the lock table from `record`.
//
// Clears `locks`, then for every record entry parses the request and its
// result. Per key it collects the client ids with a status-0 lock and the
// client ids with a status-0 unlock. A key ends up held by the one client
// that locked it without also unlocking it; more than one such client is an
// assertion failure.
//
// NOTE(review): the per-key sets carry no ordering, so a client that locked,
// unlocked and locked the same key again is treated as not holding it.
// Confirm that this is acceptable for the records Sync receives.
void
LockServer::Sync(const std::map<opid_t, RecordEntry>& record) {
    locks.clear();

    struct KeyLockInfo {
        std::unordered_set<uint64_t> locked;
        std::unordered_set<uint64_t> unlocked;
    };
    std::unordered_map<std::string, KeyLockInfo> key_lock_info;

    for (const std::pair<const opid_t, RecordEntry> &p : record) {
        const opid_t &opid = p.first;
        const RecordEntry &entry = p.second;

        Request request;
        request.ParseFromString(entry.request.op());
        Reply reply;
        reply.ParseFromString(entry.result);
        KeyLockInfo &info = key_lock_info[request.key()];

        Debug("Sync opid=(%lu, %lu), clientid=%lu, key=%s, type=%d, status=%d.",
              opid.first, opid.second, request.clientid(),
              request.key().c_str(), request.type(), reply.status());

        if (request.type() && reply.status() == 0) {
            // Lock.
            info.locked.insert(request.clientid());
        } else if (!request.type() && reply.status() == 0) {
            // Unlock.
            info.unlocked.insert(request.clientid());
        }
    }

    for (const std::pair<const std::string, KeyLockInfo> &p : key_lock_info) {
        const std::string &key = p.first;
        const KeyLockInfo &info = p.second;

        // locked \ unlocked, computed by membership test. std::set_difference
        // requires both input ranges to be sorted, which unordered_set
        // iteration does not provide, so it cannot be used here.
        std::unordered_set<uint64_t> diff;
        for (const uint64_t locker : info.locked) {
            if (info.unlocked.count(locker) == 0) {
                diff.insert(locker);
            }
        }

        ASSERT(diff.size() == 0 || diff.size() == 1);
        if (diff.size() == 1) {
            uint64_t client_id = *std::begin(diff);
            Debug("Assigning lock %lu: %s", client_id, key.c_str());
            locks[key] = client_id;
        }
    }
}
std::map<opid_t, std::string>
LockServer::Merge(const std::map<opid_t, std::vector<RecordEntry>> &d,
const std::map<opid_t, std::vector<RecordEntry>> &u,
const std::map<opid_t, std::string> &majority_results_in_d) {
// First note that d and u only contain consensus operations, and lock
// requests are the only consensus operations (unlock is an inconsistent
// operation), so d and u only contain lock requests. To merge, we grant
// any majority successful lock request in d if it does not conflict with a
// currently held lock. We do not grant any other lock request.
std::map<opid_t, std::string> results;
using EntryVec = std::vector<RecordEntry>;
for (const std::pair<const opid_t, EntryVec>& p: d) {
const opid_t &opid = p.first;
const EntryVec &entries = p.second;
// Get the request and reply.
const RecordEntry &entry = *std::begin(entries);
Request request;
request.ParseFromString(entry.request.op());
Reply reply;
auto iter = majority_results_in_d.find(opid);
ASSERT(iter != std::end(majority_results_in_d));
reply.ParseFromString(iter->second);
// Form the final result.
const bool operation_successful = reply.status() == 0;
if (operation_successful) {
// If the lock was successful, then we acquire the lock so long as
// it is not already held.
const std::string &key = reply.key();
if (locks.count(key) == 0) {
Debug("Assigning lock %lu: %s", request.clientid(),
key.c_str());
locks[key] = request.clientid();
results[opid] = iter->second;
} else {
Debug("Rejecting lock %lu: %s", request.clientid(),
key.c_str());
reply.set_status(-1);
std::string s;
reply.SerializeToString(&s);
results[opid] = s;
}
} else {
// If the lock was not successful, then we maintain this as the
// majority result.
results[opid] = iter->second;
}
}
// We reject all lock requests in u. TODO: We could acquire a lock if
// it is free, but it's simplest to just reject them unilaterally.
for (const std::pair<const opid_t, EntryVec>& p: u) {
const opid_t &opid = p.first;
const EntryVec &entries = p.second;
const RecordEntry &entry = *std::begin(entries);
Request request;
request.ParseFromString(entry.request.op());
Debug("Rejecting lock %lu: %s", request.clientid(),
request.key().c_str());
Reply reply;
reply.set_key(request.key());
reply.set_status(-1);
std::string s;
reply.SerializeToString(&s);
results[opid] = s;
}
return results;
}
} // namespace lockserver
......@@ -31,15 +31,18 @@
#ifndef _IR_LOCK_SERVER_H_
#define _IR_LOCK_SERVER_H_
#include "lib/udptransport.h"
#include "replication/ir/replica.h"
#include "lockserver/locks-proto.pb.h"
#include <string>
#include <unordered_map>
#include "lib/transport.h"
#include "replication/ir/replica.h"
#include "lockserver/locks-proto.pb.h"
namespace lockserver {
using opid_t = replication::ir::opid_t;
using RecordEntry = replication::ir::RecordEntry;
class LockServer : public replication::ir::IRAppReplica
{
public:
......@@ -47,13 +50,22 @@ public:
~LockServer();
// Invoke inconsistent operation, no return value
void ExecInconsistentUpcall(const string &str1);
void ExecInconsistentUpcall(const string &str1) override;
// Invoke consensus operation
void ExecConsensusUpcall(const string &str1, string &str2);
void ExecConsensusUpcall(const string &str1, string &str2) override;
// Invoke unreplicated operation
void UnloggedUpcall(const string &str1, string &str2);
void UnloggedUpcall(const string &str1, string &str2) override;
// Sync
void Sync(const std::map<opid_t, RecordEntry>& record) override;
// Merge
std::map<opid_t, std::string> Merge(
const std::map<opid_t, std::vector<RecordEntry>> &d,
const std::map<opid_t, std::vector<RecordEntry>> &u,
const std::map<opid_t, std::string> &majority_results_in_d) override;
private:
std::unordered_map<std::string, uint64_t> locks;
......
d := $(dir $(lastword $(MAKEFILE_LIST)))
GTEST_SRCS += $(addprefix $(d), lockserver-test.cc)
$(d)lockserver-test: $(o)lockserver-test.o \
$(o)../locks-proto.o \
$(o)../server.o \
$(o)../client.o \
$(OBJS-ir-replica) \
$(OBJS-ir-client) \
$(LIB-configuration) \
$(LIB-repltransport) \
$(LIB-store-common) \
$(GTEST_MAIN)
TEST_BINS += $(d)lockserver-test
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* lockserver-test.cc:
* test cases for lock server
*
* Copyright 2013 Dan R. K. Ports <drkp@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#include <fstream>
#include <memory>
#include <thread>
#include <gtest/gtest.h>
#include "lib/configuration.h"
#include "lib/repltransport.h"
#include "lockserver/client.h"
#include "lockserver/server.h"
#include "replication/ir/replica.h"
class LockServerTest : public testing::Test {
protected:
std::vector<transport::ReplicaAddress> replica_addrs_;
std::unique_ptr<transport::Configuration> config_;
ReplTransport transport_;
std::vector<std::unique_ptr<lockserver::LockClient>> clients_;
std::vector<std::unique_ptr<lockserver::LockServer>> servers_;
std::vector<std::unique_ptr<replication::ir::IRReplica>> replicas_;
LockServerTest() {
replica_addrs_ = {{"replica", "0"},
{"replica", "1"},
{"replica", "2"},
{"replica", "3"},
{"replica", "4"}};
config_ = std::unique_ptr<transport::Configuration>(
new transport::Configuration(5, 2, replica_addrs_));
RemovePersistedFiles();
for (std::size_t i = 0; i < 3; ++i) {
auto client = std::unique_ptr<lockserver::LockClient>(
new lockserver::LockClient(&transport_, *config_));
client->lock_async(std::to_string(i));
clients_.push_back(std::move(client));
}
for (std::size_t i = 0; i < replica_addrs_.size(); ++i) {
auto server = std::unique_ptr<lockserver::LockServer>(
new lockserver::LockServer());
servers_.push_back(std::move(server));
auto replica = std::unique_ptr<replication::ir::IRReplica>(
new replication::ir::IRReplica(*config_, i, &transport_,
servers_[i].get()));
replicas_.push_back(std::move(replica));
}
}
virtual void TearDown() {
RemovePersistedFiles();
}
virtual void RemovePersistedFiles() {
for (std::size_t i = 0; i < replica_addrs_.size(); ++i) {
const transport::ReplicaAddress &addr = replica_addrs_[i];
const std::string filename =
addr.host + ":" + addr.port + "_" + std::to_string(i) + ".bin";
std::ifstream f(filename);
if (f.good()) {
int success = std::remove(filename.c_str());
ASSERT(success == 0);
}
}
}
};
// Note that these tests are all white box smoke tests. They depend on the
// low-level details of knowing exactly which timeouts are registered and which
// messages are sent. If an implementation detail is changed to make some of
// these tests fail, you should call transport_.Run() and walk through the
// execution to trigger the desired behavior. Also, they only check to make
// sure that nothing crashes, though you can read through the Debug prints to
// make sure everything looks right.
//
// TODO: Use a ReplTransport for tests like the ones in ir-test.cc to assert
// that the correct messages are being sent.
// Walks client 0's lock request through request delivery, replies, finalize
// and confirms, triggering no timer other than timer 1.
// NOTE(review): the second DeliverMessage argument is presumably the index of
// the message queued for that address -- confirm against ReplTransport.
TEST_F(LockServerTest, SuccessfulFastPathLock) {
// Send client 0's lock request.
transport_.TriggerTimer(1);
// Deliver lock request to replicas.
for (const auto &addr : replica_addrs_) {
transport_.DeliverMessage({addr.host, addr.port}, 0);
}
// Deliver lock reply to client.
for (std::size_t i = 0; i < replica_addrs_.size(); ++i) {
transport_.DeliverMessage({"client", "0"}, i);
}
// Deliver finalize to replicas.
for (const auto &addr : replica_addrs_) {
transport_.DeliverMessage({addr.host, addr.port}, 1);
}
// Deliver confirm to client.
// The confirms use the indices directly after the replies delivered above
// (one reply per replica).
int j = replica_addrs_.size();
for (std::size_t i = j; i < j + replica_addrs_.size(); ++i) {
transport_.DeliverMessage({"client", "0"}, i);
}
}
// Same message sequence as the fast-path test, except that timer
// clients_.size() + replica_addrs_.size() + 1 is fired right after the
// request is sent, before any message is delivered.
TEST_F(LockServerTest, SuccessfulSlowPathLock) {
// Send client 0's lock request.
transport_.TriggerTimer(1);
// Transition to slow path.
transport_.TriggerTimer(clients_.size() + replica_addrs_.size() + 1);
// Deliver lock request to replicas.
for (const auto &addr : replica_addrs_) {
transport_.DeliverMessage({addr.host, addr.port}, 0);
}
// Deliver lock reply to client.
for (std::size_t i = 0; i < replica_addrs_.size(); ++i) {
transport_.DeliverMessage({"client", "0"}, i);
}
// Deliver finalize to replicas.
for (const auto &addr : replica_addrs_) {
transport_.DeliverMessage({addr.host, addr.port}, 1);
}
// Deliver confirm to client.
// The confirms use the indices directly after the replies delivered above
// (one reply per replica).
int j = replica_addrs_.size();
for (std::size_t i = j; i < j + replica_addrs_.size(); ++i) {
transport_.DeliverMessage({"client", "0"}, i);
}
}
// Delivers client 0's lock request to every replica, then fires timers
// nclients + 1 .. nclients + nreplicas to start a view change, feeds
// messages 1 .. nreplicas - 1 to replica 1 (the new primary), and finally
// delivers message nreplicas to every replica except replica 1.
TEST_F(LockServerTest, SuccessfulViewChange) {
// Send client 0's lock request.
transport_.TriggerTimer(1);
// Deliver lock request to replicas.
for (const auto &addr : replica_addrs_) {
transport_.DeliverMessage({addr.host, addr.port}, 0);
}
// Initiate view changes on all replicas.
const std::size_t nclients = clients_.size();
const std::size_t nreplicas = replica_addrs_.size();
for (std::size_t i = nclients + 1; i < nclients + nreplicas + 1; ++i) {
transport_.TriggerTimer(i);
}
// Deliver DoViewChangeMessages to new primary.
const transport::ReplicaAddress& primary = replica_addrs_[1];
for (std::size_t i = 1; i < 1 + nreplicas - 1; ++i) {
transport_.DeliverMessage({primary.host, primary.port}, i);
}
// Deliver StartViewMessage to all replicas.
for (std::size_t i = 0; i < nreplicas; ++i) {
// Replica 1 is the new primary, so nothing is delivered to it here.
if (i == 1) {
continue;
}
const transport::ReplicaAddress& addr = replica_addrs_[i];
transport_.DeliverMessage({addr.host, addr.port}, nreplicas);
}
}
// View change among the first three replicas after three requests with
// different reach: client 0's request is delivered to and finalized on all
// replicas, client 1's is delivered to replicas 0-2 only, and client 2's to
// replica 0 only.
// NOTE(review): the literal message indices (2, 3, 4..5, 6) depend on the
// exact messages queued by the steps above -- re-derive them if any step
// changes.
TEST_F(LockServerTest, SuccessfulViewChangeNonemptyRdu) {
const std::size_t nclients = clients_.size();
const std::size_t nreplicas = replica_addrs_.size();
// The hard-coded client and replica indices below need at least three of each.
ASSERT_GE(nclients, 3);
ASSERT_GE(nreplicas, 3);
// Send client 0's lock request.
transport_.TriggerTimer(1);
// Deliver lock request to replicas.
for (const auto &addr : replica_addrs_) {
transport_.DeliverMessage({addr.host, addr.port}, 0);
}
// Deliver lock reply to client.
for (std::size_t i = 0; i < replica_addrs_.size(); ++i) {
transport_.DeliverMessage({"client", "0"}, i);
}
// Deliver finalize to replicas.
for (const auto &addr : replica_addrs_) {
transport_.DeliverMessage({addr.host, addr.port}, 1);
}
// Send client 1's lock request.
transport_.TriggerTimer(2);
// Deliver lock request to first three replicas.
for (std::size_t i = 0; i < 3; ++i) {
const transport::ReplicaAddress &addr = replica_addrs_[i];
transport_.DeliverMessage({addr.host, addr.port}, 2);
}
// Send client 2's lock request.
transport_.TriggerTimer(3);
// Deliver lock request to first replica.
const transport::ReplicaAddress &addr = replica_addrs_[0];
transport_.DeliverMessage({addr.host, addr.port}, 3);
// View change first three replicas.
for (std::size_t i = nclients + 1; i < nclients + 1 + 3; ++i) {
transport_.TriggerTimer(i);
}
// Deliver DoViewChangeMessages to new primary.
const transport::ReplicaAddress& primary = replica_addrs_[1];
for (std::size_t i = 4; i < 4 + 2; ++i) {
transport_.DeliverMessage({primary.host, primary.port}, i);
}
// Deliver StartViewMessage to replica 0 and 2.
const transport::ReplicaAddress& addr0 = replica_addrs_[0];
const transport::ReplicaAddress& addr2 = replica_addrs_[2];
transport_.DeliverMessage({addr0.host, addr0.port}, 6);
transport_.DeliverMessage({addr2.host, addr2.port}, 6);
}
// Delivers client 0's lock request, runs a full view change, then delivers
// message 0 to every replica a second time and hands message nreplicas to
// client 0.
TEST_F(LockServerTest, FinalizeConsensusReply) {
const std::size_t nclients = clients_.size();
const std::size_t nreplicas = replica_addrs_.size();
// Send client 0's lock request.
transport_.TriggerTimer(1);
// Deliver lock request to replicas.
for (const auto &addr : replica_addrs_) {
transport_.DeliverMessage({addr.host, addr.port}, 0);
}
// Trigger view change.
for (std::size_t i = nclients + 1; i < nclients + 1 + nreplicas; ++i) {
transport_.TriggerTimer(i);
}
// Deliver DoViewChangeMessages to new primary.
const transport::ReplicaAddress& primary = replica_addrs_[1];
for (std::size_t i = 1; i < 1 + nreplicas - 1; ++i) {
transport_.DeliverMessage({primary.host, primary.port}, i);
}
// Deliver StartViewMessage to all replicas.
for (std::size_t i = 0; i < nreplicas; ++i) {
// Replica 1 is the new primary, so nothing is delivered to it here.
if (i == 1) {
continue;
}
const transport::ReplicaAddress& addr = replica_addrs_[i];
transport_.DeliverMessage({addr.host, addr.port}, nreplicas);
}
// Deliver lock request to replicas.
// (Same message index 0 as the first delivery above, i.e. a re-delivery.)
for (const auto &addr : replica_addrs_) {
transport_.DeliverMessage({addr.host, addr.port}, 0);
}
// Deliver finalized reply to client.
transport_.DeliverMessage({"client", "0"}, nreplicas);
}
// Slow-path lock by client 0 interleaved with a view change: the replies
// reach the client before the view change, while message 1 (labelled
// FinalizeConsensusMessage below) reaches the replicas only after it.
// The view-change message indices start at 2 here rather than 1.
TEST_F(LockServerTest, MismatchedConsensus) {
const std::size_t nclients = clients_.size();
const std::size_t nreplicas = replica_addrs_.size();
// Send client 0's lock request.
transport_.TriggerTimer(1);
// Transition to slow path.
transport_.TriggerTimer(nclients + nreplicas + 1);
// Deliver lock request to replicas.
for (const auto &addr : replica_addrs_) {
transport_.DeliverMessage({addr.host, addr.port}, 0);
}
// Deliver lock reply to client.
for (std::size_t i = 0; i < replica_addrs_.size(); ++i) {
transport_.DeliverMessage({"client", "0"}, i);
}
// Trigger view change.
for (std::size_t i = nclients + 1; i < nclients + 1 + nreplicas; ++i) {
transport_.TriggerTimer(i);
}
// Deliver DoViewChangeMessages to new primary.
const transport::ReplicaAddress& primary = replica_addrs_[1];
for (std::size_t i = 2; i < 2 + nreplicas - 1; ++i) {
transport_.DeliverMessage({primary.host, primary.port}, i);
}
// Deliver StartViewMessage to all replicas.
for (std::size_t i = 0; i < nreplicas; ++i) {
// Replica 1 is the new primary, so nothing is delivered to it here.
if (i == 1) {
continue;
}
const transport::ReplicaAddress& addr = replica_addrs_[i];
transport_.DeliverMessage({addr.host, addr.port}, 2 + nreplicas - 1);
}
// Deliver FinalizeConsensusMessage to all replicas.
for (const auto &addr : replica_addrs_) {
transport_.DeliverMessage({addr.host, addr.port}, 1);
}
// Deliver ConfirmMessages to client 0.
for (std::size_t i = nreplicas; i < nreplicas + nreplicas; ++i) {
transport_.DeliverMessage({"client", "0"}, i);
}
}
This diff is collapsed.
This diff is collapsed.
------------------------------- MODULE TAPIR -------------------------------
(***************************************************************************)
(* This is a TLA+ specification of the TAPIR algorithm. *)
(***************************************************************************)
EXTENDS FiniteSets, Naturals, TLC, TLAPS
Max(S) == IF S = {} THEN 0 ELSE CHOOSE i \in S: \A j \in S: j <= i
(***************************************************************************)
(* TAPIR constants: *)
(* 1. Shards: function from shard id to set of replica ids in the shard *)
(* 2. Transactions: set of all possible transactions *)
(* 3. nr_shards: number of shards *)
(***************************************************************************)
CONSTANTS Shards, Transactions, NrShards
\* Note: assume unique number ids for replicas
(***************************************************************************)
(* IR constants & variables (description in the IR module) *)
(***************************************************************************)
CONSTANTS Clients, Quorums, SuperQuorums,
max_vc, max_req, f
VARIABLES rState, rRecord, rCrtView, rLastView, rViewReplies, rNonce,
rViewOnDisk,
sentMsg, cCrtOp,
cCrtOpToFinalize, cMsgCounter, cCrtOpReplies, cCrtOpConfirms,
cState, aSuccessful, arRecord, aVisibility, gViewChangesNo
irReplicaVars == <<rState, rRecord, rCrtView, rViewOnDisk, rLastView,
rViewReplies, rNonce>>
irClientVars == <<cCrtOp, \* current operation at a client
cCrtOpReplies, \* current operation replies
cMsgCounter,
cState,
cCrtOpToFinalize,
cCrtOpConfirms>> \* Client variables.
irAppVars == <<aSuccessful, arRecord, aVisibility>> \* Application variables.
irOtherVars == <<sentMsg, gViewChangesNo>> \* Other variables.
IRMessageId == [cid: Clients, msgid: Nat]
(***************************************************************************)
(* TAPIR Variables/State: *)
(* 1. State at each replica: *)
(* rPrepareTxns = List of txns this replica is prepared *)
(* to commit *)
(* rTxnsLog = Log of committed and aborted txns in ts order *)
(* rStore = Versioned store *)
(* rBkpTable = Table of txns for which this replica *)
(* is the bkp coordinator *)
(* 2. State of communication medium: *)
(* sentMsg = sent (and duplicate) messages *)
(* 3. State at client: *)
(* cCrtTxn = crt txn requested by the client *)
(* *)
(***************************************************************************)
\* TAPIR variables & data structures
VARIABLES rPreparedTxns, rStore, rTxnsLogAborted, rTxnsLogCommitted,
rClock, cCrtTxn, cClock
tapirReplicaVars == <<rPreparedTxns, rStore, rTxnsLogAborted,
rTxnsLogCommitted,
rClock>>
tapirClientVars == <<cCrtTxn, cClock>>
vars == <<irReplicaVars, irClientVars, irAppVars, irOtherVars,
tapirReplicaVars, tapirClientVars>>
StoreEntry == [vs: Nat, val: Nat] \* vs = version
Store == [key: Nat,
entries: SUBSET StoreEntry,
latestVs: Nat,
latestVal: Nat]
TransactionTs == [cid: Clients, clock: Nat] \* Timestamp
ReadSet == [key: Nat, val: Nat, vs: Nat]
WriteSet == [key: Nat, val: Nat]
Transaction == [rSet: SUBSET ReadSet,
wSet: SUBSET WriteSet,
shards: SUBSET Nat]
TypeOK ==
/\ rStore \in [UNION {Shards[i]: i \in 1..NrShards} -> SUBSET Store]
/\ rPreparedTxns \in [UNION {Shards[i]: i \in 1..NrShards} -> SUBSET Transaction]
/\ rTxnsLogAborted \in [UNION {Shards[i]: i \in 1..NrShards} -> SUBSET Transaction]
/\ rTxnsLogCommitted \in [UNION {Shards[i]: i \in 1..NrShards} -> SUBSET Transaction]
TAPIRResults == {"Prepare-OK", "Retry", "Prepare-Abstain", "Abort"}
TAPIROpType == {"Prepare", "ABORT", "COMMIT"}
TAPIROpBody == [opType : TAPIROpType, txn: Transaction]
TAPIRClientFail == TRUE \* state we lose at the app level
TAPIRReplicaFail == TRUE \* state we lose at the app level
\* TAPIR implementation of IR interface
TAPIRExecInconsistent(op) == TRUE
TAPIRExecConsensus(op) == IF op.type = "Consensus" THEN "Prepare-OK" ELSE "Abort"
TAPIRDecide(results) == "Prepare-OK"
TAPIRMerge(R, d, u) == R \cup d \cup
{[msgid |-> x.msgid, op |-> x.op, res |-> "Prepare-OK"]: x \in u}
TAPIRSync(records) == TRUE
TAPIRSuccessfulInconsistentOp(c, S, op) == TRUE
TAPIRSuccessfulConsensusOp(c, S, op, res) == TRUE
\* Initialize for all shards
InitIR ==
/\ rState = [s \in 1..NrShards |-> [r \in Shards[s] |-> "NORMAL"]]
/\ rRecord = [s \in 1..NrShards |-> [r \in Shards[s] |-> {}]]
/\ rCrtView = [s \in 1..NrShards |-> [r \in Shards[s] |-> 0]]
/\ rViewOnDisk = [s \in 1..NrShards |-> [r \in Shards[s] |-> 0]]
/\ rLastView = [s \in 1..NrShards |-> [r \in Shards[s] |-> 0]]
/\ rViewReplies = [s \in 1..NrShards |-> [r \in Shards[s] |-> {}]]
/\ rNonce = [s \in 1..NrShards |-> [r \in Shards[s] |-> 0]]
/\ sentMsg = [s \in 1..NrShards |-> {}]
/\ cCrtOp = [s \in 1..NrShards |-> [c \in Clients |-> <<>>]]
/\ cCrtOpToFinalize = [s \in 1..NrShards |-> [c \in Clients |-> <<>>]]
/\ cMsgCounter = [s \in 1..NrShards |-> [c \in Clients |-> 0]]
/\ cCrtOpReplies = [s \in 1..NrShards |-> [c \in Clients |-> {}]]
/\ cCrtOpConfirms = [s \in 1..NrShards |-> [c \in Clients |-> {}]]
/\ cState = [c \in Clients |-> "NORMAL"]
/\ aSuccessful = {}
/\ arRecord = [s \in 1..NrShards |-> [r \in Shards[s] |-> {}]]
/\ aVisibility = [s \in 1..NrShards |-> [o \in IRMessageId |-> {}]]
/\ gViewChangesNo = [s \in 1..NrShards |-> 0]
\* IR instance per shard TODO: modify replica also
IR(s) == INSTANCE IR_consensus WITH AppClientFail <- TAPIRClientFail,
AppReplicaFail <- TAPIRReplicaFail,
OpBody <- TAPIROpBody,
ExecInconsistent <- TAPIRExecInconsistent,
ExecConsensus <- TAPIRExecConsensus,
Merge <- TAPIRMerge,
Sync <- TAPIRSync,
SuccessfulInconsistentOp <- TAPIRSuccessfulInconsistentOp,
SuccessfulConsensusOp <- TAPIRSuccessfulConsensusOp,
Decide <- TAPIRDecide,
Results <- TAPIRResults,
Replicas <- Shards[s],
Quorums <- Quorums[s],
SuperQuorums <- SuperQuorums[s],
S <- s
\* TAPIR messages
Message ==
[type: {"READ"},
key: Nat,
dst: UNION Shards]
\cup
[type: {"READ-REPLY"},
key: Nat,
val: Nat,
vs: Nat, \* version
dst: Clients]
\cup
[type: {"READ-VERSION"},
key: Nat,
vs: Nat,
dst: UNION Shards]
\cup
[type: {"READ-VERSION-REPLY"},
key: Nat,
vs: Nat,
dst: Clients]
InitTAPIR == /\ cCrtTxn = [c \in Clients |-> <<>>]
/\ cClock = [c \in Clients |-> 0]
/\ rPreparedTxns = [s \in 1..NrShards |-> [r \in Shards[s] |-> {}]]
/\ rStore = [r \in UNION {Shards[i]: i \in 1..NrShards} |-> {}]
/\ rTxnsLogAborted = [s \in 1..NrShards |-> [r \in Shards[s] |-> {}]]
/\ rTxnsLogCommitted = [s \in 1..NrShards |-> [r \in Shards[s] |-> {}]]
/\ rClock = [s \in 1..NrShards |-> [r \in Shards[s] |-> 0]]
Init == InitIR /\ InitTAPIR
-----------------------------------------------------------------------------
(***************************************************************************)
(* `^ \center \large{\textbf{Tapir replica actions}} ^' *)
(***************************************************************************)
\*TAPIRReplicaReceiveRead(r) == TRUE
\*TAPIRReplicaAction(r) ==
\* \/ /\ rState[r] = "NORMAL"
\* /\ \/ TAPIRReplicaReceiveRead(r)
-----------------------------------------------------------------------------
(***************************************************************************)
(* `^ \center \large{\textbf{Tapir client actions}} ^' *)
(***************************************************************************)
TAPIRClientExecuteTxn(c) ==
\* first, resolve all reads (read from any replica and get the vs)
\* then send prepares in all shards involved by setting the cCrtOp in the
\* respective IR shard instance
\* TODO: for now just simulate this, pick a transaction from
\* transaction pool, get some versions from the replica
\* stores
/\ cCrtTxn[c] = <<>>
/\ \E t \in Transactions:
LET rSet == {rse \in ReadSet:
/\ \E trse \in t.rSet : rse = trse
/\ LET
r == Max({r \in Shards[(rse.key % NrShards) + 1]:
\E se \in rStore[r]: rse.key = se.key})
IN
/\ r /= 0
/\ \E se \in rStore[r]:
/\ rse.key = se.key
/\ rse.val = se.latestVal
/\ rse.vs = se.latestVs
}
shards == {s \in 1..NrShards:
\/ \E trse \in t.rSet: s = (trse.key % NrShards) + 1
\/ \E twse \in t.wSet: s = (twse.key % NrShards) + 1 }
IN
/\ Cardinality(rSet) = Cardinality(t.rSet) \* found all the reads
/\ cCrtTxn' = [cCrtTxn EXCEPT ![c] = [rSet |-> rSet,
wSet |-> t.wSet,
shards |-> shards]]
/\ UNCHANGED <<irReplicaVars, irClientVars, irOtherVars, irAppVars,
tapirReplicaVars, cClock>>
\* Client c, which has a transaction in progress, issues a "Prepare" for it
\* as an IR consensus operation in one of the transaction's shards.
TAPIRClientPrepareTxn(c) ==
\* enabled only when c has a current transaction
/\ cCrtTxn[c] /= <<>>
/\ \E s \in cCrtTxn[c].shards: \* prepare in shard s
\* - ok if already prepared
\* the request body carries the whole current transaction record
/\ IR(s)!ClientRequest(c, [type |-> "Consensus",
body |-> [opType |-> "Prepare",
txn |-> cCrtTxn[c]]])
\* variables this step leaves untouched; the remaining ones are constrained
\* by IR(s)!ClientRequest (defined outside this excerpt)
/\ UNCHANGED <<irReplicaVars, irAppVars,
cCrtOpReplies,
cCrtOpConfirms,
cCrtOpToFinalize,
gViewChangesNo,
cState, tapirClientVars, tapirReplicaVars>>
\* All TAPIR-level steps client c can take. Both are guarded by the client
\* being in the "NORMAL" state: start a transaction, or prepare the current
\* one in some shard.
TAPIRClientAction(c) ==
\/ /\ cState[c] = "NORMAL"
/\ \/ TAPIRClientExecuteTxn(c) \* for now just simulate this
\* (don't send explicit READ messages)
\/ TAPIRClientPrepareTxn(c)
-----------------------------------------------------------------------------
(***************************************************************************)
(* `^ \center \large{\textbf{High-Level Actions}} ^' *)
(***************************************************************************)
\* Next-state relation: a TAPIR client step, or a step of one shard's IR
\* instance (which must leave the TAPIR variables alone), or a stuttering
\* step once every replica has reached the last allowed view.
Next ==
\/ \E c \in Clients: TAPIRClientAction(c)
\/ /\ \E s \in 1..NrShards: IR(s)!Next
/\ UNCHANGED <<tapirClientVars, tapirReplicaVars>>
\/ \* Avoid deadlock by termination
\* when rLastView equals max_vc for every replica index of every shard,
\* allow a step that changes nothing so the model checker does not
\* report the final state as a deadlock
((\A s \in 1..NrShards:
\A i \in 1..Cardinality(Shards[s]):
rLastView[s][i] = max_vc) /\ UNCHANGED <<vars>>)
\* Invariant to check: TypeOK and FaultTolerance of the IR instance.
\* NOTE(review): only the instance for shard 1 is checked here; the other
\* shards' instances are not covered -- confirm this is intended.
Inv == IR(1)!TypeOK /\ IR(1)!FaultTolerance
=============================================================================
\* Modification History
\* Last modified Mon Aug 31 12:55:38 PDT 2015 by aaasz
\* Created Sat Jan 31 18:31:52 PST 2015 by aaasz
--------------------------------- MODULE Test ---------------------------------
(***************************************************************************)
(* This is a TLA+ specification of the Inconsistent Replication algorithm. *)
(* (And a mechanically-checked proof of its correctness using TLAPS)       *)
(***************************************************************************)
\* NOTE(review): the header above looks copied from the IR spec. This module
\* contains no proof; it only defines test_recoveredConensusOps_R over a fixed
\* set of view replies and a Next step that exercises it.
EXTENDS Naturals, FiniteSets, TLC
\* rViewReplies: the replies under test; recoveredOps: result of the operator
VARIABLES rViewReplies, recoveredOps
OpType == {"Inconsistent", "Consensus"}
OpStatus == {"TENTATIVE", "FINALIZED"}
Operations == [type: OpType, body: Nat]
\* Each reply has a latest view number lv, a set r of records and a source
\* src. A record either carries a result (res) or not.
TypeOK ==
/\ rViewReplies \in SUBSET ([lv: Nat,
r: SUBSET ([msgid: Nat,
op: Operations,
res: Nat,
status: OpStatus]
\cup [msgid: Nat,
op: Operations,
status: OpStatus]),
src: Nat])
\* A: the set of replies the operator works on
A ==
rViewReplies
B ==
\* set of all records received in replies in A
UNION {x.r: x \in A}
\* Selects the FINALIZED consensus records of B that are contained in a reply
\* whose lv is maximal among the replies holding a FINALIZED record with the
\* same msgid.
test_recoveredConensusOps_R ==
\* any finalized consensus operation (in at least one record, in the maximum
\* latest view)
{x \in B:
/\ x.op.type = "Consensus"
/\ x.status = "FINALIZED"
/\ LET most_updated_reply ==
\* a reply with a FINALIZED record for x.msgid such that no other reply
\* with such a record has a larger lv
CHOOSE reply \in A:
/\ \E rec \in reply.r: /\ rec.msgid = x.msgid
/\ rec.status = "FINALIZED"
/\ \A rep \in A:
IF \E rec \in rep.r: /\ rec.msgid = x.msgid
/\ rec.status = "FINALIZED"
THEN rep.lv <= reply.lv
ELSE TRUE
IN
x \in most_updated_reply.r}
\* Fixture: three replies with lv = 1, 2, 3, each holding one FINALIZED
\* consensus record for msgid 1; the records differ only in res (1, 2, 3).
Init ==
/\ rViewReplies = {[lv |-> 1, r |-> {[msgid |-> 1, op |-> [type |-> "Consensus", body |-> 1], res |-> 1, status |-> "FINALIZED"]}, src |-> 1],
[lv |-> 2, r |-> {[msgid |-> 1, op |-> [type |-> "Consensus", body |-> 1], res |-> 2, status |-> "FINALIZED"]}, src |-> 2],
[lv |-> 3, r |-> {[msgid |-> 1, op |-> [type |-> "Consensus", body |-> 1], res |-> 3, status |-> "FINALIZED"]}, src |-> 3]}
/\ recoveredOps = {}
\* Stores the operator's result in recoveredOps. The Assert reads the
\* unprimed recoveredOps, so it holds on the first step (recoveredOps = {})
\* and is expected to fail ("Should fail") on a later step, once recoveredOps
\* holds the recovered record.
Next ==
/\ recoveredOps' = test_recoveredConensusOps_R
/\ Assert(Cardinality(recoveredOps) = 0, "Should fail")
/\ UNCHANGED <<rViewReplies>>
=============================================================================
\* Modification History
\* Last modified Fri Apr 24 14:34:42 PDT 2015 by aaasz
\* Created Fri Dec 12 17:42:14 PST 2014 by aaasz
This diff is collapsed.
Quorums <- {{1,2}, {1,3}, {2,3}}
Clients <- {1}
max_req <- 1
f <- 1
max_vc <- 3
Operations <- {1}
Replicas <- {1,2,3}
max_c <- 3
......@@ -36,7 +36,19 @@
#include <random>
namespace replication {
// Maps an ErrorCode to the spelling of its enumerator. A value that matches
// no known enumerator trips Assert(false); the empty string is returned after
// it so that every path yields a value.
std::string ErrorCodeToString(ErrorCode err) {
    if (err == ErrorCode::TIMEOUT) {
        return "TIMEOUT";
    }
    if (err == ErrorCode::MISMATCHED_CONSENSUS_VIEWS) {
        return "MISMATCHED_CONSENSUS_VIEWS";
    }

    // Not one of the known error codes.
    Assert(false);
    return "";
}
Client::Client(const transport::Configuration &config, Transport *transport,
uint64_t clientid)
: config(config), transport(transport)
......@@ -61,7 +73,7 @@ Client::~Client()
{
}
void
Client::ReceiveMessage(const TransportAddress &remote,
const string &type, const string &data)
......@@ -69,5 +81,5 @@ Client::ReceiveMessage(const TransportAddress &remote,
Panic("Received unexpected message type: %s",
type.c_str());
}
} // namespace replication
......@@ -42,32 +42,54 @@
namespace replication {
/// Reason a client's request failed.
///
/// Requests can fail in several ways; for instance, when enough replicas are
/// down a request may time out. The enumerator tells the caller which kind of
/// failure occurred.
enum class ErrorCode {
    /// The request took too long and timed out, whatever the cause (failed
    /// replicas, slow network, ...).
    TIMEOUT = 0,

    /// IR only: the client issued a consensus operation and received a
    /// majority of replies and confirms in different views, so the operation
    /// fails.
    MISMATCHED_CONSENSUS_VIEWS = 1
};
// Returns the name of the given error code as a string, e.g. "TIMEOUT" for
// ErrorCode::TIMEOUT.
std::string ErrorCodeToString(ErrorCode err);
// Abstract base class for replication clients. Subclasses implement Invoke
// and InvokeUnlogged; results and failures are delivered through callbacks.
//
// NOTE(review): continuation_t is declared twice below (typedef and using),
// and Invoke/InvokeUnlogged each appear with two signatures. This looks like
// the pre- and post-change lines of a diff shown together -- confirm which
// set is current before relying on either.
class Client : public TransportReceiver
{
public:
// Callback types taking only the request, or the request and a second string.
typedef std::function<void (const string &, const string &)> continuation_t;
typedef std::function<void (const string &)> timeout_continuation_t;
// Callback receiving the request and its reply.
using continuation_t =
std::function<void(const string &request, const string &reply)>;
// Callback receiving the request and the reason it failed.
using error_continuation_t =
std::function<void(const string &request, ErrorCode err)>;
// Default value of InvokeUnlogged's `timeout` parameter.
static const uint32_t DEFAULT_UNLOGGED_OP_TIMEOUT = 1000; // milliseconds
// clientid defaults to 0.
// NOTE(review): presumably 0 means "pick an id automatically" -- the
// constructor body is not visible here; confirm.
Client(const transport::Configuration &config, Transport *transport,
uint64_t clientid = 0);
virtual ~Client();
// Invoke variant that takes only a success continuation.
virtual void Invoke(const string &request,
continuation_t continuation) = 0;
// InvokeUnlogged variant with an optional timeout continuation; replicaIdx
// names the replica the request is addressed to.
virtual void InvokeUnlogged(int replicaIdx,
const string &request,
continuation_t continuation,
timeout_continuation_t timeoutContinuation = nullptr,
uint32_t timeout = DEFAULT_UNLOGGED_OP_TIMEOUT) = 0;
// Invoke variant with an optional error continuation (nullptr by default).
virtual void Invoke(
const string &request,
continuation_t continuation,
error_continuation_t error_continuation = nullptr) = 0;
// InvokeUnlogged variant with an optional error continuation; `timeout`
// defaults to DEFAULT_UNLOGGED_OP_TIMEOUT.
virtual void InvokeUnlogged(
int replicaIdx,
const string &request,
continuation_t continuation,
error_continuation_t error_continuation = nullptr,
uint32_t timeout = DEFAULT_UNLOGGED_OP_TIMEOUT) = 0;
// Transport callback for an incoming message of the given type.
// NOTE(review): the base implementation appears to Panic on any message type
// ("Received unexpected message type") -- confirm against the .cc file.
virtual void ReceiveMessage(const TransportAddress &remote,
const string &type,
const string &data);
protected:
// config is held by value; transport is a raw pointer.
transport::Configuration config;
Transport *transport;
uint64_t clientid;
};
......
......@@ -35,7 +35,7 @@
#define _COMMON_QUORUMSET_H_
namespace replication {
template <class IDTYPE, class MSGTYPE>
class QuorumSet
{
......@@ -43,9 +43,9 @@ public:
// Constructs a quorum set and stores numRequired in the member of the same
// name.
// NOTE(review): presumably numRequired is the number of matching messages
// needed to form a quorum -- the check itself is not visible here; confirm.
QuorumSet(int numRequired)
: numRequired(numRequired)
{
}
void
Clear()
{
......@@ -80,7 +80,20 @@ public:
return &vsmessages;
} else {
return NULL;
}
}
}
const std::map<int, MSGTYPE> *
CheckForQuorum()
{
for (const auto &p : messages) {
const IDTYPE &vs = p.first;
const std::map<int, MSGTYPE> *quorum = CheckForQuorum(vs);
if (quorum != nullptr) {
return quorum;
}
}
return nullptr;
}
const std::map<int, MSGTYPE> *
......@@ -98,7 +111,7 @@ public:
}
vsmessages[replicaIdx] = msg;
return CheckForQuorum(vs);
}
......@@ -107,7 +120,7 @@ public:
{
AddAndCheckForQuorum(vs, replicaIdx, msg);
}
public:
int numRequired;
private:
......
syntax = "proto2";
package replication;
message Request {
......
......@@ -12,7 +12,7 @@ OBJS-ir-client := $(o)ir-proto.o $(o)client.o \
OBJS-ir-replica := $(o)record.o $(o)replica.o $(o)ir-proto.o \
$(OBJS-replica) $(LIB-message) \
$(LIB-configuration)
$(LIB-configuration) $(LIB-persistent_register)
include $(d)tests/Rules.mk
This diff is collapsed.