Commit afe18dd9 authored by Irene Y Zhang's avatar Irene Y Zhang

merging rdma from github

parents 40786c0f 0dec3498
......@@ -8,10 +8,10 @@ LD = g++
EXPAND = lib/tmpl/expand
#CFLAGS := -g -Wall -pthread -iquote.obj/gen -Wno-uninitialized -O2 -DNASSERT
CFLAGS := -g -Wall -pthread -iquote.obj/gen -Wno-uninitialized -lrdmacm -libverbs
CFLAGS := -g -Wall -pthread -iquote.obj/gen -Wno-uninitialized -O2 -DNASSERT
#CFLAGS := -g -Wall -pthread -iquote.obj/gen -Wno-uninitialized
CXXFLAGS := -g -std=c++0x
LDFLAGS := -levent_pthreads -lrdmacm -libverbs
LDFLAGS := -levent_pthreads
## Debian package: check
#CHECK_CFLAGS := $(shell pkg-config --cflags check)
#CHECK_LDFLAGS := $(shell pkg-config --cflags --libs check)
......@@ -31,6 +31,11 @@ LIBSSL_CFLAGS := $(shell pkg-config --cflags openssl)
LIBSSL_LDFLAGS := $(shell pkg-config --libs openssl)
CFLAGS += $(LIBSSL_CFLAGS)
LDFLAGS += $(LIBSSL_LDFLAGS)
#LDFLAGS += $(JEMALLOC_LDFLAGS)
RDMA_CFLAGS := -lrdmacm -libverbs
RDMA_LDFLAGS := -lrdmacm -libverbs
CFLAGS += $(RDMA_CFLAGS)
LDFLAGS += $(RDMA_LDFLAGS)
# Google test framework. This doesn't use pkgconfig
......
......@@ -111,11 +111,10 @@ static __attribute__((unused)) signed char _Message_FileDebugFlag = -1;
static inline bool
Message_DebugEnabled(const char *fname)
{
return true;
// if (_Message_FileDebugFlag >= 0)
// return _Message_FileDebugFlag;
// _Message_FileDebugFlag = _Message_DebugEnabled(fname);
// return true;//_Message_FileDebugFlag;
if (_Message_FileDebugFlag >= 0)
return _Message_FileDebugFlag;
_Message_FileDebugFlag = _Message_DebugEnabled(fname);
return _Message_FileDebugFlag;
}
#include "hash.h"
......
This diff is collapsed.
......@@ -48,8 +48,9 @@
#include <netinet/in.h>
#include <rdma/rdma_cma.h>
const size_t MAX_RDMA_SIZE = 4096; // Our RDMA buffers
#define MAX_RDMA_SIZE 4096 // Our RDMA buffers
#define DEFAULT_RECEIVE_NUM 4
#define MAX_RECEIVE_NUM 64
class RDMATransportAddress : public TransportAddress
{
public:
......@@ -91,6 +92,18 @@ private:
event *ev;
int id;
};
struct RDMABuffer
{
int magic;
uint8_t *start;
size_t size;
RDMABuffer *next;
RDMABuffer *prev;
bool inUse = false;
struct ibv_mr *mr;
};
struct RDMATransportRDMAListener
{
RDMATransport *transport;
......@@ -105,13 +118,12 @@ private:
event *cmevent;
event *cqevent;
// message passing space
char sendType[MAX_RDMA_SIZE];
char sendData[MAX_RDMA_SIZE];
ibv_mr *sendmr[2];
char recvType[MAX_RDMA_SIZE];
char recvData[MAX_RDMA_SIZE];
ibv_mr *recvmr[2];
std::list<RDMABuffer *> sendQ;
RDMABuffer *buffers = NULL;
int posted = DEFAULT_RECEIVE_NUM;
};
event_base *libeventBase;
int lastTimerId;
std::map<int, RDMATransportTimerInfo *> timers;
......@@ -123,20 +135,32 @@ private:
const RDMATransportAddress &dst,
const Message &m, bool multicast = false);
// Library functions for setting up the network
RDMATransportAddress
LookupAddress(const transport::ReplicaAddress &addr);
RDMATransportAddress
LookupAddress(const transport::Configuration &cfg,
int replicaIdx);
RDMATransportAddress*
BindToPort(struct rdma_cm_id *id, const string &host, const string &port);
static int PostReceive(RDMATransportRDMAListener *info);
const RDMATransportAddress *
const RDMATransportAddress*
LookupMulticastAddress(const transport::Configuration*config) { return NULL; };
void ConnectRDMA(TransportReceiver *src, const RDMATransportAddress &dst);
void ConnectRDMA(TransportReceiver *src, const RDMATransportAddress &dst,
RDMATransportAddress *
BindToPort(struct rdma_cm_id *id, const string &host, const string &port);
void ConnectRDMA(TransportReceiver *src,
const RDMATransportAddress &dst);
void ConnectRDMA(TransportReceiver *src,
const RDMATransportAddress &dst,
struct rdma_cm_id *id);
static void CleanupConnection(RDMATransportRDMAListener *info);
// Libraries for managing rdma and buffers
static int PostReceive(RDMATransportRDMAListener *info);
static RDMABuffer * AllocBuffer(RDMATransportRDMAListener *info,
size_t size = 0);
static void FreeBuffer(RDMABuffer *buf);
static int FlushSendQueue(RDMATransportRDMAListener *info);
// libevent callbacks
void OnTimer(RDMATransportTimerInfo *info);
static void TimerCallback(evutil_socket_t fd,
short what, void *arg);
......
......@@ -6,7 +6,6 @@
* and libasync
*
* Copyright 2013 Dan R. K. Ports <drkp@cs.washington.edu>
* Copyright 2018 Irene Zhang <iyzhang@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
......@@ -86,7 +85,6 @@ TCPTransport::LookupAddress(const transport::ReplicaAddress &addr)
{
int res;
struct addrinfo hints;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = 0;
......@@ -224,15 +222,6 @@ TCPTransport::ConnectTCP(TransportReceiver *src, const TCPTransportAddress &dst)
Panic("Failed to enable bufferevent");
}
// Tell the receiver its address
struct sockaddr_in sin;
socklen_t sinsize = sizeof(sin);
if (getsockname(fd, (sockaddr *) &sin, &sinsize) < 0) {
PPanic("Failed to get socket name");
}
TCPTransportAddress *addr = new TCPTransportAddress(sin);
src->SetAddress(addr);
tcpOutgoing[dst] = bev;
tcpAddresses.insert(pair<struct bufferevent*, TCPTransportAddress>(bev,dst));
......@@ -253,7 +242,7 @@ TCPTransport::Register(TransportReceiver *receiver,
// Clients don't need to accept TCP connections
if (replicaIdx == -1) {
return;
return;
}
// Create socket
......@@ -546,11 +535,11 @@ TCPTransport::TCPAcceptCallback(evutil_socket_t fd, short what, void *arg)
}
info->connectionEvents.push_back(bev);
TCPTransportAddress client = TCPTransportAddress(sin);
transport->tcpOutgoing[client] = bev;
transport->tcpAddresses.insert(pair<struct bufferevent*, TCPTransportAddress>(bev,client));
TCPTransportAddress client = TCPTransportAddress(sin);
transport->tcpOutgoing[client] = bev;
transport->tcpAddresses.insert(pair<struct bufferevent*, TCPTransportAddress>(bev,client));
Debug("Opened incoming TCP connection from %s:%d",
inet_ntoa(sin.sin_addr), htons(sin.sin_port));
inet_ntoa(sin.sin_addr), htons(sin.sin_port));
}
}
......@@ -562,62 +551,60 @@ TCPTransport::TCPReadableCallback(struct bufferevent *bev, void *arg)
struct evbuffer *evbuf = bufferevent_get_input(bev);
Debug("Readable on bufferevent %p", bev);
uint32_t *magic;
magic = (uint32_t *)evbuffer_pullup(evbuf, sizeof(*magic));
if (magic == NULL) {
return;
}
ASSERT(*magic == MAGIC);
size_t *sz;
unsigned char *x = evbuffer_pullup(evbuf, sizeof(*magic) + sizeof(*sz));
sz = (size_t *) (x + sizeof(*magic));
if (x == NULL) {
return;
}
size_t totalSize = *sz;
ASSERT(totalSize < 1073741826);
if (evbuffer_get_length(evbuf) < totalSize) {
Debug("Don't have %ld bytes for a message yet, only %ld",
totalSize, evbuffer_get_length(evbuf));
return;
}
Debug("Receiving %ld byte message", totalSize);
while (evbuffer_get_length(evbuf) > 0) {
uint32_t *magic;
magic = (uint32_t *)evbuffer_pullup(evbuf, sizeof(*magic));
if (magic == NULL) {
return;
}
ASSERT(*magic == MAGIC);
char buf[totalSize];
size_t copied = evbuffer_remove(evbuf, buf, totalSize);
ASSERT(copied == totalSize);
size_t *sz;
unsigned char *x = evbuffer_pullup(evbuf, sizeof(*magic) + sizeof(*sz));
sz = (size_t *) (x + sizeof(*magic));
if (x == NULL) {
return;
}
size_t totalSize = *sz;
ASSERT(totalSize < 1073741826);
// Parse message
char *ptr = buf + sizeof(*sz) + sizeof(*magic);
size_t typeLen = *((size_t *)ptr);
ptr += sizeof(size_t);
ASSERT((size_t)(ptr-buf) < totalSize);
if (evbuffer_get_length(evbuf) < totalSize) {
Debug("Don't have %ld bytes for a message yet, only %ld",
totalSize, evbuffer_get_length(evbuf));
return;
}
Debug("Receiving %ld byte message", totalSize);
char buf[totalSize];
size_t copied = evbuffer_remove(evbuf, buf, totalSize);
ASSERT(copied == totalSize);
// Parse message
char *ptr = buf + sizeof(*sz) + sizeof(*magic);
size_t typeLen = *((size_t *)ptr);
ptr += sizeof(size_t);
ASSERT((size_t)(ptr-buf) < totalSize);
ASSERT((size_t)(ptr+typeLen-buf) < totalSize);
string msgType(ptr, typeLen);
ptr += typeLen;
ASSERT((size_t)(ptr+typeLen-buf) < totalSize);
string msgType(ptr, typeLen);
ptr += typeLen;
size_t msgLen = *((size_t *)ptr);
ptr += sizeof(size_t);
ASSERT((size_t)(ptr-buf) < totalSize);
ASSERT((size_t)(ptr+msgLen-buf) <= totalSize);
string msg(ptr, msgLen);
ptr += msgLen;
size_t msgLen = *((size_t *)ptr);
ptr += sizeof(size_t);
ASSERT((size_t)(ptr-buf) < totalSize);
ASSERT((size_t)(ptr+msgLen-buf) <= totalSize);
string msg(ptr, msgLen);
ptr += msgLen;
auto addr = transport->tcpAddresses.find(bev);
ASSERT(addr != transport->tcpAddresses.end());
// Dispatch
info->receiver->ReceiveMessage(addr->second, msgType, msg);
Debug("Done processing large %s message", msgType.c_str());
}
auto addr = transport->tcpAddresses.find(bev);
ASSERT(addr != transport->tcpAddresses.end());
// Dispatch
info->receiver->ReceiveMessage(addr->second, msgType, msg);
Debug("Done processing large %s message", msgType.c_str());
}
void
......
......@@ -40,6 +40,11 @@
#define REPLICA_NETWORK_DELAY 0
#define READ_AT_LEADER 1
#define UDP 0
#define TCP 1
#define RDMA 2
#define TRANSPORT RDMA
class TransportAddress
{
public:
......
......@@ -215,6 +215,7 @@ UDPTransport::UDPTransport(double dropRate, double reorderRate,
event_add(x, NULL);
}
}
Debug("Using UDP Transport");
}
UDPTransport::~UDPTransport()
......
syntax = "proto2";
syntax = "proto3";
option cc_enable_arenas = true;
package lockserver.proto;
message Request {
required uint64 clientid = 1;
required string key = 2;
required bool type = 3;
uint64 clientid = 1;
string key = 2;
bool type = 3;
// true = lock
// false = unlock
}
message Reply {
required string key = 1;
required int32 status = 2;
string key = 1;
int32 status = 2;
// 0 = Operation Success
// -1 = Held by someone else (for lock)
// -2 = Not held by you (for unlock)
......
syntax = "proto2";
syntax = "proto3";
option cc_enable_arenas = true;
package replication;
message Request {
required bytes op = 1;
required uint64 clientid = 2;
required uint64 clientreqid = 3;
bytes op = 1;
uint64 clientid = 2;
uint64 clientreqid = 3;
}
message UnloggedRequest {
required bytes op = 1;
required uint64 clientid = 2;
required uint64 clientreqid = 3;
bytes op = 1;
uint64 clientid = 2;
uint64 clientreqid = 3;
}
syntax = "proto2";
syntax = "proto3";
option cc_enable_arenas = true;
import "replication/common/request.proto";
package replication.ir.proto;
message OpID {
required uint64 clientid = 1;
required uint64 clientreqid = 2;
uint64 clientid = 1;
uint64 clientreqid = 2;
}
// For the view change and recovery protocol, a replica stores two things on
......@@ -14,8 +15,8 @@ message OpID {
// NORMAL. Replicas pack this information into this proto buf and serialize it
// to disk.
message PersistedViewInfo {
required uint64 view = 1;
required uint64 latest_normal_view = 2;
uint64 view = 1;
uint64 latest_normal_view = 2;
}
enum RecordEntryState {
......@@ -29,12 +30,12 @@ enum RecordEntryType {
}
message RecordEntryProto {
required uint64 view = 1;
required OpID opid = 2;
required RecordEntryState state = 3;
required RecordEntryType type = 4;
required bytes op = 5;
required bytes result = 6;
uint64 view = 1;
OpID opid = 2;
RecordEntryState state = 3;
RecordEntryType type = 4;
bytes op = 5;
bytes result = 6;
}
// TODO: Currently, replicas send entire records to one another. Figure out if
......@@ -44,61 +45,61 @@ message RecordProto {
}
message ProposeInconsistentMessage {
required replication.Request req = 1;
replication.Request req = 1;
}
message ReplyInconsistentMessage {
required uint64 view = 1;
required uint32 replicaIdx = 2;
required OpID opid = 3;
required bool finalized = 4;
uint64 view = 1;
uint32 replicaIdx = 2;
OpID opid = 3;
bool finalized = 4;
}
message FinalizeInconsistentMessage {
required OpID opid = 1;
OpID opid = 1;
}
message ConfirmMessage {
required uint64 view = 1;
required uint32 replicaIdx = 2;
required OpID opid = 3;
uint64 view = 1;
uint32 replicaIdx = 2;
OpID opid = 3;
}
message ProposeConsensusMessage {
required replication.Request req = 1;
replication.Request req = 1;
}
message ReplyConsensusMessage {
required uint64 view = 1;
required uint32 replicaIdx = 2;
required OpID opid = 3;
required bytes result = 4;
required bool finalized = 5;
uint64 view = 1;
uint32 replicaIdx = 2;
OpID opid = 3;
bytes result = 4;
bool finalized = 5;
}
message FinalizeConsensusMessage {
required OpID opid = 1;
required bytes result = 2;
OpID opid = 1;
bytes result = 2;
}
message DoViewChangeMessage {
required uint32 replicaIdx = 1;
// record is optional because a replica only sends its record to the
uint32 replicaIdx = 1;
// record is because a replica only sends its record to the
// leader of the new view.
optional RecordProto record = 2;
required uint64 new_view = 3;
required uint64 latest_normal_view = 4;
RecordProto record = 2;
uint64 new_view = 3;
uint64 latest_normal_view = 4;
}
message StartViewMessage {
required RecordProto record = 1;
required uint64 new_view = 2;
RecordProto record = 1;
uint64 new_view = 2;
}
message UnloggedRequestMessage {
required replication.UnloggedRequest req = 1;
replication.UnloggedRequest req = 1;
}
message UnloggedReplyMessage {
required bytes reply = 1;
required uint64 clientreqid = 2;
bytes reply = 1;
uint64 clientreqid = 2;
}
syntax = "proto2";
syntax = "proto3";
option cc_enable_arenas = true;
import "replication/common/request.proto";
package replication.vr.proto;
message RequestMessage {
required replication.Request req = 1;
replication.Request req = 1;
}
message ReplyMessage {
required uint64 view = 1;
required uint64 opnum = 2;
required uint64 clientreqid = 3;
required bytes reply = 4;
uint64 view = 1;
uint64 opnum = 2;
uint64 clientreqid = 3;
bytes reply = 4;
}
message UnloggedRequestMessage {
required replication.UnloggedRequest req = 1;
replication.UnloggedRequest req = 1;
}
message UnloggedReplyMessage {
required bytes reply = 1;
required uint64 clientreqid = 2;
bytes reply = 1;
uint64 clientreqid = 2;
}
message PrepareMessage {
required uint64 view = 1;
required uint64 opnum = 2;
required uint64 batchstart = 3;
uint64 view = 1;
uint64 opnum = 2;
uint64 batchstart = 3;
repeated Request request = 4;
}
message PrepareOKMessage {
required uint64 view = 1;
required uint64 opnum = 2;
required uint32 replicaIdx = 3;
uint64 view = 1;
uint64 opnum = 2;
uint32 replicaIdx = 3;
}
message CommitMessage {
required uint64 view = 1;
required uint64 opnum = 2;
uint64 view = 1;
uint64 opnum = 2;
}
message RequestStateTransferMessage {
required uint64 view = 1;
required uint64 opnum = 2;
uint64 view = 1;
uint64 opnum = 2;
}
message StateTransferMessage {
message LogEntry {
required uint64 view = 1;
required uint64 opnum = 2;
required replication.Request request = 3;
optional uint32 state = 4;
optional bytes hash = 5;
uint64 view = 1;
uint64 opnum = 2;
replication.Request request = 3;
uint32 state = 4;
bytes hash = 5;
}
required uint64 view = 1;
required uint64 opnum = 2;
uint64 view = 1;
uint64 opnum = 2;
repeated LogEntry entries = 3;
}
message StartViewChangeMessage {
required uint64 view = 1;
required uint32 replicaIdx = 2;
required uint64 lastCommitted = 3;
uint64 view = 1;
uint32 replicaIdx = 2;
uint64 lastCommitted = 3;
}
message DoViewChangeMessage {
message LogEntry {
required uint64 view = 1;
required uint64 opnum = 2;
required replication.Request request = 3;
optional uint32 state = 4;
optional bytes hash = 5;
uint64 view = 1;
uint64 opnum = 2;
replication.Request request = 3;
uint32 state = 4;
bytes hash = 5;
}
required uint64 view = 1;
required uint64 lastNormalView = 2;
required uint64 lastOp = 3;
required uint64 lastCommitted = 4;
uint64 view = 1;
uint64 lastNormalView = 2;
uint64 lastOp = 3;
uint64 lastCommitted = 4;
repeated LogEntry entries = 5;
required uint32 replicaIdx = 6;
uint32 replicaIdx = 6;
}
message StartViewMessage {
message LogEntry {
required uint64 view = 1;
required uint64 opnum = 2;
required replication.Request request = 3;
optional uint32 state = 4;
optional bytes hash = 5;
uint64 view = 1;
uint64 opnum = 2;
replication.Request request = 3;
uin