Commit 1c135be0 authored by Michael Whittaker

IR replicas now recover only when told.

Previously, IR replicas would always perform recovery and would always
persist their view information into a file with a specific filename.
Now, you can provide the IR replica with the filename into which you
want it to persist its state. Moreover, if you provide the empty string,
the replica will simply disable recovery entirely.
parent ba22c934
......@@ -61,20 +61,12 @@ int main() {
new lockserver::LockServer());
servers.push_back(std::move(server));
auto replica = std::unique_ptr<replication::ir::IRReplica>(
new replication::ir::IRReplica(config, i, &transport,
servers[i].get()));
new replication::ir::IRReplica(config, i,
/* recovery_filename= */ "",
&transport, servers[i].get()));
replicas.push_back(std::move(replica));
}
// Launch REPL.
transport.Run();
// Remove persisted files.
for (std::size_t i = 0; i < replica_addrs.size(); ++i) {
const transport::ReplicaAddress &addr = replica_addrs[i];
const std::string filename =
addr.host + ":" + addr.port + "_" + std::to_string(i) + ".bin";
int success = std::remove(filename.c_str());
ASSERT(success == 0);
}
}
......@@ -5,11 +5,12 @@ main(int argc, char **argv)
{
int index = -1;
const char *configPath = NULL;
std::string recovery_filename;
// Parse arguments
int opt;
char *strtolPtr;
while ((opt = getopt(argc, argv, "c:i:")) != -1) {
while ((opt = getopt(argc, argv, "c:i:r:")) != -1) {
switch (opt) {
case 'c':
configPath = optarg;
......@@ -22,6 +23,10 @@ main(int argc, char **argv)
}
break;
case 'r':
recovery_filename = optarg;
break;
default:
fprintf(stderr, "Unknown argument %s\n", argv[optind]);
}
......@@ -54,7 +59,8 @@ main(int argc, char **argv)
UDPTransport transport(0.0, 0.0, 0);
lockserver::LockServer server;
replication::ir::IRReplica replica(config, index, &transport, &server);
replication::ir::IRReplica replica(config, index, recovery_filename,
&transport, &server);
transport.Run();
......
......@@ -56,7 +56,6 @@ protected:
{"replica", "4"}};
config_ = std::unique_ptr<transport::Configuration>(
new transport::Configuration(5, 2, replica_addrs_));
RemovePersistedFiles();
for (std::size_t i = 0; i < 3; ++i) {
auto client = std::unique_ptr<lockserver::LockClient>(
......@@ -70,28 +69,12 @@ protected:
new lockserver::LockServer());
servers_.push_back(std::move(server));
auto replica = std::unique_ptr<replication::ir::IRReplica>(
new replication::ir::IRReplica(*config_, i, &transport_,
servers_[i].get()));
new replication::ir::IRReplica(*config_, i,
/* recovery_filename= */ "",
&transport_, servers_[i].get()));
replicas_.push_back(std::move(replica));
}
}
virtual void TearDown() {
RemovePersistedFiles();
}
virtual void RemovePersistedFiles() {
for (std::size_t i = 0; i < replica_addrs_.size(); ++i) {
const transport::ReplicaAddress &addr = replica_addrs_[i];
const std::string filename =
addr.host + ":" + addr.port + "_" + std::to_string(i) + ".bin";
std::ifstream f(filename);
if (f.good()) {
int success = std::remove(filename.c_str());
ASSERT(success == 0);
}
}
}
};
// Note that these tests are all white box smoke tests. They depend on the
......
......@@ -19,38 +19,40 @@ using namespace std;
using namespace proto;
IRReplica::IRReplica(transport::Configuration config, int myIdx,
const std::string &recovery_filename,
Transport *transport, IRAppReplica *app)
: config(std::move(config)), myIdx(myIdx), transport(transport), app(app),
status(STATUS_NORMAL), view(0), latest_normal_view(0),
// TODO: Take these filenames in via the command line?
persistent_view_info(config.replica(myIdx).host + ":" +
config.replica(myIdx).port + "_" +
std::to_string(myIdx) + ".bin"),
// Note that a leader waits for DO-VIEW-CHANGE messages from f other
// replicas (as opposed to f + 1) for a total of f + 1 replicas.
do_view_change_quorum(config.f)
{
transport->Register(this, config, myIdx);
// If our view info was previously initialized, then we are being started
// in recovery mode. If our view info has never been initialized, then this
// is the first time we are being run.
if (persistent_view_info.Initialized()) {
Debug("View information found in %s. Starting recovery.",
persistent_view_info.Filename().c_str());
status = STATUS_RECOVERING;
RecoverViewInfo();
Debug("Recovered view = %" PRIu64 " latest_normal_view = %" PRIu64 ".",
view, latest_normal_view);
++view;
if (myIdx == config.GetLeaderIndex(view)) {
// A recovering replica should not be the leader.
if (recovery_filename != "") {
persistent_view_info = std::unique_ptr<PersistentRegister>(
new PersistentRegister(recovery_filename));
// If our view info was previously initialized, then we are being
// started in recovery mode. If our view info has never been
// initialized, then this is the first time we are being run.
if (persistent_view_info->Initialized()) {
Debug("View information found in %s. Starting recovery.",
persistent_view_info->Filename().c_str());
status = STATUS_RECOVERING;
RecoverViewInfo();
Debug("Recovered view = %" PRIu64 " latest_normal_view = %" PRIu64
".",
view, latest_normal_view);
++view;
if (myIdx == config.GetLeaderIndex(view)) {
// A recovering replica should not be the leader.
++view;
}
PersistViewInfo();
BroadcastDoViewChangeMessages();
} else {
PersistViewInfo();
}
PersistViewInfo();
BroadcastDoViewChangeMessages();
} else {
PersistViewInfo();
}
// TODO: Figure out a good view change timeout.
......@@ -423,19 +425,24 @@ void IRReplica::HandleViewChangeTimeout() {
}
void IRReplica::PersistViewInfo() {
PersistedViewInfo view_info;
view_info.set_view(view);
view_info.set_latest_normal_view(latest_normal_view);
std::string output;
ASSERT(view_info.SerializeToString(&output));
persistent_view_info.Write(output);
if (persistent_view_info) {
PersistedViewInfo view_info;
view_info.set_view(view);
view_info.set_latest_normal_view(latest_normal_view);
std::string output;
bool success = view_info.SerializeToString(&output);
ASSERT(success);
persistent_view_info->Write(output);
}
}
void IRReplica::RecoverViewInfo() {
PersistedViewInfo view_info;
view_info.ParseFromString(persistent_view_info.Read());
view = view_info.view();
latest_normal_view = view_info.latest_normal_view();
if (persistent_view_info) {
PersistedViewInfo view_info;
view_info.ParseFromString(persistent_view_info->Read());
view = view_info.view();
latest_normal_view = view_info.latest_normal_view();
}
}
void IRReplica::BroadcastDoViewChangeMessages() {
......
......@@ -51,8 +51,11 @@ public:
class IRReplica : TransportReceiver
{
public:
// If recovery_filename is an empty string, then recovery is disabled.
// Otherwise, view information is persisted in a file with that name.
IRReplica(transport::Configuration config, int myIdx,
Transport *transport, IRAppReplica *app);
const std::string &recovery_filename, Transport *transport,
IRAppReplica *app);
~IRReplica();
// Message handlers.
......@@ -80,11 +83,11 @@ public:
private:
// Persist `view` and `latest_normal_view` to disk using
// `persistent_view_info`.
// `persistent_view_info`, if recovery is enabled.
void PersistViewInfo();
// Recover `view` and `latest_normal_view` from disk using
// `persistent_view_info`.
// `persistent_view_info`, if recovery is enabled.
void RecoverViewInfo();
// Broadcast DO-VIEW-CHANGE messages to all other replicas with our record
......@@ -107,7 +110,9 @@ private:
// latest_normal_view and use persistent_view_info to persist it to disk.
view_t view;
view_t latest_normal_view;
PersistentRegister persistent_view_info;
// If persistent_view_info is null, then recovery is disabled and the
// replica does NOT persist its view info.
std::unique_ptr<PersistentRegister> persistent_view_info;
Record record;
std::unique_ptr<Timeout> view_change_timeout;
......
......@@ -107,7 +107,8 @@ protected:
auto ir_app = std::unique_ptr<IRApp>(
new IRApp(&iOps[i], &cOps[i], &unloggedOps[i]));
auto p = std::unique_ptr<IRReplica>(
new IRReplica(*config, i, &transport, ir_app.get()));
new IRReplica(*config, i, /*recovery_filename=*/"", &transport,
ir_app.get()));
apps.push_back(std::move(ir_app));
replicas.push_back(std::move(p));
}
......@@ -144,22 +145,6 @@ protected:
client->InvokeUnlogged(idx, LastRequestOp(), upcall,
error_continuation, timeout);
}
virtual void TearDown() {
// Replicas store their view information in the following files:
// - localhost:12345_0.bin
// - localhost:12346_1.bin
// - localhost:12347_2.bin
// We have to make sure to delete them after every test. Otherwise,
// replicas run in recovery mode.
for (std::size_t i = 0; i < replicaAddrs.size(); ++i) {
const transport::ReplicaAddress &addr = replicaAddrs[i];
const std::string filename =
addr.host + ":" + addr.port + "_" + std::to_string(i) + ".bin";
int success = std::remove(filename.c_str());
ASSERT(success == 0);
}
}
};
TEST_F(IRTest, OneInconsistentOp)
......
......@@ -164,10 +164,11 @@ main(int argc, char **argv)
const char *configPath = NULL;
const char *keyPath = NULL;
bool linearizable = true;
std::string recovery_filename;
// Parse arguments
int opt;
while ((opt = getopt(argc, argv, "c:i:m:e:s:f:n:N:k:")) != -1) {
while ((opt = getopt(argc, argv, "c:i:m:e:s:f:n:N:k:r:")) != -1) {
switch (opt) {
case 'c':
configPath = optarg;
......@@ -235,6 +236,12 @@ main(int argc, char **argv)
break;
}
case 'r':
{
    // -r <file>: name of the file used to persist view information.
    // Store it in recovery_filename (not configPath, which belongs to -c);
    // it is later handed to the IRReplica constructor. When -r is absent
    // the string stays empty.
    recovery_filename = optarg;
    break;
}
default:
fprintf(stderr, "Unknown argument %s\n", argv[optind]);
}
......@@ -264,7 +271,8 @@ main(int argc, char **argv)
tapirstore::Server server(linearizable);
replication::ir::IRReplica replica(config, index, &transport, &server);
replication::ir::IRReplica replica(config, index, recovery_filename,
&transport, &server);
if (keyPath) {
string key;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment