Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • syslab/tapir
  • aaasz/tapir
  • ashmrtnz/tapir
3 results
Show changes
Showing
with 1811 additions and 42 deletions
......@@ -5,7 +5,7 @@
* simulated message-passing interface for testing use
*
* Copyright 2013-2015 Irene Zhang <iyzhang@cs.washington.edu>
* Naveen Kr. Sharma <nksharma@cs.washington.edu>
* Naveen Kr. Sharma <naveenks@cs.washington.edu>
* Dan R. K. Ports <drkp@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
......
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* tcptransport.cc:
* message-passing network interface that uses TCP message delivery
* and libevent
*
* Copyright 2013 Dan R. K. Ports <drkp@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#include "lib/assert.h"
#include "lib/configuration.h"
#include "lib/message.h"
#include "lib/tcptransport.h"
#include <google/protobuf/message.h>
#include <event2/thread.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <signal.h>
// Size limit placeholder. NOTE(review): nothing in the visible code reads
// this constant, and the XXX marks it as provisional -- confirm before use.
const size_t MAX_TCP_SIZE = 100; // XXX
// Marker written at the start of every framed message by
// SendMessageInternal and checked by TCPReadableCallback.
const uint32_t MAGIC = 0x06121983;
using std::pair;
/**
 * Wraps an IPv4 socket address.
 *
 * The comparison operators in this file memcmp() the entire sockaddr_in,
 * so the padding field of the stored copy is cleared to keep equal
 * addresses byte-identical.
 *
 * @param addr  address to copy; the caller's object is left untouched.
 */
TCPTransportAddress::TCPTransportAddress(const sockaddr_in &addr)
    : addr(addr)
{
    // Inside the body the unqualified name `addr` is the const parameter,
    // so the member has to be reached through this->.
    memset(this->addr.sin_zero, 0, sizeof(this->addr.sin_zero));
}
// Returns a heap-allocated copy of this address; the caller owns the result.
TCPTransportAddress *
TCPTransportAddress::clone() const
{
    return new TCPTransportAddress(*this);
}
// Two addresses are equal when their sockaddr_in structs match byte for byte.
bool operator==(const TCPTransportAddress &a, const TCPTransportAddress &b)
{
    const int diff = memcmp(&a.addr, &b.addr, sizeof(a.addr));
    return diff == 0;
}
// Negation of operator== for TCP addresses.
bool operator!=(const TCPTransportAddress &a, const TCPTransportAddress &b)
{
    if (a == b) {
        return false;
    }
    return true;
}
// Orders addresses by the raw bytes of their sockaddr_in structs, which is
// what lets TCPTransportAddress serve as a std::map key.
bool operator<(const TCPTransportAddress &a, const TCPTransportAddress &b)
{
    const int diff = memcmp(&a.addr, &b.addr, sizeof(a.addr));
    return diff < 0;
}
/**
 * Resolves a replica's host/port pair to an IPv4 TCP address.
 *
 * Calls Panic() if resolution fails or if the first result is not an
 * AF_INET address.
 */
TCPTransportAddress
TCPTransport::LookupAddress(const transport::ReplicaAddress &addr)
{
    // Ask only for IPv4 stream endpoints.
    struct addrinfo hints;
    memset(&hints, 0, sizeof(hints));
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = 0;
    hints.ai_flags = 0;

    struct addrinfo *resolved;
    const int rc = getaddrinfo(addr.host.c_str(), addr.port.c_str(),
                               &hints, &resolved);
    if (rc != 0) {
        Panic("Failed to resolve %s:%s: %s",
              addr.host.c_str(), addr.port.c_str(), gai_strerror(rc));
    }
    if (resolved->ai_addr->sa_family != AF_INET) {
        Panic("getaddrinfo returned a non IPv4 address");
    }

    // Copy the first result before releasing the list.
    TCPTransportAddress result(*((sockaddr_in *)resolved->ai_addr));
    freeaddrinfo(resolved);
    return result;
}
// Resolves the address of replica number idx in the given configuration.
TCPTransportAddress
TCPTransport::LookupAddress(const transport::Configuration &config,
                            int idx)
{
    return LookupAddress(config.replica(idx));
}
/**
 * Binds a socket to the IPv4 address named by host and port.
 *
 * @param fd    socket to bind.
 * @param host  host name or numeric address.
 * @param port  port number or service name.
 *
 * Calls Panic()/PPanic() if resolution or bind() fails.
 */
static void
BindToPort(int fd, const string &host, const string &port)
{
    struct sockaddr_in sin;

    // look up its hostname and port number (which
    // might be a service name)
    struct addrinfo hints;
    // getaddrinfo() requires every hints field other than the four set
    // below to be zero or null, so start from a cleared struct.
    memset(&hints, 0, sizeof(hints));
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = 0;
    hints.ai_flags = AI_PASSIVE;
    struct addrinfo *ai;
    int res;
    if ((res = getaddrinfo(host.c_str(),
                           port.c_str(),
                           &hints, &ai))) {
        Panic("Failed to resolve host/port %s:%s: %s",
              host.c_str(), port.c_str(), gai_strerror(res));
    }
    ASSERT(ai->ai_family == AF_INET);
    ASSERT(ai->ai_socktype == SOCK_STREAM);
    if (ai->ai_addr->sa_family != AF_INET) {
        Panic("getaddrinfo returned a non IPv4 address");
    }
    sin = *(sockaddr_in *)ai->ai_addr;
    freeaddrinfo(ai);

    // sin_port is in network byte order; convert it for display.
    Debug("Binding to %s %d TCP", inet_ntoa(sin.sin_addr), ntohs(sin.sin_port));

    if (bind(fd, (sockaddr *)&sin, sizeof(sin)) < 0) {
        PPanic("Failed to bind to socket");
    }
}
/**
 * Creates the libevent base used by this transport and, when asked,
 * installs SIGTERM/SIGINT handlers that stop the event loop.
 *
 * dropRate, reorderRate and dscp are accepted but not read by this
 * constructor.
 */
TCPTransport::TCPTransport(double dropRate, double reorderRate,
                           int dscp, bool handleSignals)
{
    lastTimerId = 0;

    // libevent setup: threading support, logging hooks, then the base.
    evthread_use_pthreads();
    event_set_log_callback(LogCallback);
    event_set_fatal_callback(FatalCallback);
    libeventBase = event_base_new();
    evthread_make_base_notifiable(libeventBase);

    if (!handleSignals) {
        return;
    }

    // Create one signal event per handled signal, then arm them all.
    const int handled[] = { SIGTERM, SIGINT };
    for (int signo : handled) {
        signalEvents.push_back(evsignal_new(libeventBase, signo,
                                            SignalCallback, this));
    }
    for (event *sigEvent : signalEvents) {
        event_add(sigEvent, NULL);
    }
}
// Destructor. Currently performs no cleanup: the libevent base, signal
// events, listeners and outstanding timers created by this object are not
// released here (the timer cleanup below is commented out).
TCPTransport::~TCPTransport()
{
// XXX Shut down libevent?
// for (auto kv : timers) {
// delete kv.second;
// }
}
/**
 * Opens a non-blocking TCP connection to dst on behalf of src.
 *
 * On success the new bufferevent is recorded in tcpOutgoing/tcpAddresses
 * and src is told its local address via SetAddress(). If the connect
 * attempt fails immediately, a warning is logged and nothing is recorded.
 *
 * @param src  receiver that will get messages arriving on the connection.
 * @param dst  remote address to connect to.
 */
