Skip to content
Snippets Groups Projects
Commit 061301e5 authored by Irene Y Zhang's avatar Irene Y Zhang
Browse files

reorganizing and updating some VR code

parent c767ac1f
No related branches found
No related tags found
No related merge requests found
Showing
with 540 additions and 61 deletions
......@@ -122,6 +122,8 @@ $(foreach bin,$(1),$(eval LDFLAGS-$(bin) += $(2)))
endef
include lib/Rules.mk
include replication/common/Rules.mk
include replication/vr/Rules.mk
##################################################################
# General rules
......
......@@ -38,7 +38,7 @@
#include <string>
#include <string.h>
namespace specpaxos {
namespace transport {
ReplicaAddress::ReplicaAddress(const string &host, const string &port)
: host(host), port(port)
......@@ -213,4 +213,4 @@ Configuration::operator==(const Configuration &other) const
return true;
}
} // namespace specpaxos
} // namespace transport
......@@ -32,7 +32,7 @@
#ifndef _LIB_CONFIGURATION_H_
#define _LIB_CONFIGURATION_H_
#include "replication/vr/viewstamp.h"
#include "replication/common/viewstamp.h"
#include <fstream>
#include <stdbool.h>
......@@ -41,7 +41,7 @@
using std::string;
namespace specpaxos {
namespace transport {
struct ReplicaAddress
{
......@@ -82,12 +82,12 @@ private:
bool hasMulticast;
};
} // namespace specpaxos
} // namespace replication
namespace std {
template <> struct hash<specpaxos::ReplicaAddress>
template <> struct hash<transport::ReplicaAddress>
{
size_t operator()(const specpaxos::ReplicaAddress & x) const
size_t operator()(const transport::ReplicaAddress & x) const
{
return hash<string>()(x.host) * 37 + hash<string>()(x.port);
}
......@@ -95,15 +95,15 @@ template <> struct hash<specpaxos::ReplicaAddress>
}
namespace std {
template <> struct hash<specpaxos::Configuration>
template <> struct hash<transport::Configuration>
{
size_t operator()(const specpaxos::Configuration & x) const
size_t operator()(const transport::Configuration & x) const
{
size_t out = 0;
out = x.n * 37 + x.f;
for (int i = 0; i < x.n; i++) {
out *= 37;
out += hash<specpaxos::ReplicaAddress>()(x.replica(i));
out += hash<transport::ReplicaAddress>()(x.replica(i));
}
return out;
}
......
package specpaxos.latency.format;
package transport.latency.format;
message LatencyDist
{
......
......@@ -328,10 +328,10 @@ Latency_FlushTo(const char *fname)
{
std::ofstream outfile(fname);
Latency_t *l = latencyHead;
::specpaxos::latency::format::LatencyFile out;
::transport::latency::format::LatencyFile out;
for (; l; l = l->next) {
::specpaxos::latency::format::Latency lout;
::transport::latency::format::Latency lout;
Latency_Put(l, lout);
*(out.add_latencies()) = lout;
}
......@@ -356,14 +356,14 @@ Latency_Flush(void)
}
void
Latency_Put(Latency_t *l, ::specpaxos::latency::format::Latency &out)
Latency_Put(Latency_t *l, ::transport::latency::format::Latency &out)
{
out.Clear();
out.set_name(l->name);
for (int i = 0; i < l->distPoolNext; ++i) {
Latency_Dist_t *d = &l->distPool[i];
::specpaxos::latency::format::LatencyDist *outd = out.add_dists();
::transport::latency::format::LatencyDist *outd = out.add_dists();
outd->set_type(d->type);
outd->set_min(d->min);
outd->set_max(d->max);
......@@ -377,12 +377,12 @@ Latency_Put(Latency_t *l, ::specpaxos::latency::format::Latency &out)
}
bool
Latency_TryGet(const ::specpaxos::latency::format::Latency &in, Latency_t *l)
Latency_TryGet(const ::transport::latency::format::Latency &in, Latency_t *l)
{
LatencyInit(l, strdup(in.name().c_str())); // XXX Memory leak
l->distPoolNext = in.dists_size();
for (int i = 0; i < l->distPoolNext; ++i) {
const ::specpaxos::latency::format::LatencyDist &ind =
const ::transport::latency::format::LatencyDist &ind =
in.dists(i);
Latency_Dist_t *d = &l->distPool[i];
d->type = ind.type();
......
......@@ -108,8 +108,8 @@ void Latency_FlushTo(const char *fname);
void Latency_Flush(void);
void Latency_Put(Latency_t *l,
::specpaxos::latency::format::Latency &out);
bool Latency_TryGet(const ::specpaxos::latency::format::Latency &in,
::transport::latency::format::Latency &out);
bool Latency_TryGet(const ::transport::latency::format::Latency &in,
Latency_t *l);
static inline void
......
......@@ -75,7 +75,7 @@ SimulatedTransport::~SimulatedTransport()
void
SimulatedTransport::Register(TransportReceiver *receiver,
const specpaxos::Configuration &config,
const transport::Configuration &config,
int replicaIdx)
{
// Allocate an endpoint
......@@ -137,7 +137,7 @@ SimulatedTransport::SendMessageInternal(TransportReceiver *src,
}
SimulatedTransportAddress
SimulatedTransport::LookupAddress(const specpaxos::Configuration &cfg,
SimulatedTransport::LookupAddress(const transport::Configuration &cfg,
int idx)
{
// Check every registered replica to see if its configuration and
......@@ -162,7 +162,7 @@ SimulatedTransport::LookupAddress(const specpaxos::Configuration &cfg,
}
const SimulatedTransportAddress *
SimulatedTransport::LookupMulticastAddress(const specpaxos::Configuration *cfg)
SimulatedTransport::LookupMulticastAddress(const transport::Configuration *cfg)
{
return NULL;
}
......
......@@ -67,7 +67,7 @@ public:
SimulatedTransport();
~SimulatedTransport();
void Register(TransportReceiver *receiver,
const specpaxos::Configuration &config,
const transport::Configuration &config,
int replicaIdx);
void Run();
void AddFilter(int id, filter_t filter);
......@@ -83,9 +83,9 @@ protected:
bool multicast);
SimulatedTransportAddress
LookupAddress(const specpaxos::Configuration &cfg, int idx);
LookupAddress(const transport::Configuration &cfg, int idx);
const SimulatedTransportAddress *
LookupMulticastAddress(const specpaxos::Configuration *cfg);
LookupMulticastAddress(const transport::Configuration *cfg);
private:
struct QueuedMessage {
......
......@@ -32,7 +32,7 @@
#include <gtest/gtest.h>
using namespace specpaxos;
using namespace transport;
using std::vector;
TEST(Configuration, Basic)
......
......@@ -35,7 +35,7 @@
#include <gtest/gtest.h>
using namespace specpaxos::test;
using namespace transport::test;
using ::google::protobuf::Message;
......@@ -67,11 +67,11 @@ TestReceiver::ReceiveMessage(const TransportAddress &src,
class SimTransportTest : public testing::Test
{
protected:
std::vector<specpaxos::ReplicaAddress> replicaAddrs =
std::vector<transport::ReplicaAddress> replicaAddrs =
{ { "localhost", "12345" },
{ "localhost", "12346" },
{ "localhost", "12347" }};
specpaxos::Configuration config{3, 1, replicaAddrs};
transport::Configuration config{3, 1, replicaAddrs};
TestReceiver *receiver0;
TestReceiver *receiver1;
......
package specpaxos.test;
package transport.test;
message TestMessage {
required string test = 1;
......
......@@ -74,7 +74,7 @@ protected:
public:
virtual ~Transport() {}
virtual void Register(TransportReceiver *receiver,
const specpaxos::Configuration &config,
const transport::Configuration &config,
int replicaIdx) = 0;
virtual bool SendMessage(TransportReceiver *src, const TransportAddress &dst,
const Message &m) = 0;
......
......@@ -70,7 +70,7 @@ public:
SendMessageToReplica(TransportReceiver *src, int replicaIdx,
const Message &m)
{
const specpaxos::Configuration *cfg = configurations[src];
const transport::Configuration *cfg = configurations[src];
ASSERT(cfg != NULL);
if (!replicaAddressesInitialized) {
......@@ -86,7 +86,7 @@ public:
virtual bool
SendMessageToAll(TransportReceiver *src, const Message &m)
{
const specpaxos::Configuration *cfg = configurations[src];
const transport::Configuration *cfg = configurations[src];
ASSERT(cfg != NULL);
if (!replicaAddressesInitialized) {
......@@ -117,25 +117,25 @@ protected:
const ADDR &dst,
const Message &m,
bool multicast = false) = 0;
virtual ADDR LookupAddress(const specpaxos::Configuration &cfg,
virtual ADDR LookupAddress(const transport::Configuration &cfg,
int replicaIdx) = 0;
virtual const ADDR *
LookupMulticastAddress(const specpaxos::Configuration *cfg) = 0;
LookupMulticastAddress(const transport::Configuration *cfg) = 0;
std::unordered_map<specpaxos::Configuration,
specpaxos::Configuration *> canonicalConfigs;
std::unordered_map<transport::Configuration,
transport::Configuration *> canonicalConfigs;
std::map<TransportReceiver *,
specpaxos::Configuration *> configurations;
std::map<const specpaxos::Configuration *,
transport::Configuration *> configurations;
std::map<const transport::Configuration *,
std::map<int, ADDR> > replicaAddresses;
std::map<const specpaxos::Configuration *,
std::map<const transport::Configuration *,
std::map<int, TransportReceiver *> > replicaReceivers;
std::map<const specpaxos::Configuration *, ADDR> multicastAddresses;
std::map<const transport::Configuration *, ADDR> multicastAddresses;
bool replicaAddressesInitialized;
virtual specpaxos::Configuration *
virtual transport::Configuration *
RegisterConfiguration(TransportReceiver *receiver,
const specpaxos::Configuration &config,
const transport::Configuration &config,
int replicaIdx)
{
ASSERT(receiver != NULL);
......@@ -144,10 +144,10 @@ protected:
// pointer to the canonical copy; if not, create one. This
// allows us to use that pointer as a key in various
// structures.
specpaxos::Configuration *canonical
transport::Configuration *canonical
= canonicalConfigs[config];
if (canonical == NULL) {
canonical = new specpaxos::Configuration(config);
canonical = new transport::Configuration(config);
canonicalConfigs[config] = canonical;
}
......@@ -177,7 +177,7 @@ protected:
// For every configuration, look up all addresses and cache
// them.
for (auto &kv : canonicalConfigs) {
specpaxos::Configuration *cfg = kv.second;
transport::Configuration *cfg = kv.second;
for (int i = 0; i < cfg->n; i++) {
const ADDR addr = LookupAddress(*cfg, i);
......
......@@ -83,7 +83,7 @@ bool operator<(const UDPTransportAddress &a, const UDPTransportAddress &b)
}
UDPTransportAddress
UDPTransport::LookupAddress(const specpaxos::ReplicaAddress &addr)
UDPTransport::LookupAddress(const replication::ReplicaAddress &addr)
{
int res;
struct addrinfo hints;
......@@ -106,15 +106,15 @@ UDPTransport::LookupAddress(const specpaxos::ReplicaAddress &addr)
}
UDPTransportAddress
UDPTransport::LookupAddress(const specpaxos::Configuration &config,
UDPTransport::LookupAddress(const replication::Configuration &config,
int idx)
{
const specpaxos::ReplicaAddress &addr = config.replica(idx);
const replication::ReplicaAddress &addr = config.replica(idx);
return LookupAddress(addr);
}
const UDPTransportAddress *
UDPTransport::LookupMulticastAddress(const specpaxos::Configuration
UDPTransport::LookupMulticastAddress(const replication::Configuration
*config)
{
if (!config->multicast()) {
......@@ -232,7 +232,7 @@ UDPTransport::~UDPTransport()
}
void
UDPTransport::ListenOnMulticastPort(const specpaxos::Configuration
UDPTransport::ListenOnMulticastPort(const replication::Configuration
*canonicalConfig)
{
if (!canonicalConfig->multicast()) {
......@@ -298,13 +298,13 @@ UDPTransport::ListenOnMulticastPort(const specpaxos::Configuration
void
UDPTransport::Register(TransportReceiver *receiver,
const specpaxos::Configuration &config,
const replication::Configuration &config,
int replicaIdx)
{
ASSERT(replicaIdx < config.n);
struct sockaddr_in sin;
const specpaxos::Configuration *canonicalConfig =
const replication::Configuration *canonicalConfig =
RegisterConfiguration(receiver, config, replicaIdx);
// Create socket
......@@ -624,7 +624,7 @@ UDPTransport::OnReadable(int fd)
// If so, deliver the message to all replicas for that
// config, *except* if that replica was the sender of the
// message.
const specpaxos::Configuration *cfg = it->second;
const replication::Configuration *cfg = it->second;
for (auto &kv : replicaReceivers[cfg]) {
TransportReceiver *receiver = kv.second;
const UDPTransportAddress &raddr =
......
......@@ -69,7 +69,7 @@ public:
int dscp = 0, event_base *evbase = nullptr);
virtual ~UDPTransport();
void Register(TransportReceiver *receiver,
const specpaxos::Configuration &config,
const replication::Configuration &config,
int replicaIdx);
void Run();
void Stop();
......@@ -106,8 +106,8 @@ private:
std::vector<event *> signalEvents;
std::map<int, TransportReceiver*> receivers; // fd -> receiver
std::map<TransportReceiver*, int> fds; // receiver -> fd
std::map<const specpaxos::Configuration *, int> multicastFds;
std::map<int, const specpaxos::Configuration *> multicastConfigs;
std::map<const replication::Configuration *, int> multicastFds;
std::map<int, const replication::Configuration *> multicastConfigs;
int lastTimerId;
std::map<int, UDPTransportTimerInfo *> timers;
uint64_t lastFragMsgId;
......@@ -122,13 +122,13 @@ private:
const UDPTransportAddress &dst,
const Message &m, bool multicast = false);
UDPTransportAddress
LookupAddress(const specpaxos::ReplicaAddress &addr);
LookupAddress(const replication::ReplicaAddress &addr);
UDPTransportAddress
LookupAddress(const specpaxos::Configuration &cfg,
LookupAddress(const replication::Configuration &cfg,
int replicaIdx);
const UDPTransportAddress *
LookupMulticastAddress(const specpaxos::Configuration *cfg);
void ListenOnMulticastPort(const specpaxos::Configuration
LookupMulticastAddress(const replication::Configuration *cfg);
void ListenOnMulticastPort(const replication::Configuration
*canonicalConfig);
void OnReadable(int fd);
void OnTimer(UDPTransportTimerInfo *info);
......
d := $(dir $(lastword $(MAKEFILE_LIST)))
SRCS += $(addprefix $(d), \
client.cc replica.cc log.cc)
PROTOS += $(addprefix $(d), \
request.proto)
LIB-request := $(o)request.o
OBJS-client := $(o)client.o \
$(LIB-message) $(LIB-configuration) $(LIB-transport) \
$(LIB-request)
OBJS-replica := $(o)replica.o $(o)log.o \
$(LIB-message) $(LIB-request) \
$(LIB-configuration) $(LIB-udptransport)
include $(d)tests/Rules.mk
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* client.cc:
* interface to replication client stubs
*
* Copyright 2013 Dan R. K. Ports <drkp@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#include "replication/common/client.h"
#include "replication/common/request.pb.h"
#include "lib/message.h"
#include "lib/transport.h"
#include <random>
namespace replication {
Client::Client(const Configuration &config, Transport *transport,
uint64_t clientid)
: config(config), transport(transport)
{
this->clientid = clientid;
// Randomly generate a client ID
// This is surely not the fastest way to get a random 64-bit int,
// but it should be fine for this purpose.
while (this->clientid == 0) {
std::random_device rd;
std::mt19937_64 gen(rd());
std::uniform_int_distribution<uint64_t> dis;
this->clientid = dis(gen);
Debug("VRClient ID: %lu", this->clientid);
}
transport->Register(this, config, -1);
}
Client::~Client()
{
}
void
Client::ReceiveMessage(const TransportAddress &remote,
const string &type, const string &data)
{
Panic("Received unexpected message type: %s",
type.c_str());
}
} // namespace replication
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* client.h:
* interface to replication client stubs
*
* Copyright 2013-2015 Irene Zhang <iyzhang@cs.washington.edu>
* Naveen Kr. Sharma <nksharma@cs.washington.edu>
* Dan R. K. Ports <drkp@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#ifndef _COMMON_CLIENT_H_
#define _COMMON_CLIENT_H_
#include "lib/configuration.h"
#include "replication/common/request.pb.h"
#include "lib/transport.h"
#include <functional>
namespace replication {
class Client : public TransportReceiver
{
public:
typedef std::function<void (const string &, const string &)> continuation_t;
typedef std::function<void (const string &)> timeout_continuation_t;
static const uint32_t DEFAULT_UNLOGGED_OP_TIMEOUT = 1000; // milliseconds
Client(const transport::Configuration &config, Transport *transport,
uint64_t clientid = 0);
virtual ~Client();
virtual void Invoke(const string &request,
continuation_t continuation) = 0;
virtual void InvokeUnlogged(int replicaIdx,
const string &request,
continuation_t continuation,
timeout_continuation_t timeoutContinuation = nullptr,
uint32_t timeout = DEFAULT_UNLOGGED_OP_TIMEOUT) = 0;
virtual void ReceiveMessage(const TransportAddress &remote,
const string &type,
const string &data);
protected:
transport::Configuration config;
Transport *transport;
uint64_t clientid;
};
} // namespace replication
#endif /* _COMMON_CLIENT_H_ */
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* log.h:
* a replica's log of pending and committed operations
*
* Copyright 2013-2015 Irene Zhang <iyzhang@cs.washington.edu>
* Naveen Kr. Sharma <nksharma@cs.washington.edu>
* Dan R. K. Ports <drkp@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#ifndef _COMMON_LOG_IMPL_H_
#define _COMMON_LOG_IMPL_H_
template <class T> void
Log::Dump(opnum_t from, T out)
{
for (opnum_t i = std::max(from, start);
i <= LastOpnum(); i++) {
const LogEntry *entry = Find(i);
ASSERT(entry != NULL);
auto elem = out->Add();
elem->set_view(entry->viewstamp.view);
elem->set_opnum(entry->viewstamp.opnum);
elem->set_state(entry->state);
elem->set_hash(entry->hash);
*(elem->mutable_request()) = entry->request;
}
}
template <class iter> void
Log::Install(iter start, iter end)
{
// Find the first divergence in the log
iter it = start;
for (it = start; it != end; it++) {
const LogEntry *oldEntry = Find(it->opnum());
if (oldEntry == NULL) {
break;
}
if (it->view() != oldEntry->viewstamp.view) {
RemoveAfter(it->opnum());
break;
}
}
if (it == end) {
// We didn't find a divergence. This means that the logs
// should be identical. If the existing log is longer,
// something is wrong.
// it--;
// ASSERT(it->opnum() == lastViewstamp.opnum);
// ASSERT(it->view() == lastViewstamp.view);
// ASSERT(Find(it->opnum()+1) == NULL);
}
// Install the new log entries
for (; it != end; it++) {
viewstamp_t vs = { it->view(), it->opnum() };
Append(vs, it->request(), LOG_STATE_PREPARED);
}
}
#endif /* _COMMON_LOG_IMPL_H_ */
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* log.h:
* a replica's log of pending and committed operations
*
* Copyright 2013 Dan R. K. Ports <drkp@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#include "replicationcommon/log.h"
#include "replication/common/request.pb.h"
#include "lib/assert.h"
#include <openssl/sha.h>
namespace replication {
const string Log::EMPTY_HASH = string(SHA_DIGEST_LENGTH, '\0');
Log::Log(bool useHash, opnum_t start, string initialHash)
: useHash(useHash)
{
this->initialHash = initialHash;
this->start = start;
if (start == 1) {
ASSERT(initialHash == EMPTY_HASH);
}
}
LogEntry &
Log::Append(viewstamp_t vs, const Request &req, LogEntryState state)
{
if (entries.empty()) {
ASSERT(vs.opnum == start);
} else {
ASSERT(vs.opnum == LastOpnum()+1);
}
LogEntry entry;
entry.viewstamp = vs;
entry.request = req;
entry.state = state;
if (useHash) {
entry.hash = ComputeHash(LastHash(), entry);
}
entries.push_back(entry);
return *Find(vs.opnum);
}
// This really ought to be const
LogEntry *
Log::Find(opnum_t opnum)
{
if (entries.empty()) {
return NULL;
}
if (opnum < start) {
return NULL;
}
if (opnum-start > entries.size()-1) {
return NULL;
}
LogEntry *entry = &entries[opnum-start];
ASSERT(entry->viewstamp.opnum == opnum);
return entry;
}
bool
Log::SetStatus(opnum_t op, LogEntryState state)
{
LogEntry *entry = Find(op);
if (entry == NULL) {
return false;
}
entry->state = state;
return true;
}
bool
Log::SetRequest(opnum_t op, const Request &req)
{
if (useHash) {
Panic("Log::SetRequest on hashed log not supported.");
}
LogEntry *entry = Find(op);
if (entry == NULL) {
return false;
}
entry->request = req;
return true;
}
void
Log::RemoveAfter(opnum_t op)
{
#if PARANOID
// We'd better not be removing any committed entries.
for (opnum_t i = op; i <= LastOpnum(); i++) {
ASSERT(Find(i)->state != LOG_STATE_COMMITTED);
}
#endif
if (op > LastOpnum()) {
return;
}
Debug("Removing log entries after " FMT_OPNUM, op);
ASSERT(op-start < entries.size());
entries.resize(op-start);
ASSERT(LastOpnum() == op-1);
}
LogEntry *
Log::Last()
{
if (entries.empty()) {
return NULL;
}
return &entries.back();
}
viewstamp_t
Log::LastViewstamp() const
{
if (entries.empty()) {
return viewstamp_t(0, start-1);
} else {
return entries.back().viewstamp;
}
}
opnum_t
Log::LastOpnum() const
{
if (entries.empty()) {
return start-1;
} else {
return entries.back().viewstamp.opnum;
}
}
opnum_t
Log::FirstOpnum() const
{
// XXX Not really sure what's appropriate to return here if the
// log is empty
return start;
}
bool
Log::Empty() const
{
return entries.empty();
}
const string &
Log::LastHash() const
{
if (entries.empty()) {
return initialHash;
} else {
return entries.back().hash;
}
}
string
Log::ComputeHash(string lastHash, const LogEntry &entry)
{
SHA_CTX ctx;
unsigned char out[SHA_DIGEST_LENGTH];
SHA1_Init(&ctx);
SHA1_Update(&ctx, lastHash.c_str(), lastHash.size());
SHA1_Update(&ctx, &entry.viewstamp, sizeof(entry.viewstamp));
uint64_t x;
x = entry.request.clientid();
SHA1_Update(&ctx, &x, sizeof(x));
x = entry.request.clientreqid();
SHA1_Update(&ctx, &x, sizeof(x));
SHA1_Update(&ctx, entry.request.op().c_str(),
entry.request.op().size());
SHA1_Final(out, &ctx);
return string((char *)out, SHA_DIGEST_LENGTH);
}
} // namespace replication
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment