Skip to content
Snippets Groups Projects
Commit aa1dbd38 authored by Irene Zhang's avatar Irene Zhang
Browse files

updating some ycsb stuff and the tcp transport

parent f264018d
No related branches found
No related tags found
No related merge requests found
...@@ -2,9 +2,9 @@ ...@@ -2,9 +2,9 @@
# Top-level makefile for IR and TAPIR # Top-level makefile for IR and TAPIR
# #
CC = clang CC = gcc
CXX = clang++ CXX = g++
LD = clang++ LD = g++
EXPAND = lib/tmpl/expand EXPAND = lib/tmpl/expand
#CFLAGS := -g -Wall -pthread -iquote.obj/gen -Wno-uninitialized -O2 -DNASSERT #CFLAGS := -g -Wall -pthread -iquote.obj/gen -Wno-uninitialized -O2 -DNASSERT
......
...@@ -128,7 +128,8 @@ BindToPort(int fd, const string &host, const string &port) ...@@ -128,7 +128,8 @@ BindToPort(int fd, const string &host, const string &port)
hints.ai_flags = AI_PASSIVE; hints.ai_flags = AI_PASSIVE;
struct addrinfo *ai; struct addrinfo *ai;
int res; int res;
if ((res = getaddrinfo(host.c_str(), port.c_str(), if ((res = getaddrinfo(host.c_str(),
port.c_str(),
&hints, &ai))) { &hints, &ai))) {
Panic("Failed to resolve host/port %s:%s: %s", Panic("Failed to resolve host/port %s:%s: %s",
host.c_str(), port.c_str(), gai_strerror(res)); host.c_str(), port.c_str(), gai_strerror(res));
...@@ -203,11 +204,20 @@ TCPTransport::ConnectTCP(TransportReceiver *src, const TCPTransportAddress &dst) ...@@ -203,11 +204,20 @@ TCPTransport::ConnectTCP(TransportReceiver *src, const TCPTransportAddress &dst)
PWarning("Failed to set O_NONBLOCK on outgoing TCP socket"); PWarning("Failed to set O_NONBLOCK on outgoing TCP socket");
} }
TCPTransportTCPListener *info = new TCPTransportTCPListener();
info->transport = this;
info->acceptFd = 0;
info->receiver = src;
info->replicaIdx = -1;
info->acceptEvent = NULL;
tcpListeners.push_back(info);
struct bufferevent *bev = struct bufferevent *bev =
bufferevent_socket_new(libeventBase, fd, bufferevent_socket_new(libeventBase, fd,
BEV_OPT_CLOSE_ON_FREE); BEV_OPT_CLOSE_ON_FREE);
bufferevent_setcb(bev, NULL, NULL, bufferevent_setcb(bev, TCPReadableCallback, NULL,
TCPOutgoingEventCallback, this); TCPOutgoingEventCallback, info);
if (bufferevent_socket_connect(bev, if (bufferevent_socket_connect(bev,
(struct sockaddr *)&(dst.addr), (struct sockaddr *)&(dst.addr),
...@@ -235,10 +245,14 @@ TCPTransport::Register(TransportReceiver *receiver, ...@@ -235,10 +245,14 @@ TCPTransport::Register(TransportReceiver *receiver,
ASSERT(replicaIdx < config.n); ASSERT(replicaIdx < config.n);
struct sockaddr_in sin; struct sockaddr_in sin;
TCPTransportTCPListener *info = new TCPTransportTCPListener();
//const transport::Configuration *canonicalConfig = //const transport::Configuration *canonicalConfig =
RegisterConfiguration(receiver, config, replicaIdx); RegisterConfiguration(receiver, config, replicaIdx);
// Clients don't need to accept TCP connections
if (replicaIdx == -1) {
return;
}
// Create socket // Create socket
int fd; int fd;
if ((fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { if ((fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
...@@ -257,23 +271,19 @@ TCPTransport::Register(TransportReceiver *receiver, ...@@ -257,23 +271,19 @@ TCPTransport::Register(TransportReceiver *receiver,
PWarning("Failed to set SO_REUSEADDR on TCP listening socket"); PWarning("Failed to set SO_REUSEADDR on TCP listening socket");
} }
if (replicaIdx != -1) { // Registering a replica. Bind socket to the designated
// Registering a replica. Bind socket to the designated // host/port
// host/port const string &host = config.replica(replicaIdx).host;
const string &host = config.replica(replicaIdx).host; const string &port = config.replica(replicaIdx).port;
const string &port = config.replica(replicaIdx).port; BindToPort(fd, host, port);
BindToPort(fd, host, port);
} else {
// Registering a client. Bind to any available host/port
BindToPort(fd, "", "any");
}
// Listen for connections // Listen for connections
if (listen(fd, 5) < 0) { if (listen(fd, 5) < 0) {
PPanic("Failed to listen for TCP connections"); PPanic("Failed to listen for TCP connections");
} }
// Create event to accept connections // Create event to accept connections
TCPTransportTCPListener *info = new TCPTransportTCPListener();
info->transport = this; info->transport = this;
info->acceptFd = fd; info->acceptFd = fd;
info->receiver = receiver; info->receiver = receiver;
...@@ -333,24 +343,24 @@ TCPTransport::SendMessageInternal(TransportReceiver *src, ...@@ -333,24 +343,24 @@ TCPTransport::SendMessageInternal(TransportReceiver *src,
*((uint32_t *) ptr) = MAGIC; *((uint32_t *) ptr) = MAGIC;
ptr += sizeof(uint32_t); ptr += sizeof(uint32_t);
ASSERT(ptr-buf < totalLen); ASSERT((size_t)(ptr-buf) < totalLen);
*((size_t *) ptr) = totalLen; *((size_t *) ptr) = totalLen;
ptr += sizeof(size_t); ptr += sizeof(size_t);
ASSERT(ptr-buf < totalLen); ASSERT((size_t)(ptr-buf) < totalLen);
*((size_t *) ptr) = typeLen; *((size_t *) ptr) = typeLen;
ptr += sizeof(size_t); ptr += sizeof(size_t);
ASSERT(ptr-buf < totalLen); ASSERT((size_t)(ptr-buf) < totalLen);
ASSERT(ptr+typeLen-buf < totalLen); ASSERT((size_t)(ptr+typeLen-buf) < totalLen);
memcpy(ptr, type.c_str(), typeLen); memcpy(ptr, type.c_str(), typeLen);
ptr += typeLen; ptr += typeLen;
*((size_t *) ptr) = dataLen; *((size_t *) ptr) = dataLen;
ptr += sizeof(size_t); ptr += sizeof(size_t);
ASSERT(ptr-buf < totalLen); ASSERT((size_t)(ptr-buf) < totalLen);
ASSERT(ptr+dataLen-buf == totalLen); ASSERT((size_t)(ptr+dataLen-buf) == totalLen);
memcpy(ptr, data.c_str(), dataLen); memcpy(ptr, data.c_str(), dataLen);
ptr += dataLen; ptr += dataLen;
...@@ -377,6 +387,8 @@ TCPTransport::Stop() ...@@ -377,6 +387,8 @@ TCPTransport::Stop()
int int
TCPTransport::Timer(uint64_t ms, timer_callback_t cb) TCPTransport::Timer(uint64_t ms, timer_callback_t cb)
{ {
std::lock_guard<std::mutex> lck(mtx);
TCPTransportTimerInfo *info = new TCPTransportTimerInfo(); TCPTransportTimerInfo *info = new TCPTransportTimerInfo();
struct timeval tv; struct timeval tv;
...@@ -401,12 +413,14 @@ TCPTransport::Timer(uint64_t ms, timer_callback_t cb) ...@@ -401,12 +413,14 @@ TCPTransport::Timer(uint64_t ms, timer_callback_t cb)
bool bool
TCPTransport::CancelTimer(int id) TCPTransport::CancelTimer(int id)
{ {
std::lock_guard<std::mutex> lck(mtx);
TCPTransportTimerInfo *info = timers[id]; TCPTransportTimerInfo *info = timers[id];
if (info == NULL) { if (info == NULL) {
return false; return false;
} }
timers.erase(info->id);
event_del(info->ev); event_del(info->ev);
event_free(info->ev); event_free(info->ev);
delete info; delete info;
...@@ -426,9 +440,13 @@ TCPTransport::CancelAllTimers() ...@@ -426,9 +440,13 @@ TCPTransport::CancelAllTimers()
void void
TCPTransport::OnTimer(TCPTransportTimerInfo *info) TCPTransport::OnTimer(TCPTransportTimerInfo *info)
{ {
timers.erase(info->id); {
event_del(info->ev); std::lock_guard<std::mutex> lck(mtx);
event_free(info->ev);
timers.erase(info->id);
event_del(info->ev);
event_free(info->ev);
}
info->cb(); info->cb();
...@@ -518,7 +536,9 @@ TCPTransport::TCPAcceptCallback(evutil_socket_t fd, short what, void *arg) ...@@ -518,7 +536,9 @@ TCPTransport::TCPAcceptCallback(evutil_socket_t fd, short what, void *arg)
} }
info->connectionEvents.push_back(bev); info->connectionEvents.push_back(bev);
transport->tcpAddresses.insert(pair<struct bufferevent*, TCPTransportAddress>(bev,TCPTransportAddress(sin))); TCPTransportAddress client = TCPTransportAddress(sin);
transport->tcpOutgoing[client] = bev;
transport->tcpAddresses.insert(pair<struct bufferevent*, TCPTransportAddress>(bev,client));
Debug("Opened incoming TCP connection from %s:%d", Debug("Opened incoming TCP connection from %s:%d",
inet_ntoa(sin.sin_addr), htons(sin.sin_port)); inet_ntoa(sin.sin_addr), htons(sin.sin_port));
} }
...@@ -565,7 +585,7 @@ TCPTransport::TCPReadableCallback(struct bufferevent *bev, void *arg) ...@@ -565,7 +585,7 @@ TCPTransport::TCPReadableCallback(struct bufferevent *bev, void *arg)
ptr += sizeof(size_t); ptr += sizeof(size_t);
ASSERT((size_t)(ptr-buf) < totalSize); ASSERT((size_t)(ptr-buf) < totalSize);
ASSERT(ptr+typeLen-buf < totalSize); ASSERT((size_t)(ptr+typeLen-buf) < totalSize);
string msgType(ptr, typeLen); string msgType(ptr, typeLen);
ptr += typeLen; ptr += typeLen;
...@@ -573,7 +593,7 @@ TCPTransport::TCPReadableCallback(struct bufferevent *bev, void *arg) ...@@ -573,7 +593,7 @@ TCPTransport::TCPReadableCallback(struct bufferevent *bev, void *arg)
ptr += sizeof(size_t); ptr += sizeof(size_t);
ASSERT((size_t)(ptr-buf) < totalSize); ASSERT((size_t)(ptr-buf) < totalSize);
ASSERT(ptr+msgLen-buf <= totalSize); ASSERT((size_t)(ptr+msgLen-buf) <= totalSize);
string msg(ptr, msgLen); string msg(ptr, msgLen);
ptr += msgLen; ptr += msgLen;
...@@ -605,7 +625,8 @@ void ...@@ -605,7 +625,8 @@ void
TCPTransport::TCPOutgoingEventCallback(struct bufferevent *bev, TCPTransport::TCPOutgoingEventCallback(struct bufferevent *bev,
short what, void *arg) short what, void *arg)
{ {
TCPTransport *transport = (TCPTransport *)arg; TCPTransportTCPListener *info = (TCPTransportTCPListener *)arg;
TCPTransport *transport = info->transport;
auto it = transport->tcpAddresses.find(bev); auto it = transport->tcpAddresses.find(bev);
ASSERT(it != transport->tcpAddresses.end()); ASSERT(it != transport->tcpAddresses.end());
TCPTransportAddress addr = it->second; TCPTransportAddress addr = it->second;
......
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*- / -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/*********************************************************************** /***********************************************************************
* *
* udptransport.h: * udptransport.h:
...@@ -44,6 +44,7 @@ ...@@ -44,6 +44,7 @@
#include <unordered_map> #include <unordered_map>
#include <list> #include <list>
#include <random> #include <random>
#include <mutex>
#include <netinet/in.h> #include <netinet/in.h>
class TCPTransportAddress : public TransportAddress class TCPTransportAddress : public TransportAddress
...@@ -79,6 +80,7 @@ public: ...@@ -79,6 +80,7 @@ public:
void CancelAllTimers(); void CancelAllTimers();
private: private:
std::mutex mtx;
struct TCPTransportTimerInfo struct TCPTransportTimerInfo
{ {
TCPTransport *transport; TCPTransport *transport;
......
...@@ -120,6 +120,14 @@ Client::Get(const string &key, string &value) ...@@ -120,6 +120,14 @@ Client::Get(const string &key, string &value)
return promise.GetReply(); return promise.GetReply();
} }
string
Client::Get(const string &key)
{
string value;
Get(key, value);
return value;
}
/* Sets the value corresponding to the supplied key. */ /* Sets the value corresponding to the supplied key. */
int int
Client::Put(const string &key, const string &value) Client::Put(const string &key, const string &value)
......
...@@ -59,11 +59,16 @@ class Client : public ::Client ...@@ -59,11 +59,16 @@ class Client : public ::Client
public: public:
Client(const std::string configPath, int nshards, Client(const std::string configPath, int nshards,
int closestReplica, TrueTime timeserver); int closestReplica, TrueTime timeserver);
Client(const std::string configPath, int nshards,
int closestReplica)
{ Client(configPath, nshards, closestReplica, TrueTime(0,0)); };
virtual ~Client(); virtual ~Client();
// Overriding functions from ::Client. // Overriding functions from ::Client.
void Begin(); void Begin();
int Get(const std::string &key, std::string &value); int Get(const std::string &key, std::string &value);
// Interface added for Java bindings
std::string Get(const std::string &key);
int Put(const std::string &key, const std::string &value); int Put(const std::string &key, const std::string &value);
bool Commit(); bool Commit();
void Abort(); void Abort();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment