Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* ir-test.cc:
* test cases for Inconsistent Replication Protocol
*
* Copyright 2013 Dan R. K. Ports <drkp@cs.washington.edu>
* Copyright 2015 Irene Zhang Ports <iyzhang@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#include "lib/configuration.h"
#include "lib/message.h"
#include "lib/transport.h"
#include "lib/simtransport.h"
#include "replication/common/client.h"
#include "replication/common/replica.h"
#include "replication/ir/client.h"
#include "replication/ir/replica.h"
#include <stdlib.h>
#include <stdio.h>
#include <gtest/gtest.h>
#include <vector>
#include <set>
using google::protobuf::Message;
using namespace replication;
using namespace replication::ir;
using namespace replication::ir::proto;
class IRApp : public IRAppReplica {
std::vector<string> *iOps;
std::vector<string> *cOps;
std::vector<string> *unloggedOps;
public:
IRApp(std::vector<string> *i, std::vector<string> *c,
std::vector<string> *u)
: iOps(i), cOps(c), unloggedOps(u) {}
void ExecInconsistentUpcall(const string &req) {
iOps->push_back(req);
}
void ExecConsensusUpcall(const string &req, string &reply) {
cOps->push_back(req);
void UnloggedUpcall(const string &req, string &reply) {
unloggedOps->push_back(req);
reply = "unlreply: " + req;
}
};
class IRTest : public ::testing::Test
std::vector<transport::ReplicaAddress> replicaAddrs;
std::unique_ptr<transport::Configuration> config;
SimulatedTransport transport;
std::vector<std::unique_ptr<IRApp>> apps;
std::vector<std::unique_ptr<IRReplica>> replicas;
std::unique_ptr<IRClient> client;
std::vector<std::vector<string> > iOps;
std::vector<std::vector<string> > cOps;
std::vector<std::vector<string> > unloggedOps;
int requestNum;
IRTest() : requestNum(-1) {
replicaAddrs = {{"localhost", "12345"},
{"localhost", "12346"},
{"localhost", "12347"}};
config = std::unique_ptr<transport::Configuration>(
new transport::Configuration(3, 1, replicaAddrs));
iOps.resize(config->n);
cOps.resize(config->n);
unloggedOps.resize(config->n);
for (int i = 0; i < config->n; i++) {
auto ir_app = std::unique_ptr<IRApp>(
new IRApp(&iOps[i], &cOps[i], &unloggedOps[i]));
auto p = std::unique_ptr<IRReplica>(
new IRReplica(*config, i, &transport, ir_app.get()));
apps.push_back(std::move(ir_app));
replicas.push_back(std::move(p));
client = std::unique_ptr<IRClient>(new IRClient(*config, &transport));
}
virtual string RequestOp(int n) {
std::ostringstream stream;
stream << "test: " << n;
return stream.str();
}
virtual string LastRequestOp() {
return RequestOp(requestNum);
}
virtual void ClientSendNextInconsistent(Client::continuation_t upcall) {
requestNum++;
client->InvokeInconsistent(LastRequestOp(), upcall);
}
virtual void ClientSendNextConsensus(Client::continuation_t upcall,
IRClient::decide_t decide) {
requestNum++;
client->InvokeConsensus(LastRequestOp(), decide, upcall);
}
virtual void ClientSendNextUnlogged(
int idx, Client::continuation_t upcall,
Client::error_continuation_t error_continuation = nullptr,
uint32_t timeout = Client::DEFAULT_UNLOGGED_OP_TIMEOUT) {
client->InvokeUnlogged(idx, LastRequestOp(), upcall,
error_continuation, timeout);
// Replicas store their view information in the following files:
// - localhost:12345_0.bin
// - localhost:12346_1.bin
// - localhost:12347_2.bin
// We have to make sure to delete them after every test. Otherwise,
// replicas run in recovery mode.
for (std::size_t i = 0; i < replicaAddrs.size(); ++i) {
const transport::ReplicaAddress &addr = replicaAddrs[i];
const std::string filename =
addr.host + ":" + addr.port + "_" + std::to_string(i) + ".bin";
int success = std::remove(filename.c_str());
ASSERT(success == 0);
TEST_F(IRTest, OneInconsistentOp)
{
auto upcall = [this](const string &req, const string &reply) {
EXPECT_EQ(req, LastRequestOp());
// Inconsistent ops do not return a value
EXPECT_EQ(reply, "");
transport.CancelAllTimers();
ClientSendNextInconsistent(upcall);
// By now, they all should have executed the last request.
for (int i = 0; i < config->n; i++) {
EXPECT_EQ(iOps[i].size(), 1);
EXPECT_EQ(iOps[i].back(), LastRequestOp());
}
}
TEST_F(IRTest, OneConsensusOp)
{
auto upcall = [this](const string &req, const string &reply) {
EXPECT_EQ(req, LastRequestOp());
transport.CancelAllTimers();
auto decide = [this](const std::map<string, std::size_t> &results) {
// shouldn't ever get called
EXPECT_FALSE(true);
return "";
};
ClientSendNextConsensus(upcall, decide);
// By now, they all should have executed the last request.
for (int i = 0; i < config->n; i++) {
EXPECT_EQ(cOps[i].size(), 1);
EXPECT_EQ(cOps[i].back(), LastRequestOp());
}
}
TEST_F(IRTest, Unlogged)
{
auto upcall = [this](const string &req, const string &reply) {
EXPECT_EQ(req, LastRequestOp());
EXPECT_EQ(reply, "unlreply: "+LastRequestOp());
EXPECT_EQ(unloggedOps[1].back(), req);
transport.CancelAllTimers();
auto timeout = [&](const string &req, ErrorCode) {
for (unsigned int i = 0; i < iOps.size(); i++) {
EXPECT_EQ(0, iOps[i].size());
EXPECT_EQ((i == 1 ? 1 : 0), unloggedOps[i].size());
}
EXPECT_EQ(0, timeouts);
}
TEST_F(IRTest, UnloggedTimeout)
{
auto upcall = [this](const string &req, const string &reply) {
FAIL();
transport.CancelAllTimers();
auto timeout = [&](const string &req, ErrorCode) {
timeouts++;
};
// Drop messages to or from replica 1
transport.AddFilter(10, [](TransportReceiver *src, int srcIdx,
TransportReceiver *dst, int dstIdx,
Message &m, uint64_t &delay) {
if ((srcIdx == 1) || (dstIdx == 1)) {
return false;
}
return true;
});
// Run for 10 seconds
transport.Timer(10000, [&]() {
transport.CancelAllTimers();
});
ClientSendNextUnlogged(1, upcall, timeout);
for (unsigned int i = 0; i < iOps.size(); i++) {
EXPECT_EQ(0, iOps[i].size());
EXPECT_EQ(0, unloggedOps[i].size());
}
EXPECT_EQ(1, timeouts);
}
// TEST_F(IRTest, ManyOps)
// {
// Client::continuation_t upcall = [&](const string &req, const string &reply) {
// EXPECT_EQ(req, LastRequestOp());
// EXPECT_EQ(reply, "reply: "+LastRequestOp());
// // Not guaranteed that any replicas except the leader have
// // executed this request.
// EXPECT_EQ(ops[0].back(), req);
// if (requestNum < 9) {
// ClientSendNext(upcall);
// } else {
// transport.CancelAllTimers();
// transport.Run();
// // By now, they all should have executed the last request.
// for (int i = 0; i < config->n; i++) {
// EXPECT_EQ(10, ops[i].size());
// for (int j = 0; j < 10; j++) {
// EXPECT_EQ(RequestOp(j), ops[i][j]);