diff --git a/lib/message.cc b/lib/message.cc index b2affccbab9e682e0c8145733db09d3826f82ab3..3aa7dc7a997637b48b3a961ab3bd101b566d8fbc 100644 --- a/lib/message.cc +++ b/lib/message.cc @@ -50,7 +50,7 @@ #define TIMESTAMP_BASE62 0 #define TIMESTAMP_NUMERIC 1 -std::mutex mtx; +std::mutex message_mtx; void __attribute__((weak)) Message_VA(enum Message_Type type, @@ -79,7 +79,7 @@ _Message_VA(enum Message_Type type, FILE *fp, const char *fmt, va_list args) { // Lock mutex to make sure the output is not mangled. - mtx.lock(); + message_mtx.lock(); static int haveColor = -1; struct msg_desc { @@ -159,7 +159,7 @@ _Message_VA(enum Message_Type type, FILE *fp, fflush(fp); // Unlock mutex. - mtx.unlock(); + message_mtx.unlock(); } void _Panic(void) diff --git a/replication/ir/client.cc b/replication/ir/client.cc index 279ae17ffda7ca1f3f22f20ed13c4c994bead067..7825794b19f6455da7e61465780ecbbc1780eaf1 100644 --- a/replication/ir/client.cc +++ b/replication/ir/client.cc @@ -280,10 +280,10 @@ IRClient::ReceiveMessage(const TransportAddress &remote, const string &type, const string &data) { - static proto::ReplyInconsistentMessage replyInconsistent; - static proto::ReplyConsensusMessage replyConsensus; - static proto::ConfirmMessage confirm; - static proto::UnloggedReplyMessage unloggedReply; + proto::ReplyInconsistentMessage replyInconsistent; + proto::ReplyConsensusMessage replyConsensus; + proto::ConfirmMessage confirm; + proto::UnloggedReplyMessage unloggedReply; if (type == replyInconsistent.GetTypeName()) { replyInconsistent.ParseFromString(data); diff --git a/replication/ir/replica.cc b/replication/ir/replica.cc index 69ee001ef1892ecbe63ed447abb45df28572fe7a..411669232b2bb11ae141f490db37877225efa110 100644 --- a/replication/ir/replica.cc +++ b/replication/ir/replica.cc @@ -37,11 +37,11 @@ void IRReplica::HandleMessage(const TransportAddress &remote, const string &type, const string &data) { - static ProposeInconsistentMessage proposeInconsistent; - static FinalizeInconsistentMessage finalizeInconsistent; - static ProposeConsensusMessage proposeConsensus; - static FinalizeConsensusMessage finalizeConsensus; - static UnloggedRequestMessage unloggedRequest; + ProposeInconsistentMessage proposeInconsistent; + FinalizeInconsistentMessage finalizeInconsistent; + ProposeConsensusMessage proposeConsensus; + FinalizeConsensusMessage finalizeConsensus; + UnloggedRequestMessage unloggedRequest; if (type == proposeInconsistent.GetTypeName()) { proposeInconsistent.ParseFromString(data); diff --git a/replication/vr/client.cc b/replication/vr/client.cc index 9bb8731476d1fc3b887baa2779ca669ef6e712ee..f8065cb07766e18f04220300c5c8b68db1c62428 100644 --- a/replication/vr/client.cc +++ b/replication/vr/client.cc @@ -137,8 +137,8 @@ VRClient::ReceiveMessage(const TransportAddress &remote, const string &type, const string &data) { - static proto::ReplyMessage reply; - static proto::UnloggedReplyMessage unloggedReply; + proto::ReplyMessage reply; + proto::UnloggedReplyMessage unloggedReply; if (type == reply.GetTypeName()) { reply.ParseFromString(data); diff --git a/replication/vr/replica.cc b/replication/vr/replica.cc index 6315ac5208674fc7982d4001002886cd90b92a44..09ecf24d6f7d44157f6950d3df31b28e23d757fe 100644 --- a/replication/vr/replica.cc +++ b/replication/vr/replica.cc @@ -349,16 +349,16 @@ void VRReplica::ReceiveMessage(const TransportAddress &remote, const string &type, const string &data) { - static RequestMessage request; - static UnloggedRequestMessage unloggedRequest; - static PrepareMessage prepare; - static PrepareOKMessage prepareOK; - static CommitMessage commit; - static RequestStateTransferMessage requestStateTransfer; - static StateTransferMessage stateTransfer; - static StartViewChangeMessage startViewChange; - static DoViewChangeMessage doViewChange; - static StartViewMessage startView; + RequestMessage request; + UnloggedRequestMessage unloggedRequest; + PrepareMessage prepare; + PrepareOKMessage prepareOK; + CommitMessage commit; + RequestStateTransferMessage requestStateTransfer; + StateTransferMessage stateTransfer; + StartViewChangeMessage startViewChange; + DoViewChangeMessage doViewChange; + StartViewMessage startView; if (type == request.GetTypeName()) { request.ParseFromString(data); diff --git a/store/tapirstore/client.cc b/store/tapirstore/client.cc index e6cdd908f2b986d685b241b53f761bc8e5dbd4d4..7b4fd76be43d38d84f28fe7b8c0681cca8074156 100644 --- a/store/tapirstore/client.cc +++ b/store/tapirstore/client.cc @@ -37,7 +37,7 @@ using namespace std; Client::Client(const string configPath, int nShards, int closestReplica, TrueTime timeServer) - : nshards(nShards), transport(0.0, 0.0, 0), timeServer(timeServer) + : nshards(nShards), transport(0.0, 0.0, 0, false), timeServer(timeServer) { // Initialize all state here; client_id = 0; @@ -93,7 +93,7 @@ Client::run_client() void Client::Begin() { - Debug("BEGIN Transaction"); + Debug("BEGIN [%lu]", t_id + 1); t_id++; participants.clear(); } @@ -102,7 +102,7 @@ Client::Begin() int Client::Get(const string &key, string &value) { - Debug("GET Operation [%s]", key.c_str()); + Debug("GET [%lu : %s]", t_id, key.c_str()); // Contact the appropriate shard to get the value. int i = key_to_shard(key, nshards); @@ -133,7 +133,7 @@ Client::Get(const string &key) int Client::Put(const string &key, const string &value) { - Debug("PUT Operation [%s]", key.c_str()); + Debug("PUT [%lu : %s]", t_id, key.c_str()); // Contact the appropriate shard to set the value. int i = key_to_shard(key, nshards); @@ -158,11 +158,10 @@ Client::Prepare(Timestamp ×tamp) uint64_t proposed = 0; list<Promise *> promises; - Debug("PREPARE Transaction at %lu", timestamp.getTimestamp()); + Debug("PREPARE [%lu] at %lu", t_id, timestamp.getTimestamp()); ASSERT(participants.size() > 0); for (auto p : participants) { - Debug("Sending prepare to shard [%d]", p); promises.push_back(new Promise(PREPARE_TIMEOUT)); bclient[p]->Prepare(timestamp, promises.back()); } @@ -176,11 +175,11 @@ Client::Prepare(Timestamp ×tamp) switch(p->GetReply()) { case REPLY_OK: - Debug("Prepare ok vote"); + Debug("PREPARE [%lu] OK", t_id); continue; case REPLY_FAIL: // abort! - Debug("ABORT transaction"); + Debug("PREPARE [%lu] ABORT", t_id); return REPLY_FAIL; case REPLY_RETRY: status = REPLY_RETRY; @@ -207,10 +206,10 @@ Client::Prepare(Timestamp ×tamp) } else { timestamp.setTimestamp(proposed); } - Debug("RETRY transaction at %lu", timestamp.getTimestamp()); + Debug("RETRY [%lu] at [%lu]", t_id, timestamp.getTimestamp()); } - Debug("All PREPARE replies received"); + Debug("All PREPARE's [%lu] received", t_id); return status; } @@ -232,10 +231,9 @@ Client::Commit() } if (status == REPLY_OK) { - Debug("COMMIT Transaction"); + Debug("COMMIT [%lu]", t_id); for (auto p : participants) { - Debug("Sending commit to shard [%d]", p); bclient[p]->Commit(0); } return true; @@ -250,18 +248,10 @@ Client::Commit() void Client::Abort() { - Debug("ABORT Transaction"); - list<Promise *> promises; + Debug("ABORT [%lu]", t_id); for (auto p : participants) { - promises.push_back(new Promise(ABORT_TIMEOUT)); - bclient[p]->Abort(promises.back()); - } - - // Wait for responses for aborts - for (auto p : promises) { - p->GetReply(); - delete p; + bclient[p]->Abort(); } } diff --git a/store/tapirstore/shardclient.cc b/store/tapirstore/shardclient.cc index de9e2f9245d34ebd3d98e5233d74acdf2fae31be..ee9cec44eff9fe9fe5b35bedbf5cd1a52aa04897 100644 --- a/store/tapirstore/shardclient.cc +++ b/store/tapirstore/shardclient.cc @@ -43,8 +43,7 @@ ShardClient::ShardClient(const string &configPath, { ifstream configStream(configPath); if (configStream.fail()) { - fprintf(stderr, "unable to read configuration file: %s\n", - configPath.c_str()); + Panic("Unable to read configuration file: %s\n", configPath.c_str()); } transport::Configuration config(configStream); @@ -85,7 +84,7 @@ void ShardClient::Get(uint64_t id, const string &key, Promise *promise) { // Send the GET operation to appropriate shard. - Debug("[shard %i] Sending GET [%s]", shard, key.c_str()); + Debug("[shard %i] Sending GET [%lu : %s]", shard, id, key.c_str()); // create request string request_str; @@ -117,7 +116,7 @@ ShardClient::Get(uint64_t id, const string &key, const Timestamp ×tamp, Promise *promise) { // Send the GET operation to appropriate shard. - Debug("[shard %i] Sending GET [%s]", shard, key.c_str()); + Debug("[shard %i] Sending GET [%lu : %s]", shard, id, key.c_str()); // create request string request_str; @@ -158,7 +157,7 @@ void ShardClient::Prepare(uint64_t id, const Transaction &txn, const Timestamp ×tamp, Promise *promise) { - Debug("[shard %i] Sending PREPARE: %lu", shard, id); + Debug("[shard %i] Sending PREPARE [%lu]", shard, id); // create prepare request string request_str; @@ -221,7 +220,7 @@ ShardClient::Commit(uint64_t id, const Transaction &txn, uint64_t timestamp, Promise *promise) { - Debug("[shard %i] Sending COMMIT: %lu", shard, id); + Debug("[shard %i] Sending COMMIT [%lu]", shard, id); // create commit request string request_str; @@ -245,7 +244,7 @@ ShardClient::Commit(uint64_t id, const Transaction &txn, void ShardClient::Abort(uint64_t id, const Transaction &txn, Promise *promise) { - Debug("[shard %i] Sending ABORT: %lu", shard, id); + Debug("[shard %i] Sending ABORT [%lu]", shard, id); // create abort request string request_str; @@ -284,7 +283,7 @@ ShardClient::GetCallback(const string &request_str, const string &reply_str) Reply reply; reply.ParseFromString(reply_str); - Debug("[shard %i] Received GET callback [%d]", shard, reply.status()); + Debug("[shard %lu:%i] GET callback [%d]", client_id, shard, reply.status()); if (waiting != NULL) { Promise *w = waiting; waiting = NULL; @@ -303,7 +302,7 @@ ShardClient::PrepareCallback(const string &request_str, const string &reply_str) Reply reply; reply.ParseFromString(reply_str); - Debug("[shard %i] Received PREPARE callback [%d]", shard, reply.status()); + Debug("[shard %lu:%i] PREPARE callback [%d]", client_id, shard, reply.status()); if (waiting != NULL) { Promise *w = waiting; @@ -328,7 +327,7 @@ ShardClient::CommitCallback(const string &request_str, const string &reply_str) if (waiting != NULL) { waiting = NULL; } - Debug("[shard %i] Received COMMIT callback", shard); + Debug("[shard %lu:%i] COMMIT callback", client_id, shard); } /* Callback from a shard replica on abort operation completion. */ @@ -343,7 +342,7 @@ ShardClient::AbortCallback(const string &request_str, const string &reply_str) if (waiting != NULL) { waiting = NULL; } - Debug("[shard %i] Received ABORT callback", shard); + Debug("[shard %lu:%i] ABORT callback", client_id, shard); } } // namespace tapir diff --git a/store/weakstore/server.cc b/store/weakstore/server.cc index cec1351dfd03f453222195bb1dc575ab2e2250fe..dce64b7670b0368818426d864d0414a106632279 100644 --- a/store/weakstore/server.cc +++ b/store/weakstore/server.cc @@ -54,8 +54,8 @@ void Server::HandleMessage(const TransportAddress &remote, const string &type, const string &data) { - static GetMessage get; - static PutMessage put; + GetMessage get; + PutMessage put; if (type == get.GetTypeName()) { get.ParseFromString(data); diff --git a/store/weakstore/shardclient.cc b/store/weakstore/shardclient.cc index 7fe97fd9f43969b688b4e960eb6ed48350e4971c..0296044c22ef9d508056b8ffc59078c21a1dd264 100644 --- a/store/weakstore/shardclient.cc +++ b/store/weakstore/shardclient.cc @@ -152,8 +152,8 @@ ShardClient::ReceiveMessage(const TransportAddress &remote, const string &type, const string &data) { - static GetReplyMessage getReply; - static PutReplyMessage putReply; + GetReplyMessage getReply; + PutReplyMessage putReply; Debug("Received reply type: %s", type.c_str());