From a6bb1e80ea108880da50b3b70b4bb0c9ea53de52 Mon Sep 17 00:00:00 2001 From: "Naveen Kr. Sharma" <naveenks@cs.washington.edu> Date: Wed, 18 Jan 2017 18:22:44 -0800 Subject: [PATCH] Thread-safe clients by removing static variables (performance hit?) --- lib/message.cc | 6 +++--- replication/ir/client.cc | 8 ++++---- replication/ir/replica.cc | 10 +++++----- replication/vr/client.cc | 4 ++-- replication/vr/replica.cc | 20 +++++++++---------- store/tapirstore/client.cc | 34 ++++++++++++--------------------- store/tapirstore/shardclient.cc | 21 ++++++++++---------- store/weakstore/server.cc | 4 ++-- store/weakstore/shardclient.cc | 4 ++-- 9 files changed, 50 insertions(+), 61 deletions(-) diff --git a/lib/message.cc b/lib/message.cc index b2affcc..3aa7dc7 100644 --- a/lib/message.cc +++ b/lib/message.cc @@ -50,7 +50,7 @@ #define TIMESTAMP_BASE62 0 #define TIMESTAMP_NUMERIC 1 -std::mutex mtx; +std::mutex message_mtx; void __attribute__((weak)) Message_VA(enum Message_Type type, @@ -79,7 +79,7 @@ _Message_VA(enum Message_Type type, FILE *fp, const char *fmt, va_list args) { // Lock mutex to make sure the output is not mangled. - mtx.lock(); + message_mtx.lock(); static int haveColor = -1; struct msg_desc { @@ -159,7 +159,7 @@ _Message_VA(enum Message_Type type, FILE *fp, fflush(fp); // Unlock mutex. - mtx.unlock(); + message_mtx.unlock(); } void _Panic(void) diff --git a/replication/ir/client.cc b/replication/ir/client.cc index 279ae17..7825794 100644 --- a/replication/ir/client.cc +++ b/replication/ir/client.cc @@ -280,10 +280,10 @@ IRClient::ReceiveMessage(const TransportAddress &remote, const string &type, const string &data) { - static proto::ReplyInconsistentMessage replyInconsistent; - static proto::ReplyConsensusMessage replyConsensus; - static proto::ConfirmMessage confirm; - static proto::UnloggedReplyMessage unloggedReply; + proto::ReplyInconsistentMessage replyInconsistent; + proto::ReplyConsensusMessage replyConsensus; + proto::ConfirmMessage confirm; + proto::UnloggedReplyMessage unloggedReply; if (type == replyInconsistent.GetTypeName()) { replyInconsistent.ParseFromString(data); diff --git a/replication/ir/replica.cc b/replication/ir/replica.cc index 69ee001..4116692 100644 --- a/replication/ir/replica.cc +++ b/replication/ir/replica.cc @@ -37,11 +37,11 @@ void IRReplica::HandleMessage(const TransportAddress &remote, const string &type, const string &data) { - static ProposeInconsistentMessage proposeInconsistent; - static FinalizeInconsistentMessage finalizeInconsistent; - static ProposeConsensusMessage proposeConsensus; - static FinalizeConsensusMessage finalizeConsensus; - static UnloggedRequestMessage unloggedRequest; + ProposeInconsistentMessage proposeInconsistent; + FinalizeInconsistentMessage finalizeInconsistent; + ProposeConsensusMessage proposeConsensus; + FinalizeConsensusMessage finalizeConsensus; + UnloggedRequestMessage unloggedRequest; if (type == proposeInconsistent.GetTypeName()) { proposeInconsistent.ParseFromString(data); diff --git a/replication/vr/client.cc b/replication/vr/client.cc index 9bb8731..f8065cb 100644 --- a/replication/vr/client.cc +++ b/replication/vr/client.cc @@ -137,8 +137,8 @@ VRClient::ReceiveMessage(const TransportAddress &remote, const string &type, const string &data) { - static proto::ReplyMessage reply; - static proto::UnloggedReplyMessage unloggedReply; + proto::ReplyMessage reply; + proto::UnloggedReplyMessage unloggedReply; if (type == reply.GetTypeName()) { reply.ParseFromString(data); diff --git a/replication/vr/replica.cc b/replication/vr/replica.cc index 6315ac5..09ecf24 100644 --- a/replication/vr/replica.cc +++ b/replication/vr/replica.cc @@ -349,16 +349,16 @@ void VRReplica::ReceiveMessage(const TransportAddress &remote, const string &type, const string &data) { - static RequestMessage request; - static UnloggedRequestMessage unloggedRequest; - static PrepareMessage prepare; - static PrepareOKMessage prepareOK; - static CommitMessage commit; - static RequestStateTransferMessage requestStateTransfer; - static StateTransferMessage stateTransfer; - static StartViewChangeMessage startViewChange; - static DoViewChangeMessage doViewChange; - static StartViewMessage startView; + RequestMessage request; + UnloggedRequestMessage unloggedRequest; + PrepareMessage prepare; + PrepareOKMessage prepareOK; + CommitMessage commit; + RequestStateTransferMessage requestStateTransfer; + StateTransferMessage stateTransfer; + StartViewChangeMessage startViewChange; + DoViewChangeMessage doViewChange; + StartViewMessage startView; if (type == request.GetTypeName()) { request.ParseFromString(data); diff --git a/store/tapirstore/client.cc b/store/tapirstore/client.cc index e6cdd90..7b4fd76 100644 --- a/store/tapirstore/client.cc +++ b/store/tapirstore/client.cc @@ -37,7 +37,7 @@ using namespace std; Client::Client(const string configPath, int nShards, int closestReplica, TrueTime timeServer) - : nshards(nShards), transport(0.0, 0.0, 0), timeServer(timeServer) + : nshards(nShards), transport(0.0, 0.0, 0, false), timeServer(timeServer) { // Initialize all state here; client_id = 0; @@ -93,7 +93,7 @@ Client::run_client() void Client::Begin() { - Debug("BEGIN Transaction"); + Debug("BEGIN [%lu]", t_id + 1); t_id++; participants.clear(); } @@ -102,7 +102,7 @@ Client::Begin() int Client::Get(const string &key, string &value) { - Debug("GET Operation [%s]", key.c_str()); + Debug("GET [%lu : %s]", t_id, key.c_str()); // Contact the appropriate shard to get the value. int i = key_to_shard(key, nshards); @@ -133,7 +133,7 @@ Client::Get(const string &key) int Client::Put(const string &key, const string &value) { - Debug("PUT Operation [%s]", key.c_str()); + Debug("PUT [%lu : %s]", t_id, key.c_str()); // Contact the appropriate shard to set the value. int i = key_to_shard(key, nshards); @@ -158,11 +158,10 @@ Client::Prepare(Timestamp ×tamp) uint64_t proposed = 0; list<Promise *> promises; - Debug("PREPARE Transaction at %lu", timestamp.getTimestamp()); + Debug("PREPARE [%lu] at %lu", t_id, timestamp.getTimestamp()); ASSERT(participants.size() > 0); for (auto p : participants) { - Debug("Sending prepare to shard [%d]", p); promises.push_back(new Promise(PREPARE_TIMEOUT)); bclient[p]->Prepare(timestamp, promises.back()); } @@ -176,11 +175,11 @@ Client::Prepare(Timestamp ×tamp) switch(p->GetReply()) { case REPLY_OK: - Debug("Prepare ok vote"); + Debug("PREPARE [%lu] OK", t_id); continue; case REPLY_FAIL: // abort! - Debug("ABORT transaction"); + Debug("PREPARE [%lu] ABORT", t_id); return REPLY_FAIL; case REPLY_RETRY: status = REPLY_RETRY; @@ -207,10 +206,10 @@ Client::Prepare(Timestamp ×tamp) } else { timestamp.setTimestamp(proposed); } - Debug("RETRY transaction at %lu", timestamp.getTimestamp()); + Debug("RETRY [%lu] at [%lu]", t_id, timestamp.getTimestamp()); } - Debug("All PREPARE replies received"); + Debug("All PREPARE's [%lu] received", t_id); return status; } @@ -232,10 +231,9 @@ Client::Commit() } if (status == REPLY_OK) { - Debug("COMMIT Transaction"); + Debug("COMMIT [%lu]", t_id); for (auto p : participants) { - Debug("Sending commit to shard [%d]", p); bclient[p]->Commit(0); } return true; @@ -250,18 +248,10 @@ Client::Commit() void Client::Abort() { - Debug("ABORT Transaction"); - list<Promise *> promises; + Debug("ABORT [%lu]", t_id); for (auto p : participants) { - promises.push_back(new Promise(ABORT_TIMEOUT)); - bclient[p]->Abort(promises.back()); - } - - // Wait for responses for aborts - for (auto p : promises) { - p->GetReply(); - delete p; + bclient[p]->Abort(); } } diff --git a/store/tapirstore/shardclient.cc b/store/tapirstore/shardclient.cc index de9e2f9..ee9cec4 100644 --- a/store/tapirstore/shardclient.cc +++ b/store/tapirstore/shardclient.cc @@ -43,8 +43,7 @@ ShardClient::ShardClient(const string &configPath, { ifstream configStream(configPath); if (configStream.fail()) { - fprintf(stderr, "unable to read configuration file: %s\n", - configPath.c_str()); + Panic("Unable to read configuration file: %s\n", configPath.c_str()); } transport::Configuration config(configStream); @@ -85,7 +84,7 @@ void ShardClient::Get(uint64_t id, const string &key, Promise *promise) { // Send the GET operation to appropriate shard. - Debug("[shard %i] Sending GET [%s]", shard, key.c_str()); + Debug("[shard %i] Sending GET [%lu : %s]", shard, id, key.c_str()); // create request string request_str; @@ -117,7 +116,7 @@ ShardClient::Get(uint64_t id, const string &key, const Timestamp ×tamp, Promise *promise) { // Send the GET operation to appropriate shard. - Debug("[shard %i] Sending GET [%s]", shard, key.c_str()); + Debug("[shard %i] Sending GET [%lu : %s]", shard, id, key.c_str()); // create request string request_str; @@ -158,7 +157,7 @@ void ShardClient::Prepare(uint64_t id, const Transaction &txn, const Timestamp ×tamp, Promise *promise) { - Debug("[shard %i] Sending PREPARE: %lu", shard, id); + Debug("[shard %i] Sending PREPARE [%lu]", shard, id); // create prepare request string request_str; @@ -221,7 +220,7 @@ ShardClient::Commit(uint64_t id, const Transaction &txn, uint64_t timestamp, Promise *promise) { - Debug("[shard %i] Sending COMMIT: %lu", shard, id); + Debug("[shard %i] Sending COMMIT [%lu]", shard, id); // create commit request string request_str; @@ -245,7 +244,7 @@ ShardClient::Commit(uint64_t id, const Transaction &txn, void ShardClient::Abort(uint64_t id, const Transaction &txn, Promise *promise) { - Debug("[shard %i] Sending ABORT: %lu", shard, id); + Debug("[shard %i] Sending ABORT [%lu]", shard, id); // create abort request string request_str; @@ -284,7 +283,7 @@ ShardClient::GetCallback(const string &request_str, const string &reply_str) Reply reply; reply.ParseFromString(reply_str); - Debug("[shard %i] Received GET callback [%d]", shard, reply.status()); + Debug("[shard %lu:%i] GET callback [%d]", client_id, shard, reply.status()); if (waiting != NULL) { Promise *w = waiting; waiting = NULL; @@ -303,7 +302,7 @@ ShardClient::PrepareCallback(const string &request_str, const string &reply_str) Reply reply; reply.ParseFromString(reply_str); - Debug("[shard %i] Received PREPARE callback [%d]", shard, reply.status()); + Debug("[shard %lu:%i] PREPARE callback [%d]", client_id, shard, reply.status()); if (waiting != NULL) { Promise *w = waiting; @@ -328,7 +327,7 @@ ShardClient::CommitCallback(const string &request_str, const string &reply_str) if (waiting != NULL) { waiting = NULL; } - Debug("[shard %i] Received COMMIT callback", shard); + Debug("[shard %lu:%i] COMMIT callback", client_id, shard); } /* Callback from a shard replica on abort operation completion. */ @@ -343,7 +342,7 @@ ShardClient::AbortCallback(const string &request_str, const string &reply_str) if (waiting != NULL) { waiting = NULL; } - Debug("[shard %i] Received ABORT callback", shard); + Debug("[shard %lu:%i] ABORT callback", client_id, shard); } } // namespace tapir diff --git a/store/weakstore/server.cc b/store/weakstore/server.cc index cec1351..dce64b7 100644 --- a/store/weakstore/server.cc +++ b/store/weakstore/server.cc @@ -54,8 +54,8 @@ void Server::HandleMessage(const TransportAddress &remote, const string &type, const string &data) { - static GetMessage get; - static PutMessage put; + GetMessage get; + PutMessage put; if (type == get.GetTypeName()) { get.ParseFromString(data); diff --git a/store/weakstore/shardclient.cc b/store/weakstore/shardclient.cc index 7fe97fd..0296044 100644 --- a/store/weakstore/shardclient.cc +++ b/store/weakstore/shardclient.cc @@ -152,8 +152,8 @@ ShardClient::ReceiveMessage(const TransportAddress &remote, const string &type, const string &data) { - static GetReplyMessage getReply; - static PutReplyMessage putReply; + GetReplyMessage getReply; + PutReplyMessage putReply; Debug("Received reply type: %s", type.c_str()); -- GitLab