void
TCPTransport::ConnectTCP(TransportReceiver *src, const TCPTransportAddress &dst)
{
    // Create socket
    int fd;
    if ((fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
        PPanic("Failed to create socket for outgoing TCP connection");
    }

    // Put it in non-blocking mode
    if (fcntl(fd, F_SETFL, O_NONBLOCK, 1)) {
        PWarning("Failed to set O_NONBLOCK on outgoing TCP socket");
    }

    // Set TCP_NODELAY
    int n = 1;
    if (setsockopt(fd, IPPROTO_TCP,
                   TCP_NODELAY, (char *)&n, sizeof(n)) < 0) {
        PWarning("Failed to set TCP_NODELAY on outgoing TCP socket");
    }

    struct bufferevent *bev =
        bufferevent_socket_new(libeventBase, fd,
                               BEV_OPT_CLOSE_ON_FREE);

    // Start the connect before installing callbacks. The event callback
    // looks the bufferevent up in tcpAddresses, which is only filled in
    // below, so it must not be reachable while a failure is being
    // reported from inside this call.
    if (bufferevent_socket_connect(bev,
                                   (struct sockaddr *)&(dst.addr),
                                   sizeof(dst.addr)) < 0) {
        bufferevent_free(bev);
        Warning("Failed to connect to server via TCP");
        return;
    }

    // Per-connection record handed to the callbacks. It is allocated only
    // once the connect has been started so the failure path above does
    // not leak it.
    TCPTransportTCPListener *info = new TCPTransportTCPListener();
    info->transport = this;
    info->acceptFd = 0;
    info->receiver = src;
    info->replicaIdx = -1;
    info->acceptEvent = NULL;
    tcpListeners.push_back(info);

    bufferevent_setcb(bev, TCPReadableCallback, NULL,
                      TCPOutgoingEventCallback, info);
    if (bufferevent_enable(bev, EV_READ|EV_WRITE) < 0) {
        Panic("Failed to enable bufferevent");
    }

    // Tell the receiver its address
    struct sockaddr_in sin;
    socklen_t sinsize = sizeof(sin);
    if (getsockname(fd, (sockaddr *) &sin, &sinsize) < 0) {
        PPanic("Failed to get socket name");
    }
    TCPTransportAddress *addr = new TCPTransportAddress(sin);
    src->SetAddress(addr);

    tcpOutgoing[dst] = bev;
    tcpAddresses.insert(pair<struct bufferevent*, TCPTransportAddress>(bev,dst));

    // sin_port is in network byte order; convert it for display.
    Debug("Opened TCP connection to %s:%d",
          inet_ntoa(dst.addr.sin_addr), ntohs(dst.addr.sin_port));
}
/**
 * Registers a receiver with this transport.
 *
 * For a replica (replicaIdx >= 0) this creates a listening TCP socket
 * bound to the replica's configured host/port and arms an accept event.
 * For a client (replicaIdx == -1) only the configuration is registered.
 *
 * @param receiver    object that will receive incoming messages.
 * @param config      replica configuration.
 * @param replicaIdx  index of this replica in config, or -1 for a client.
 */
void
TCPTransport::Register(TransportReceiver *receiver,
                       const transport::Configuration &config,
                       int replicaIdx)
{
    ASSERT(replicaIdx < config.n);
    struct sockaddr_in sin;

    //const transport::Configuration *canonicalConfig =
    RegisterConfiguration(receiver, config, replicaIdx);

    // Clients don't need to accept TCP connections
    if (replicaIdx == -1) {
        return;
    }

    // Create socket
    int fd;
    if ((fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
        PPanic("Failed to create socket to accept TCP connections");
    }

    // Put it in non-blocking mode
    if (fcntl(fd, F_SETFL, O_NONBLOCK, 1)) {
        PWarning("Failed to set O_NONBLOCK");
    }

    // Set SO_REUSEADDR. The option value must be initialised: passing an
    // indeterminate int would enable or disable the option at random.
    int n = 1;
    if (setsockopt(fd, SOL_SOCKET,
                   SO_REUSEADDR, (char *)&n, sizeof(n)) < 0) {
        PWarning("Failed to set SO_REUSEADDR on TCP listening socket");
    }

    // Set TCP_NODELAY
    n = 1;
    if (setsockopt(fd, IPPROTO_TCP,
                   TCP_NODELAY, (char *)&n, sizeof(n)) < 0) {
        PWarning("Failed to set TCP_NODELAY on TCP listening socket");
    }

    // Registering a replica. Bind socket to the designated
    // host/port
    const string &host = config.replica(replicaIdx).host;
    const string &port = config.replica(replicaIdx).port;
    BindToPort(fd, host, port);

    // Listen for connections
    if (listen(fd, 5) < 0) {
        PPanic("Failed to listen for TCP connections");
    }

    // Create event to accept connections
    TCPTransportTCPListener *info = new TCPTransportTCPListener();
    info->transport = this;
    info->acceptFd = fd;
    info->receiver = receiver;
    info->replicaIdx = replicaIdx;
    info->acceptEvent = event_new(libeventBase, fd,
                                  EV_READ | EV_PERSIST,
                                  TCPAcceptCallback, (void *)info);
    event_add(info->acceptEvent, NULL);
    tcpListeners.push_back(info);

    // Tell the receiver its address
    socklen_t sinsize = sizeof(sin);
    if (getsockname(fd, (sockaddr *) &sin, &sinsize) < 0) {
        PPanic("Failed to get socket name");
    }
    TCPTransportAddress *addr = new TCPTransportAddress(sin);
    receiver->SetAddress(addr);

    // Update mappings
    receivers[fd] = receiver;
    fds[receiver] = fd;

    Debug("Accepting connections on TCP port %hu", ntohs(sin.sin_port));
}
/**
 * Frames a protobuf message and queues it on the TCP connection to dst,
 * opening the connection first if none exists.
 *
 * Wire layout (host byte order, no padding):
 *   uint32_t MAGIC | size_t totalLen | size_t typeLen | type bytes |
 *   size_t dataLen | data bytes
 * where totalLen counts the whole frame including the magic.
 *
 * @param src        sending receiver; used when a connection must be opened.
 * @param dst        destination address.
 * @param m          message to serialise.
 * @param multicast  not read by this implementation.
 * @return true if the frame was queued; false if no connection could be
 *         opened or the write to the bufferevent failed.
 */
bool
TCPTransport::SendMessageInternal(TransportReceiver *src,
                                  const TCPTransportAddress &dst,
                                  const Message &m,
                                  bool multicast)
{
    auto kv = tcpOutgoing.find(dst);
    // See if we have a connection open
    if (kv == tcpOutgoing.end()) {
        ConnectTCP(src, dst);
        kv = tcpOutgoing.find(dst);
        // ConnectTCP records nothing when the connect attempt fails, so
        // the lookup can still miss; do not dereference end().
        if (kv == tcpOutgoing.end()) {
            Warning("No TCP connection available for outgoing message");
            return false;
        }
    }

    struct bufferevent *ev = kv->second;
    ASSERT(ev != NULL);

    // Serialize message
    string data = m.SerializeAsString();
    string type = m.GetTypeName();
    const size_t typeLen = type.length();
    const size_t dataLen = data.length();
    const size_t totalLen = (sizeof(uint32_t) + 3 * sizeof(size_t) +
                             typeLen + dataLen);

    Debug("Sending %ld byte %s message to server over TCP",
          totalLen, type.c_str());

    // Heap buffer instead of a variable-length stack array: frames can be
    // large, and VLAs are not standard C++.
    string buf(totalLen, '\0');
    char *ptr = &buf[0];

    // memcpy rather than casted stores: the fields following the type
    // name land at arbitrary alignment.
    const uint32_t magic = MAGIC;
    memcpy(ptr, &magic, sizeof(magic));
    ptr += sizeof(magic);

    memcpy(ptr, &totalLen, sizeof(totalLen));
    ptr += sizeof(totalLen);

    memcpy(ptr, &typeLen, sizeof(typeLen));
    ptr += sizeof(typeLen);

    memcpy(ptr, type.data(), typeLen);
    ptr += typeLen;

    memcpy(ptr, &dataLen, sizeof(dataLen));
    ptr += sizeof(dataLen);

    // dataLen may legitimately be zero (a message with no fields set).
    memcpy(ptr, data.data(), dataLen);
    ptr += dataLen;

    ASSERT((size_t)(ptr - &buf[0]) == totalLen);

    if (bufferevent_write(ev, buf.data(), totalLen) < 0) {
        Warning("Failed to write to TCP buffer");
        return false;
    }

    return true;
}
// Runs the libevent dispatch loop on the calling thread; the call returns
// when event_base_dispatch() does. Its result is discarded.
void
TCPTransport::Run()
{
    (void)event_base_dispatch(libeventBase);
}
// Asks the libevent loop started by Run() to break out. The result of
// event_base_loopbreak() is discarded.
void
TCPTransport::Stop()
{
    (void)event_base_loopbreak(libeventBase);
}
int
TCPTransport::Timer(uint64_t ms, timer_callback_t cb)
{
std::lock_guard<std::mutex> lck(mtx);
TCPTransportTimerInfo *info = new TCPTransportTimerInfo();
struct timeval tv;
tv.tv_sec = ms/1000;
tv.tv_usec = (ms % 1000) * 1000;
++lastTimerId;
info->transport = this;
info->id = lastTimerId;
info->cb = cb;
info->ev = event_new(libeventBase, -1, 0,
TimerCallback, info);
timers[info->id] = info;
event_add(info->ev, &tv);
return info->id;
}
/**
 * Cancels a pending timer.
 *
 * @param id  id returned by Timer().
 * @return true if the timer existed and was cancelled; false if no such
 *         timer is pending (for example because it already fired).
 */
bool
TCPTransport::CancelTimer(int id)
{
    std::lock_guard<std::mutex> lck(mtx);

    // find() rather than operator[]: indexing would insert a NULL entry
    // for an unknown id, and such an entry can never be cancelled, so
    // CancelAllTimers() would loop on it forever.
    auto it = timers.find(id);
    if (it == timers.end()) {
        return false;
    }

    TCPTransportTimerInfo *info = it->second;
    timers.erase(it);
    if (info == NULL) {
        return false;
    }

    event_del(info->ev);
    event_free(info->ev);
    delete info;

    return true;
}
void
TCPTransport::CancelAllTimers()
{
while (!timers.empty()) {
auto kv = timers.begin();
CancelTimer(kv->first);
}
}
/**
 * Handles an expired timer: unregisters it and releases its libevent
 * event under the mutex, then runs the user callback outside the lock
 * and frees the timer record.
 */
void
TCPTransport::OnTimer(TCPTransportTimerInfo *info)
{
    {
        std::lock_guard<std::mutex> guard(mtx);
        event *fired = info->ev;
        timers.erase(info->id);
        event_del(fired);
        event_free(fired);
    }

    // The callback runs without the mutex held.
    info->cb();

    delete info;
}
// libevent entry point for timer events: arg is the TCPTransportTimerInfo
// registered by Timer(); control is handed to the owning transport.
void
TCPTransport::TimerCallback(evutil_socket_t fd, short what, void *arg)
{
    ASSERT(what & EV_TIMEOUT);

    TCPTransport::TCPTransportTimerInfo *timer =
        static_cast<TCPTransport::TCPTransportTimerInfo *>(arg);
    timer->transport->OnTimer(timer);
}
/**
 * Forwards a libevent log line to this project's logging facility.
 *
 * libevent warnings and errors are both reported as MSG_WARNING; any
 * other unknown severity is treated as unreachable.
 */
void
TCPTransport::LogCallback(int severity, const char *msg)
{
    Message_Type msgType;
    switch (severity) {
    case _EVENT_LOG_DEBUG:
        msgType = MSG_DEBUG;
        break;
    case _EVENT_LOG_MSG:
        msgType = MSG_NOTICE;
        break;
    case _EVENT_LOG_WARN:
    case _EVENT_LOG_ERR:
        msgType = MSG_WARNING;
        break;
    default:
        NOT_REACHABLE();
    }

    _Message(msgType, "libevent", 0, NULL, "%s", msg);
}
void
TCPTransport::FatalCallback(int err)
{
Panic("Fatal libevent error: %d", err);
}
// Signal-event handler (registered for SIGTERM and SIGINT by the
// constructor): breaks out of the event loop of the transport passed in arg.
void
TCPTransport::SignalCallback(evutil_socket_t fd, short what, void *arg)
{
    Debug("Terminating on SIGTERM/SIGINT");
    TCPTransport *self = static_cast<TCPTransport *>(arg);
    event_base_loopbreak(self->libeventBase);
}
/**
 * Accept-event handler for a listening socket created by Register().
 *
 * Accepts one pending connection, wraps it in a bufferevent wired to the
 * read/event callbacks, and records it so replies can be sent back to the
 * connecting peer.
 *
 * @param fd    listening socket.
 * @param what  libevent event flags; only EV_READ is acted on.
 * @param arg   the TCPTransportTCPListener created by Register().
 */
void
TCPTransport::TCPAcceptCallback(evutil_socket_t fd, short what, void *arg)
{
    TCPTransportTCPListener *info = (TCPTransportTCPListener *)arg;
    TCPTransport *transport = info->transport;

    if (!(what & EV_READ)) {
        return;
    }

    struct sockaddr_in sin;
    socklen_t sinLength = sizeof(sin);

    // Accept a connection
    int newfd = accept(fd, (struct sockaddr *)&sin, &sinLength);
    if (newfd < 0) {
        PWarning("Failed to accept incoming TCP connection");
        return;
    }

    // Put it in non-blocking mode
    if (fcntl(newfd, F_SETFL, O_NONBLOCK, 1)) {
        PWarning("Failed to set O_NONBLOCK");
    }

    // Set TCP_NODELAY on the accepted connection
    int n = 1;
    if (setsockopt(newfd, IPPROTO_TCP,
                   TCP_NODELAY, (char *)&n, sizeof(n)) < 0) {
        PWarning("Failed to set TCP_NODELAY on accepted TCP socket");
    }

    // Create a buffered event
    struct bufferevent *bev =
        bufferevent_socket_new(transport->libeventBase, newfd,
                               BEV_OPT_CLOSE_ON_FREE);
    bufferevent_setcb(bev, TCPReadableCallback, NULL,
                      TCPIncomingEventCallback, info);
    if (bufferevent_enable(bev, EV_READ|EV_WRITE) < 0) {
        Panic("Failed to enable bufferevent");
    }

    // Remember the connection under the peer's address so that
    // SendMessageInternal() can reuse it for replies.
    info->connectionEvents.push_back(bev);
    TCPTransportAddress client = TCPTransportAddress(sin);
    transport->tcpOutgoing[client] = bev;
    transport->tcpAddresses.insert(pair<struct bufferevent*, TCPTransportAddress>(bev,client));

    // sin_port is in network byte order; convert it for display.
    Debug("Opened incoming TCP connection from %s:%d",
          inet_ntoa(sin.sin_addr), ntohs(sin.sin_port));
}
void
TCPTransport::TCPReadableCallback(struct bufferevent *bev, void *arg)
{
TCPTransportTCPListener *info = (TCPTransportTCPListener *)arg;
TCPTransport *transport = info->transport;
struct evbuffer *evbuf = bufferevent_get_input(bev);
Debug("Readable on bufferevent %p", bev);
while (evbuffer_get_length(evbuf) > 0) {
uint32_t *magic;
magic = (uint32_t *)evbuffer_pullup(evbuf, sizeof(*magic));
if (magic == NULL) {
return;
}
ASSERT(*magic == MAGIC);
size_t *sz;
unsigned char *x = evbuffer_pullup(evbuf, sizeof(*magic) + sizeof(*sz));
sz = (size_t *) (x + sizeof(*magic));
if (x == NULL) {
return;
}
size_t totalSize = *sz;
ASSERT(totalSize < 1073741826);
if (evbuffer_get_length(evbuf) < totalSize) {
Debug("Don't have %ld bytes for a message yet, only %ld",
totalSize, evbuffer_get_length(evbuf));
return;
}
Debug("Receiving %ld byte message", totalSize);
char buf[totalSize];
size_t copied = evbuffer_remove(evbuf, buf, totalSize);
ASSERT(copied == totalSize);
// Parse message
char *ptr = buf + sizeof(*sz) + sizeof(*magic);
size_t typeLen = *((size_t *)ptr);
ptr += sizeof(size_t);
ASSERT((size_t)(ptr-buf) < totalSize);
ASSERT((size_t)(ptr+typeLen-buf) < totalSize);
string msgType(ptr, typeLen);
ptr += typeLen;
size_t msgLen = *((size_t *)ptr);
ptr += sizeof(size_t);
ASSERT((size_t)(ptr-buf) < totalSize);
ASSERT((size_t)(ptr+msgLen-buf) <= totalSize);
string msg(ptr, msgLen);
ptr += msgLen;
auto addr = transport->tcpAddresses.find(bev);
ASSERT(addr != transport->tcpAddresses.end());
// Dispatch
info->receiver->ReceiveMessage(addr->second, msgType, msg);
Debug("Done processing large %s message", msgType.c_str());
}
}
void
TCPTransport::TCPIncomingEventCallback(struct bufferevent *bev,
short what, void *arg)
{
if (what & BEV_EVENT_ERROR) {
Warning("Error on incoming TCP connection: %s",
evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
bufferevent_free(bev);
return;
} else if (what & BEV_EVENT_ERROR) {
Warning("EOF on incoming TCP connection");
bufferevent_free(bev);
return;
}
}
/**
 * Event callback for connections opened by ConnectTCP().
 *
 * Logs a successful connect. On an error or end-of-file, forgets the
 * connection and frees its bufferevent so the next send reconnects.
 *
 * NOTE(review): the TCPTransportTCPListener created by ConnectTCP() for
 * this connection stays in tcpListeners after the connection is freed --
 * confirm whether it should be released here.
 *
 * @param bev   the connection's bufferevent.
 * @param what  libevent BEV_EVENT_* flags.
 * @param arg   the TCPTransportTCPListener created for the connection.
 */
void
TCPTransport::TCPOutgoingEventCallback(struct bufferevent *bev,
                                       short what, void *arg)
{
    TCPTransportTCPListener *info = (TCPTransportTCPListener *)arg;
    TCPTransport *transport = info->transport;
    auto it = transport->tcpAddresses.find(bev);
    ASSERT(it != transport->tcpAddresses.end());

    if (what & BEV_EVENT_CONNECTED) {
        Debug("Established outgoing TCP connection to server");
        return;
    }

    if (what & BEV_EVENT_ERROR) {
        Warning("Error on outgoing TCP connection to server: %s",
                evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
    } else if (what & BEV_EVENT_EOF) {
        Warning("EOF on outgoing TCP connection to server");
    } else {
        return;
    }

    // Remove both directions of the address mapping. The outgoing entry
    // is erased only if it exists and still refers to this connection.
    auto out = transport->tcpOutgoing.find(it->second);
    if (out != transport->tcpOutgoing.end() && out->second == bev) {
        transport->tcpOutgoing.erase(out);
    }
    transport->tcpAddresses.erase(it);

    bufferevent_free(bev);
}
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* tcptransport.h:
* message-passing network interface that uses TCP message delivery
* and libevent
*
* Copyright 2013 Dan R. K. Ports <drkp@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#ifndef _LIB_TCPTRANSPORT_H_
#define _LIB_TCPTRANSPORT_H_
#include "lib/configuration.h"
#include "lib/transport.h"
#include "lib/transportcommon.h"
#include <event2/event.h>
#include <event2/buffer.h>
#include <event2/bufferevent.h>
#include <map>
#include <unordered_map>
#include <list>
#include <random>
#include <mutex>
#include <vector>
#include <netinet/in.h>
// Transport address for TCPTransport: a thin wrapper around an IPv4
// sockaddr_in. Instances can only be constructed by TCPTransport (the
// constructor is private and TCPTransport is a friend); other code
// obtains copies through clone().
class TCPTransportAddress : public TransportAddress
{
public:
// Returns a newly allocated copy of this address.
TCPTransportAddress * clone() const;
private:
// Wraps the given socket address.
TCPTransportAddress(const sockaddr_in &addr);
// The wrapped IPv4 address and port.
sockaddr_in addr;
friend class TCPTransport;
// Comparison operators; defined in tcptransport.cc as byte-wise
// comparisons of addr.
friend bool operator==(const TCPTransportAddress &a,
const TCPTransportAddress &b);
friend bool operator!=(const TCPTransportAddress &a,
const TCPTransportAddress &b);
friend bool operator<(const TCPTransportAddress &a,
const TCPTransportAddress &b);
};
/**
 * Transport that delivers messages over TCP connections driven by a
 * libevent event loop.
 *
 * Replicas registered with a non-negative index listen for connections;
 * connections to remote addresses are opened on demand when a message is
 * sent. One-shot timers are provided through Timer()/CancelTimer().
 */
class TCPTransport : public TransportCommon<TCPTransportAddress>
{
public:
    // dropRate, reorderRate and dscp mirror the other transports'
    // constructors. handleSignals controls whether SIGTERM/SIGINT
    // handlers that stop the event loop are installed.
    TCPTransport(double dropRate = 0.0, double reorderRate = 0.0,
                 int dscp = 0, bool handleSignals = true);
    virtual ~TCPTransport();
    // Registers a receiver; replicaIdx == -1 denotes a client.
    void Register(TransportReceiver *receiver,
                  const transport::Configuration &config,
                  int replicaIdx);
    // Runs the event loop on the calling thread.
    void Run();
    // Asks the event loop to exit.
    void Stop();
    // Schedules cb once after ms milliseconds; returns the timer id.
    int Timer(uint64_t ms, timer_callback_t cb);
    // Cancels the timer with the given id; returns whether it was pending.
    bool CancelTimer(int id);
    // Cancels every pending timer.
    void CancelAllTimers();

private:
    // Guards lastTimerId and timers.
    std::mutex mtx;

    // Book-keeping for one pending timer.
    struct TCPTransportTimerInfo
    {
        TCPTransport *transport;
        timer_callback_t cb;
        event *ev;
        int id;
    };
    // Context passed to the socket callbacks: one per listening socket
    // and one per outgoing connection.
    struct TCPTransportTCPListener
    {
        TCPTransport *transport;
        TransportReceiver *receiver;
        int acceptFd;
        int replicaIdx;
        event *acceptEvent;
        std::list<struct bufferevent *> connectionEvents;
    };
    event_base *libeventBase;
    std::vector<event *> listenerEvents;
    std::vector<event *> signalEvents;
    std::map<int, TransportReceiver*> receivers; // fd -> receiver
    std::map<TransportReceiver*, int> fds; // receiver -> fd
    int lastTimerId;
    std::map<int, TCPTransportTimerInfo *> timers;
    std::list<TCPTransportTCPListener *> tcpListeners;
    // Peer address -> connection, and the reverse mapping.
    std::map<TCPTransportAddress, struct bufferevent *> tcpOutgoing;
    std::map<struct bufferevent *, TCPTransportAddress> tcpAddresses;

    bool SendMessageInternal(TransportReceiver *src,
                             const TCPTransportAddress &dst,
                             const Message &m, bool multicast = false);
    TCPTransportAddress
    LookupAddress(const transport::ReplicaAddress &addr);
    TCPTransportAddress
    LookupAddress(const transport::Configuration &cfg,
                  int replicaIdx);
    // Always returns NULL: no multicast address is provided.
    const TCPTransportAddress *
    LookupMulticastAddress(const transport::Configuration*config) { return NULL; };
    void ConnectTCP(TransportReceiver *src, const TCPTransportAddress &dst);
    void OnTimer(TCPTransportTimerInfo *info);

    // libevent callbacks; arg carries the context object registered with
    // the corresponding event.
    static void TimerCallback(evutil_socket_t fd,
                              short what, void *arg);
    static void LogCallback(int severity, const char *msg);
    static void FatalCallback(int err);
    static void SignalCallback(evutil_socket_t fd,
                               short what, void *arg);
    static void TCPAcceptCallback(evutil_socket_t fd, short what,
                                  void *arg);
    static void TCPReadableCallback(struct bufferevent *bev,
                                    void *arg);
    // NOTE(review): declared here, but no definition appears in the
    // tcptransport.cc shown alongside this header -- confirm it is needed.
    static void TCPEventCallback(struct bufferevent *bev,
                                 short what, void *arg);
    static void TCPIncomingEventCallback(struct bufferevent *bev,
                                         short what, void *arg);
    static void TCPOutgoingEventCallback(struct bufferevent *bev,
                                         short what, void *arg);
};
#endif // _LIB_TCPTRANSPORT_H_
syntax = "proto2";
package transport.test;
message TestMessage {
......
......@@ -5,7 +5,7 @@
* utility functions for manipulating timevals
*
* Copyright 2013-2015 Irene Zhang <iyzhang@cs.washington.edu>
* Naveen Kr. Sharma <nksharma@cs.washington.edu>
* Naveen Kr. Sharma <naveenks@cs.washington.edu>
* Dan R. K. Ports <drkp@cs.washington.edu>
* Copyright 2009-2012 Massachusetts Institute of Technology
*
......
......@@ -5,7 +5,7 @@
* message-passing network interface; common definitions
*
* Copyright 2013-2015 Irene Zhang <iyzhang@cs.washington.edu>
* Naveen Kr. Sharma <nksharma@cs.washington.edu>
* Naveen Kr. Sharma <naveenks@cs.washington.edu>
* Dan R. K. Ports <drkp@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
......
......@@ -5,7 +5,7 @@
* template support for implementing transports
*
* Copyright 2013-2015 Irene Zhang <iyzhang@cs.washington.edu>
* Naveen Kr. Sharma <nksharma@cs.washington.edu>
* Naveen Kr. Sharma <naveenks@cs.washington.edu>
* Dan R. K. Ports <drkp@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
......
......@@ -38,6 +38,7 @@
#include <event2/event.h>
#include <event2/thread.h>
#include <memory>
#include <random>
#include <arpa/inet.h>
......@@ -178,9 +179,8 @@ BindToPort(int fd, const string &host, const string &port)
}
UDPTransport::UDPTransport(double dropRate, double reorderRate,
int dscp, event_base *evbase)
: dropRate(dropRate), reorderRate(reorderRate),
dscp(dscp)
int dscp, bool handleSignals)
: dropRate(dropRate), reorderRate(reorderRate), dscp(dscp)
{
lastTimerId = 0;
......@@ -197,27 +197,23 @@ UDPTransport::UDPTransport(double dropRate, double reorderRate,
}
// Set up libevent
evthread_use_pthreads();
event_set_log_callback(LogCallback);
event_set_fatal_callback(FatalCallback);
// XXX Hack for Naveen: allow the user to specify an existing
// libevent base. This will probably not work exactly correctly
// for error messages or signals, but that doesn't much matter...
if (evbase) {
libeventBase = evbase;
} else {
evthread_use_pthreads();
libeventBase = event_base_new();
evthread_make_base_notifiable(libeventBase);
}
// Set up signal handler
signalEvents.push_back(evsignal_new(libeventBase, SIGTERM,
SignalCallback, this));
signalEvents.push_back(evsignal_new(libeventBase, SIGINT,
SignalCallback, this));
libeventBase = event_base_new();
evthread_make_base_notifiable(libeventBase);
for (event *x : signalEvents) {
event_add(x, NULL);
// Set up signal handler
if (handleSignals) {
signalEvents.push_back(evsignal_new(libeventBase, SIGTERM,
SignalCallback, this));
signalEvents.push_back(evsignal_new(libeventBase, SIGINT,
SignalCallback, this));
for (event *x : signalEvents) {
event_add(x, NULL);
}
}
}
......@@ -385,7 +381,8 @@ UDPTransport::Register(TransportReceiver *receiver,
}
static size_t
SerializeMessage(const ::google::protobuf::Message &m, char **out)
SerializeMessage(const ::google::protobuf::Message &m,
std::unique_ptr<char[]> *out)
{
string data = m.SerializeAsString();
string type = m.GetTypeName();
......@@ -394,8 +391,9 @@ SerializeMessage(const ::google::protobuf::Message &m, char **out)
ssize_t totalLen = (typeLen + sizeof(typeLen) +
dataLen + sizeof(dataLen));
char *buf = new char[totalLen];
std::unique_ptr<char[]> unique_buf(new char[totalLen]);
char *buf = unique_buf.get();
char *ptr = buf;
*((size_t *) ptr) = typeLen;
ptr += sizeof(size_t);
......@@ -409,8 +407,8 @@ SerializeMessage(const ::google::protobuf::Message &m, char **out)
ASSERT(ptr+dataLen-buf == totalLen);
memcpy(ptr, data.c_str(), dataLen);
ptr += dataLen;
*out = buf;
*out = std::move(unique_buf);
return totalLen;
}
......@@ -423,11 +421,12 @@ UDPTransport::SendMessageInternal(TransportReceiver *src,
sockaddr_in sin = dynamic_cast<const UDPTransportAddress &>(dst).addr;
// Serialize message
char *buf;
size_t msgLen = SerializeMessage(m, &buf);
std::unique_ptr<char[]> unique_buf;
size_t msgLen = SerializeMessage(m, &unique_buf);
char *buf = unique_buf.get();
int fd = fds[src];
// XXX All of this assumes that the socket is going to be
// available for writing, which since it's a UDP socket it ought
// to be.
......@@ -435,7 +434,7 @@ UDPTransport::SendMessageInternal(TransportReceiver *src,
if (sendto(fd, buf, msgLen, 0,
(sockaddr *)&sin, sizeof(sin)) < 0) {
PWarning("Failed to send message");
goto fail;
return false;
}
} else {
int numFrags = ((msgLen-1) / MAX_UDP_MESSAGE_SIZE) + 1;
......@@ -458,22 +457,17 @@ UDPTransport::SendMessageInternal(TransportReceiver *src,
*((size_t *)ptr) = msgLen;
ptr += sizeof(size_t);
memcpy(ptr, &buf[fragStart], fragLen);
if (sendto(fd, fragBuf, fragLen + fragHeaderLen, 0,
(sockaddr *)&sin, sizeof(sin)) < 0) {
PWarning("Failed to send message fragment %ld",
fragStart);
goto fail;
return false;
}
}
}
}
delete [] buf;
return true;
fail:
delete [] buf;
return false;
}
void
......
......@@ -66,7 +66,7 @@ class UDPTransport : public TransportCommon<UDPTransportAddress>
{
public:
UDPTransport(double dropRate = 0.0, double reorderRate = 0.0,
int dscp = 0, event_base *evbase = nullptr);
int dscp = 0, bool handleSignals = true);
virtual ~UDPTransport();
void Register(TransportReceiver *receiver,
const transport::Configuration &config,
......
# Directory of the makefile currently being read (the last entry of
# MAKEFILE_LIST); used as a prefix for this directory's targets.
d := $(dir $(lastword $(MAKEFILE_LIST)))
# The shared library depends on the -pic.o counterpart of every object
# listed in OBJS-all-clients.
$(d)libtapir.so: $(patsubst %.o,%-pic.o, $(OBJS-all-clients))
# Extra linker flag for this target: build a shared object.
LDFLAGS-$(d)libtapir.so += -shared
BINS += $(d)libtapir.so
# Directory of the makefile currently being read (the last entry of
# MAKEFILE_LIST); used as a prefix for this directory's sources and targets.
d := $(dir $(lastword $(MAKEFILE_LIST)))
# Sources and protobuf definitions contributed by this directory.
SRCS += $(addprefix $(d), \
server.cc client.cc server-main.cc client-main.cc \
lockserver-repl.cc)
PROTOS += $(addprefix $(d), locks-proto.proto)
# Lock server executable. $(o) is defined outside this fragment --
# presumably the object output directory; confirm in the including makefile.
$(d)server-main: $(o)server-main.o \
$(o)locks-proto.o \
$(o)server.o \
$(LIB-udptransport) \
$(OBJS-ir-replica)
# Command-line lock client executable.
$(d)client-main: $(o)client-main.o \
$(o)locks-proto.o \
$(o)client.o \
$(LIB-udptransport) \
$(OBJS-ir-client) \
$(LIB-store-common)
# Target that combines the server and client objects with the replicated
# transport library and GTEST_MAIN.
$(d)lockserver-repl: $(o)lockserver-repl.o \
$(o)locks-proto.o \
$(o)server.o \
$(o)client.o \
$(OBJS-ir-replica) \
$(OBJS-ir-client) \
$(LIB-configuration) \
$(LIB-repltransport) \
$(LIB-store-common) \
$(GTEST_MAIN)
BINS += $(d)server-main $(d)client-main $(d)lockserver-repl
# Pull in the rules for this directory's tests.
include $(d)tests/Rules.mk
#include <thread>
#include "lockserver/client.h"
#include "lib/udptransport.h"
namespace {

// Prints the interactive command help to standard output.
void
usage()
{
    fputs("Unknown command.. Try again!\n", stdout);
    fputs("Usage: exit | q | lock <key> | unlock <key>\n", stdout);
}

} // namespace
/**
 * Interactive lock client.
 *
 * Usage: client-main -c <config-file>
 *
 * Reads commands from standard input ("lock <key>", "unlock <key>",
 * "exit" / "q") and issues them through a LockClient. The transport's
 * event loop runs on a separate thread. End of input is treated like
 * "exit".
 */
int
main(int argc, char **argv)
{
    const char *configPath = NULL;

    // Parse arguments
    int opt;
    while ((opt = getopt(argc, argv, "c:")) != -1) {
        switch (opt) {
        case 'c':
            configPath = optarg;
            break;

        default:
            // optopt holds the offending option character; argv[optind]
            // is the *next* argument and may be NULL at this point.
            fprintf(stderr, "Unknown argument -%c\n", optopt);
        }
    }

    if (!configPath) {
        fprintf(stderr, "option -c is required\n");
        return EXIT_FAILURE;
    }

    // Load configuration
    std::ifstream configStream(configPath);
    if (configStream.fail()) {
        Panic("Unable to read configuration file: %s\n", configPath);
    }
    transport::Configuration config(configStream);

    // Create lock client.
    UDPTransport transport(0.0, 0.0, 0);
    lockserver::LockClient locker(&transport, config);
    std::thread run_transport([&transport]() { transport.Run(); });

    char cmd[2048], *tok;
    int c, status;          // c is int so that EOF can be told apart from data
    size_t clen;
    string key;

    while (1) {
        printf(">> ");
        fflush(stdout);

        // Read one line. Characters beyond the buffer capacity are
        // discarded rather than written past the end of cmd.
        clen = 0;
        while ((c = getchar()) != '\n' && c != EOF) {
            if (clen < sizeof(cmd) - 1) {
                cmd[clen++] = (char)c;
            }
        }
        cmd[clen] = '\0';

        // Input closed with nothing left to process: leave the loop
        // instead of spinning on EOF.
        if (c == EOF && clen == 0) {
            printf("Exiting..\n");
            break;
        }

        tok = strtok(cmd, " ,.-");
        if (tok == NULL) continue;

        if (strcasecmp(tok, "exit") == 0 || strcasecmp(tok, "q") == 0) {
            printf("Exiting..\n");
            break;
        } else if (strcasecmp(tok, "lock") == 0) {
            tok = strtok(NULL, " ,.-");
            if (tok == NULL) {
                usage();
                continue;
            }
            key = string(tok);
            status = locker.lock(key);

            if (status) {
                printf("Lock Successful\n");
            } else {
                printf("Failed to acquire lock..\n");
            }
        } else if (strcasecmp(tok, "unlock") == 0) {
            tok = strtok(NULL, " ,.-");
            if (tok == NULL) {
                usage();
                continue;
            }
            key = string(tok);
            locker.unlock(key);
            printf("Unlock Successful\n");
        } else {
            usage();
        }
        fflush(stdout);
    }

    // Stop the event loop and wait for its thread before exiting.
    transport.Stop();
    run_transport.join();
    return EXIT_SUCCESS;
}
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* lockserver/client.cc:
* A single lockserver client with commandline interface.
*
* Copyright 2015 Naveen Kr. Sharma <naveenks@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#include "lockserver/client.h"
namespace lockserver {
using namespace std;
using namespace proto;
// Builds a lock client on top of the given transport: picks a random
// non-zero 64-bit client id and creates the IR client used for requests.
LockClient::LockClient(Transport *transport,
                       const transport::Configuration &config)
    : transport(transport) {
    // Zero is not accepted as an id, so draw again until non-zero.
    client_id = 0;
    do {
        random_device rd;
        mt19937_64 gen(rd());
        uniform_int_distribution<uint64_t> dis;
        client_id = dis(gen);
    } while (client_id == 0);

    client = new replication::ir::IRClient(config, transport, client_id);
}
// Destructor; performs no cleanup.
// NOTE(review): the IRClient allocated with new in the constructor is not
// deleted here -- confirm whether ownership is handled elsewhere.
LockClient::~LockClient() { }
void
LockClient::lock_async(const std::string &key) {
ASSERT(waiting == nullptr);
Debug("Sending LOCK");
string request_str;
Request request;
request.set_clientid(client_id);
request.set_key(key);
request.set_type(true);
request.SerializeToString(&request_str);
waiting = new Promise(1000);
transport->Timer(0, [=]() {
client->InvokeConsensus(request_str,
bind(&LockClient::Decide,
this,
placeholders::_1),
bind(&LockClient::LockCallback,
this,
placeholders::_1,
placeholders::_2),
bind(&LockClient::ErrorCallback,
this,
placeholders::_1,
placeholders::_2));
});
}
// Waits for the reply to the request started by lock_async() and releases
// the promise. Returns true only when the reply status is 0.
bool
LockClient::lock_wait() {
    ASSERT(waiting != nullptr);

    const int status = waiting->GetReply();
    delete waiting;
    waiting = nullptr;

    if (status == -1) {
        Debug("Lock held by someone else.");
    }
    return status == 0;
}
void
LockClient::unlock_async(const std::string &key) {
ASSERT(waiting == nullptr);
Debug("Sending UNLOCK");
string request_str;
Request request;
request.set_clientid(client_id);
request.set_key(key);
request.set_type(false);
request.SerializeToString(&request_str);
waiting = new Promise(1000);
transport->Timer(0, [=]() {
client->InvokeInconsistent(request_str,
bind(&LockClient::UnlockCallback,
this,
placeholders::_1,
placeholders::_2));
});
}
void
LockClient::unlock_wait() {
waiting->GetReply();
delete waiting;
waiting = nullptr;
}
// Synchronous lock: issues the request and blocks for its outcome.
// Returns true if the lock was granted.
bool
LockClient::lock(const string &key)
{
    lock_async(key);
    const bool acquired = lock_wait();
    return acquired;
}
// Synchronous unlock: issues the request and blocks until it completes.
void
LockClient::unlock(const string &key)
{
    unlock_async(key);
    unlock_wait();
}
// Decide function handed to the consensus invocation. `results` maps each
// distinct serialized Reply to the number of times it was returned. The
// counts of all replies with status 0 are summed; the final reply carries
// status 0 when that sum is at least 2 and -1 otherwise. The key is taken
// from the last reply visited.
//
// NOTE(review): the threshold is hard-coded to 2; lockserver-repl.cc builds a
// five-replica configuration, for which 2 is not a majority -- confirm.
string
LockClient::Decide(const map<string, std::size_t> &results)
{
    int granted = 0;
    string key;
    for (auto it = results.begin(); it != results.end(); ++it) {
        Reply replica_reply;
        replica_reply.ParseFromString(it->first);
        key = replica_reply.key();
        if (replica_reply.status() == 0) {
            granted += it->second;
        }
    }

    Reply verdict;
    verdict.set_key(key);
    verdict.set_status(granted >= 2 ? 0 : -1);

    string serialized;
    verdict.SerializeToString(&serialized);
    return serialized;
}
// Completion callback for a lock request: parses the final reply and hands
// its status to the pending Promise.
//
// `waiting` is deliberately left set. lock_wait() reads the status through
// `waiting`, deletes the Promise and clears the pointer; clearing it here
// would make lock_wait() fail its assertion when the callback runs first, or
// delete a null pointer (leaking the Promise) when the callback runs while
// lock_wait() is blocked.
void
LockClient::LockCallback(const std::string &request_str, const std::string &reply_str)
{
    Debug("Lock Callback: %s %s", request_str.c_str(), reply_str.c_str());

    if (waiting == nullptr) {
        // No operation is pending (e.g. a late or duplicate callback).
        Debug("Lock Callback: no pending operation; ignoring.");
        return;
    }

    Reply reply;
    reply.ParseFromString(reply_str);
    waiting->Reply(reply.status());
}
// Completion callback for an unlock request: reports status 0 to the pending
// Promise (the reply payload is only logged).
//
// `waiting` is deliberately left set. unlock_wait() blocks on `waiting`,
// deletes the Promise and clears the pointer; clearing it here would make
// unlock_wait() dereference or delete a null pointer and leak the Promise.
void
LockClient::UnlockCallback(const std::string &request_str, const std::string &reply_str)
{
    Debug("Unlock Callback: %s %s", request_str.c_str(), reply_str.c_str());

    if (waiting == nullptr) {
        // No operation is pending (e.g. a late or duplicate callback).
        Debug("Unlock Callback: no pending operation; ignoring.");
        return;
    }

    waiting->Reply(0);
}
// Error callback for a lock request: logs the error and reports status -3
// ("invalid command") to the pending Promise.
//
// `waiting` is deliberately left set. lock_wait() reads the status through
// `waiting`, deletes the Promise and clears the pointer; clearing it here
// would make lock_wait() fail its assertion or leak the Promise.
void
LockClient::ErrorCallback(const std::string &request_str,
                          replication::ErrorCode err)
{
    Debug("Error Callback: %s %s", request_str.c_str(),
          replication::ErrorCodeToString(err).c_str());

    if (waiting == nullptr) {
        // No operation is pending (e.g. a late or duplicate callback).
        Debug("Error Callback: no pending operation; ignoring.");
        return;
    }

    waiting->Reply(-3); // Invalid command.
}
} // namespace lockserver
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* lockserver/client.h:
* A lockserver client interface.
*
* Copyright 2015 Naveen Kr. Sharma <naveenks@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#ifndef _IR_LOCK_CLIENT_H_
#define _IR_LOCK_CLIENT_H_
#include "lib/assert.h"
#include "lib/message.h"
#include "lib/transport.h"
#include "replication/ir/client.h"
#include "store/common/promise.h"
#include "lockserver/locks-proto.pb.h"
#include <map>
#include <set>
#include <string>
#include <thread>
#include <random>
namespace lockserver {
// Client for the replicated lock service. Lock requests are issued as
// consensus operations and unlock requests as inconsistent operations through
// an IRClient proxy. At most one operation may be outstanding at a time.
class LockClient
{
public:
    // Creates a client with a random non-zero id that talks to the replicas
    // described by `config` over `transport`.
    LockClient(Transport* transport, const transport::Configuration &config);
    ~LockClient();

    // Synchronously lock and unlock. Calling lock (or unlock) will block until
    // the lock (or unlock) request is fully processed. lock returns true if
    // the lock was granted.
    bool lock(const std::string &key);
    void unlock(const std::string &key);

    // Asynchronously lock and unlock. Calling lock_async or unlock_async will
    // not block. Calling lock_wait (or unlock_wait) will block for the
    // previous invocation of lock_async (or unlock_async) to complete.
    //
    // All async calls must be followed by a corresponding wait call. It is an
    // error to issue multiple async requests without waiting. It is also
    // erroneous to wait for a request which was never issued.
    void lock_async(const std::string &key);
    bool lock_wait();
    void unlock_async(const std::string &key);
    void unlock_wait();

private:
    /* Unique ID for this client. */
    uint64_t client_id;

    /* Transport used to schedule requests; supplied by the caller. */
    Transport *transport;

    /* Function to run the transport thread.
     * NOTE(review): no definition is visible in lockserver/client.cc --
     * confirm whether this declaration is still needed. */
    void run_client();

    /* Decide function for a lock server: combines the per-replica replies
     * (serialized Reply -> occurrence count) into one serialized Reply. */
    std::string Decide(const std::map<std::string, std::size_t> &results);

    /* IR client proxy. */
    replication::ir::IRClient *client;

    /* Promise to wait for pending operation; allocated by the *_async calls
     * and deleted by the matching *_wait call. Null when nothing is pending. */
    Promise *waiting = nullptr;

    /* Callbacks for hearing back for an operation. */
    void LockCallback(const std::string &, const std::string &);
    void UnlockCallback(const std::string &, const std::string &);
    void ErrorCallback(const std::string &, replication::ErrorCode);
};
} // namespace lockserver
#endif /* _IR_LOCK_CLIENT_H_ */
syntax = "proto2";
package lockserver.proto;
// A lock or unlock request sent by a lock client to the replicas.
message Request {
// Identifier of the client issuing the request.
required uint64 clientid = 1;
// Name of the lock the request operates on.
required string key = 2;
// Selects the operation to perform (values listed below).
required bool type = 3;
// true = lock
// false = unlock
}
// The outcome of a lock or unlock request.
message Reply {
// Name of the lock the reply refers to.
required string key = 1;
// Result code of the operation (values listed below).
required int32 status = 2;
// 0 = Operation Success
// -1 = Held by someone else (for lock)
// -2 = Not held by you (for unlock)
// -3 = Invalid command
}
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* lockserver-repl.cc: Step-by-step lock server evaluation.
*
* Copyright 2013 Dan R. K. Ports <drkp@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#include <thread>
#include <memory>
#include "lib/configuration.h"
#include "lib/repltransport.h"
#include "lockserver/client.h"
#include "lockserver/server.h"
#include "replication/ir/replica.h"
int main() {
ReplTransport transport;
std::vector<transport::ReplicaAddress> replica_addrs = {
{"replica", "0"},
{"replica", "1"},
{"replica", "2"},
{"replica", "3"},
{"replica", "4"}};
transport::Configuration config(5 /* n */, 2 /* f */, replica_addrs);
// Clients.
lockserver::LockClient client_a(&transport, config);
lockserver::LockClient client_b(&transport, config);
lockserver::LockClient client_c(&transport, config);
client_a.lock_async("a");
client_b.lock_async("b");
client_c.lock_async("c");
// Servers.
std::vector<std::unique_ptr<lockserver::LockServer>> servers;
std::vector<std::unique_ptr<replication::ir::IRReplica>> replicas;
for (std::size_t i = 0; i < replica_addrs.size(); ++i) {
auto server = std::unique_ptr<lockserver::LockServer>(
new lockserver::LockServer());
servers.push_back(std::move(server));
auto replica = std::unique_ptr<replication::ir::IRReplica>(
new replication::ir::IRReplica(config, i, &transport,
servers[i].get()));
replicas.push_back(std::move(replica));
}
// Launch REPL.
transport.Run();
// Remove persisted files.
for (std::size_t i = 0; i < replica_addrs.size(); ++i) {
const transport::ReplicaAddress &addr = replica_addrs[i];
const std::string filename =
addr.host + ":" + addr.port + "_" + std::to_string(i) + ".bin";
int success = std::remove(filename.c_str());
ASSERT(success == 0);
}
}
#include "lockserver/server.h"
// Entry point of a lock-server replica.
//
// Options:
//   -c <path>   replica configuration file (required)
//   -i <index>  non-negative index of this replica in the configuration
//               (required)
//
// Returns EXIT_FAILURE when an option is missing or invalid, when the
// configuration file cannot be read, or when the index is out of range;
// otherwise runs the transport loop and returns EXIT_SUCCESS once it ends.
int
main(int argc, char **argv)
{
    int index = -1;
    const char *configPath = NULL;

    // Parse arguments
    int opt;
    char *strtolPtr;
    while ((opt = getopt(argc, argv, "c:i:")) != -1) {
        switch (opt) {
        case 'c':
            configPath = optarg;
            break;

        case 'i': {
            const long parsed = strtol(optarg, &strtolPtr, 10);
            index = static_cast<int>(parsed);
            // Reject empty input, trailing garbage, negative values and
            // values that do not survive the conversion to int. Bail out
            // right away: continuing would let a value such as -5 slip past
            // the "-i is required" check below.
            if ((*optarg == '\0') || (*strtolPtr != '\0') ||
                (parsed < 0) || (index != parsed)) {
                fprintf(stderr, "option -i requires a numeric arg\n");
                return EXIT_FAILURE;
            }
            break;
        }

        default:
            // Report the offending option character via optopt; argv[optind]
            // is the *next* argument and is NULL when the bad option is the
            // last one on the command line.
            fprintf(stderr, "Unknown argument -%c\n", optopt);
            break;
        }
    }

    if (!configPath) {
        fprintf(stderr, "option -c is required\n");
        return EXIT_FAILURE;
    }

    if (index == -1) {
        fprintf(stderr, "option -i is required\n");
        return EXIT_FAILURE;
    }

    // Load configuration
    std::ifstream configStream(configPath);
    if (configStream.fail()) {
        fprintf(stderr, "unable to read configuration file: %s\n", configPath);
        return EXIT_FAILURE;
    }
    transport::Configuration config(configStream);

    if (index >= config.n) {
        fprintf(stderr, "replica index %d is out of bounds; "
                "only %d replicas defined\n", index, config.n);
        return EXIT_FAILURE;
    }

    // NOTE(review): the three UDPTransport arguments are presumably
    // drop rate, reorder rate and a DSCP value -- confirm against
    // lib/udptransport.h.
    UDPTransport transport(0.0, 0.0, 0);
    lockserver::LockServer server;
    replication::ir::IRReplica replica(config, index, &transport, &server);

    transport.Run();
    return EXIT_SUCCESS;
}
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* lockserver/server.cc:
* A lockserver replica.
*
* Copyright 2015 Naveen Kr. Sharma <naveenks@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#include "lockserver/server.h"
#include <algorithm>
#include <iterator>
#include <unordered_set>
namespace lockserver {
using namespace proto;
// No explicit setup: the lock table is default-constructed, i.e. empty.
LockServer::LockServer()
{
}

// No explicit teardown is performed.
LockServer::~LockServer()
{
}
// Executes an inconsistent operation. Only unlock requests are acted upon:
// the lock is released when it is currently held by the requesting client.
// Lock requests arriving on this path are ignored with a warning.
void
LockServer::ExecInconsistentUpcall(const string &str1)
{
    Debug("ExecInconsistent: %s\n", str1.c_str());

    Request request;
    request.ParseFromString(str1);
    string key = request.key();
    uint64_t client_id = request.clientid();

    if (request.type()) { // Lock operation.
        Warning("Lock operation being sent as Inconsistent. Ignored.");
        return;
    }

    auto holder = locks.find(key);
    if (holder == locks.end()) {
        Debug("Lock held by no one.");
        return;
    }

    if (holder->second != client_id) {
        Debug("Lock held by someone else %lu: %s, %lu",
              client_id, key.c_str(), holder->second);
        return;
    }

    Debug("Releasing lock %lu: %s", client_id, key.c_str());
    locks.erase(holder);
}
// Executes a consensus operation and serializes the Reply into str2.
//
// Lock: granted (status 0) when the key is free or already held by the same
// client; status -1 when another client holds it.
// Unlock (unexpected on this path, logged with a warning): status 0 when the
// requesting client holds the lock, which is then released; status -2 when
// the lock is free or held by another client.
void
LockServer::ExecConsensusUpcall(const string &str1, string &str2)
{
    Debug("ExecConsensus: %s\n", str1.c_str());

    Request request;
    request.ParseFromString(str1);
    string key = request.key();
    uint64_t client_id = request.clientid();

    Reply reply;
    reply.set_key(key);

    int status = 0;
    auto holder = locks.find(key);
    const bool is_free = (holder == locks.end());

    if (request.type()) { // Lock operation.
        if (is_free) {
            Debug("Assigning lock %lu: %s", client_id, key.c_str());
            locks[key] = client_id;
        } else if (holder->second != client_id) {
            Debug("Lock already held %lu: %s", client_id, key.c_str());
            status = -1;
        }
    } else {
        Warning("Unlock operation being sent as Consensus");
        if (is_free) {
            Debug("Lock held by no-one %lu: %s", client_id, key.c_str());
            status = -2;
        } else if (holder->second != client_id) {
            Debug("Lock held by someone else %lu: %s, %lu",
                  client_id, key.c_str(), holder->second);
            status = -2;
        } else {
            Debug("Releasing lock %lu: %s", client_id, key.c_str());
            locks.erase(holder);
        }
    }

    reply.set_status(status);
    reply.SerializeToString(&str2);
}
// Handles an unreplicated operation. The request is only logged; str2 is
// never written.
void
LockServer::UnloggedUpcall(const string &str1, string &str2)
{
    Debug("Unlogged: %s\n", str1.c_str());
}
// Rebuilds the lock table from scratch out of `record`.
//
// For every key, the clients with a successful (status 0) lock entry and the
// clients with a status-0 unlock entry are collected. A client that appears
// in the first set but not in the second is the holder of the key. At most
// one such client per key is expected (asserted).
void
LockServer::Sync(const std::map<opid_t, RecordEntry>& record) {
    locks.clear();

    struct KeyLockInfo {
        std::unordered_set<uint64_t> locked;
        std::unordered_set<uint64_t> unlocked;
    };
    std::unordered_map<std::string, KeyLockInfo> key_lock_info;

    for (const std::pair<const opid_t, RecordEntry> &p : record) {
        const opid_t &opid = p.first;
        const RecordEntry &entry = p.second;

        Request request;
        request.ParseFromString(entry.request.op());
        Reply reply;
        reply.ParseFromString(entry.result);
        KeyLockInfo &info = key_lock_info[request.key()];

        Debug("Sync opid=(%lu, %lu), clientid=%lu, key=%s, type=%d, status=%d.",
              opid.first, opid.second, request.clientid(),
              request.key().c_str(), request.type(), reply.status());

        if (request.type() && reply.status() == 0) {
            // Lock.
            info.locked.insert(request.clientid());
        } else if (!request.type() && reply.status() == 0) {
            // Unlock.
            info.unlocked.insert(request.clientid());
        }
    }

    for (const std::pair<const std::string, KeyLockInfo> &p : key_lock_info) {
        const std::string &key = p.first;
        const KeyLockInfo &info = p.second;

        // Clients that locked the key and never unlocked it. This is a
        // membership test on purpose: std::set_difference requires sorted
        // input ranges, which unordered_set iteration does not provide.
        std::unordered_set<uint64_t> holders;
        for (const uint64_t client_id : info.locked) {
            if (info.unlocked.count(client_id) == 0) {
                holders.insert(client_id);
            }
        }

        ASSERT(holders.size() <= 1);
        if (holders.size() == 1) {
            uint64_t client_id = *std::begin(holders);
            Debug("Assigning lock %lu: %s", client_id, key.c_str());
            locks[key] = client_id;
        }
    }
}
std::map<opid_t, std::string>
LockServer::Merge(const std::map<opid_t, std::vector<RecordEntry>> &d,
const std::map<opid_t, std::vector<RecordEntry>> &u,
const std::map<opid_t, std::string> &majority_results_in_d) {
// First note that d and u only contain consensus operations, and lock
// requests are the only consensus operations (unlock is an inconsistent
// operation), so d and u only contain lock requests. To merge, we grant
// any majority successful lock request in d if it does not conflict with a
// currently held lock. We do not grant any other lock request.
std::map<opid_t, std::string> results;
using EntryVec = std::vector<RecordEntry>;
for (const std::pair<const opid_t, EntryVec>& p: d) {
const opid_t &opid = p.first;
const EntryVec &entries = p.second;
// Get the request and reply.
const RecordEntry &entry = *std::begin(entries);
Request request;
request.ParseFromString(entry.request.op());
Reply reply;
auto iter = majority_results_in_d.find(opid);
ASSERT(iter != std::end(majority_results_in_d));
reply.ParseFromString(iter->second);
// Form the final result.
const bool operation_successful = reply.status() == 0;
if (operation_successful) {
// If the lock was successful, then we acquire the lock so long as
// it is not already held.
const std::string &key = reply.key();
if (locks.count(key) == 0) {
Debug("Assigning lock %lu: %s", request.clientid(),
key.c_str());
locks[key] = request.clientid();
results[opid] = iter->second;
} else {
Debug("Rejecting lock %lu: %s", request.clientid(),
key.c_str());
reply.set_status(-1);
std::string s;
reply.SerializeToString(&s);
results[opid] = s;
}
} else {
// If the lock was not successful, then we maintain this as the
// majority result.
results[opid] = iter->second;
}
}
// We reject all lock requests in u. TODO: We could acquire a lock if
// it is free, but it's simplest to just reject them unilaterally.
for (const std::pair<const opid_t, EntryVec>& p: u) {
const opid_t &opid = p.first;
const EntryVec &entries = p.second;
const RecordEntry &entry = *std::begin(entries);
Request request;
request.ParseFromString(entry.request.op());
Debug("Rejecting lock %lu: %s", request.clientid(),
request.key().c_str());
Reply reply;
reply.set_key(request.key());
reply.set_status(-1);
std::string s;
reply.SerializeToString(&s);
results[opid] = s;
}
return results;
}
} // namespace lockserver
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* lockserver/server.h:
* A lockserver replica interface.
*
* Copyright 2015 Naveen Kr. Sharma <naveenks@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#ifndef _IR_LOCK_SERVER_H_
#define _IR_LOCK_SERVER_H_
#include <string>
#include <unordered_map>
#include "lib/transport.h"
#include "replication/ir/replica.h"
#include "lockserver/locks-proto.pb.h"
namespace lockserver {
// Shorthands for the IR types used in the Sync and Merge signatures:
// opid_t keys the record maps and RecordEntry is their mapped value.
using opid_t = replication::ir::opid_t;
using RecordEntry = replication::ir::RecordEntry;
// Lock-server application replica. Implements the IRAppReplica upcalls on
// top of a table mapping each held lock name to the id of the client that
// holds it.
class LockServer : public replication::ir::IRAppReplica
{
public:
    LockServer();
    ~LockServer();

    // Invoke inconsistent operation, no return value. Unlock requests are
    // handled here.
    void ExecInconsistentUpcall(const std::string &str1) override;

    // Invoke consensus operation; the serialized Reply is written to str2.
    // Lock requests are handled here.
    void ExecConsensusUpcall(const std::string &str1, std::string &str2) override;

    // Invoke unreplicated operation
    void UnloggedUpcall(const std::string &str1, std::string &str2) override;

    // Sync: rebuild the lock table from the given record.
    void Sync(const std::map<opid_t, RecordEntry>& record) override;

    // Merge: compute the final result of every operation in d and u, given
    // the majority results of the operations in d.
    std::map<opid_t, std::string> Merge(
        const std::map<opid_t, std::vector<RecordEntry>> &d,
        const std::map<opid_t, std::vector<RecordEntry>> &u,
        const std::map<opid_t, std::string> &majority_results_in_d) override;

private:
    // Lock name -> id of the client currently holding it. A key is present
    // only while its lock is held.
    std::unordered_map<std::string, uint64_t> locks;
};
} // namespace lockserver
#endif /* _IR_LOCK_SERVER_H_ */
# Build rules for the lockserver unit test.
# d is the directory containing this makefile fragment.
d := $(dir $(lastword $(MAKEFILE_LIST)))
# Register the test source with the list of GTest sources.
GTEST_SRCS += $(addprefix $(d), lockserver-test.cc)
# The test binary depends on the test object, the lockserver proto, server
# and client objects from the parent directory, the IR replica and client
# objects, and the configuration, repltransport and store-common libraries.
# NOTE(review): $(o) is defined outside this fragment -- presumably the object
# output directory for $(d); confirm against the top-level Makefile.
$(d)lockserver-test: $(o)lockserver-test.o \
$(o)../locks-proto.o \
$(o)../server.o \
$(o)../client.o \
$(OBJS-ir-replica) \
$(OBJS-ir-client) \
$(LIB-configuration) \
$(LIB-repltransport) \
$(LIB-store-common) \
$(GTEST_MAIN)
# Add the binary to the list of test binaries.
TEST_BINS += $(d)lockserver-test