Skip to content
Snippets Groups Projects
Commit 15b0e8c9 authored by Irene Y Zhang's avatar Irene Y Zhang
Browse files

fixing bugs in tests and removing Naveen's test client

parent 896f85f2
No related branches found
No related tags found
No related merge requests found
......@@ -80,10 +80,10 @@ IRClient::InvokeInconsistent(const string &request,
});
PendingInconsistentRequest *req =
new PendingInconsistentRequest(request,
reqId,
continuation,
timer,
config.QuorumSize());
reqId,
continuation,
timer,
config.QuorumSize());
pendingReqs[reqId] = req;
SendInconsistent(req);
}
......@@ -100,8 +100,8 @@ IRClient::SendInconsistent(const PendingInconsistentRequest *req)
req->timer->Reset();
} else {
Warning("Could not send inconsistent request to replicas");
pendingReqs.erase(req->clientReqId);
delete req;
pendingReqs.erase(req->clientReqId);
delete req;
}
}
......@@ -313,40 +313,38 @@ IRClient::HandleInconsistentReply(const TransportAddress &remote,
return;
}
PendingInconsistentRequest *req = static_cast<PendingInconsistentRequest *>(it->second);
PendingInconsistentRequest *req =
static_cast<PendingInconsistentRequest *>(it->second);
// Make sure the dynamic cast worked
ASSERT(req != NULL);
Debug("Client received reply: %lu %i", reqId, req->inconsistentReplyQuorum.NumRequired());
Debug("Client received reply: %lu %i", reqId,
req->inconsistentReplyQuorum.NumRequired());
// Record replies
viewstamp_t vs = { msg.view(), reqId };
if (req->inconsistentReplyQuorum.AddAndCheckForQuorum(vs, msg.replicaidx(), msg)) {
// If all quorum received, then send finalize and return to client
req->timer->Stop();
delete req->timer;
req->timer = new Timeout(transport, 500, [this, reqId]() {
ResendConfirmation(reqId, false);
});
// asynchronously send the finalize message
proto::FinalizeInconsistentMessage response;
*(response.mutable_opid()) = msg.opid();
if (transport->SendMessageToAll(this, response)) {
req->timer->Start();
} else {
Warning("Could not send finalize message to replicas");
pendingReqs.erase(it);
delete req;
return;
}
// Return to client
if (!req->continuationInvoked) {
req->timer->Stop();
delete req->timer;
req->timer = new Timeout(transport, 500, [this, reqId]() {
ResendConfirmation(reqId, false);
});
// asynchronously send the finalize message
proto::FinalizeInconsistentMessage response;
*(response.mutable_opid()) = msg.opid();
if (transport->SendMessageToAll(this, response)) {
req->timer->Start();
} else {
Warning("Could not send finalize message to replicas");
}
req->continuation(req->request, "");
req->continuationInvoked = true;
req->continuationInvoked = true;
}
}
}
......@@ -427,7 +425,7 @@ IRClient::HandleConfirm(const TransportAddress &remote,
uint64_t reqId = msg.opid().clientreqid();
auto it = pendingReqs.find(reqId);
if (it == pendingReqs.end()) {
Debug("Received reply when no request was pending");
// ignore, we weren't waiting for the confirmation
return;
}
......@@ -436,12 +434,12 @@ IRClient::HandleConfirm(const TransportAddress &remote,
viewstamp_t vs = { msg.view(), reqId };
if (req->confirmQuorum.AddAndCheckForQuorum(vs, msg.replicaidx(), msg)) {
req->timer->Stop();
pendingReqs.erase(it);
if (!req->continuationInvoked) {
// Return to client
PendingConsensusRequest *r2 = static_cast<PendingConsensusRequest *>(req);
r2->continuation(r2->request, r2->decideResult);
}
pendingReqs.erase(it);
if (!req->continuationInvoked) {
// Return to client
PendingConsensusRequest *r2 = static_cast<PendingConsensusRequest *>(req);
r2->continuation(r2->request, r2->decideResult);
}
delete req;
}
}
......
......@@ -113,7 +113,7 @@ IRReplica::HandleFinalizeInconsistent(const TransportAddress &remote,
// Check record for the request
RecordEntry *entry = record.Find(opid);
if (entry != NULL) {
if (entry != NULL && entry->state == RECORD_STATE_TENTATIVE) {
// Mark entry as finalized
record.SetStatus(opid, RECORD_STATE_FINALIZED);
......
......@@ -8,14 +8,14 @@ GTEST_SRCS += $(addprefix $(d), \
versionstore-test.cc \
lockserver-test.cc)
$(d)kvstore-test: $(o)kvstore-test.o $(LIB-transport) $(LIB-common) $(LIB-backend) $(GTEST_MAIN)
$(d)kvstore-test: $(o)kvstore-test.o $(LIB-transport) $(LIB-store-common) $(LIB-store-backend) $(GTEST_MAIN)
TEST_BINS += $(d)kvstore-test
$(d)versionstore-test: $(o)versionstore-test.o $(LIB-transport) $(LIB-common) $(LIB-backend) $(GTEST_MAIN)
$(d)versionstore-test: $(o)versionstore-test.o $(LIB-transport) $(LIB-store-common) $(LIB-store-backend) $(GTEST_MAIN)
TEST_BINS += $(d)versionstore-test
$(d)lockserver-test: $(o)lockserver-test.o $(LIB-transport) $(LIB-common) $(LIB-backend) $(GTEST_MAIN)
$(d)lockserver-test: $(o)lockserver-test.o $(LIB-transport) $(LIB-store-common) $(LIB-store-backend) $(GTEST_MAIN)
TEST_BINS += $(d)lockserver-test
d := $(dir $(lastword $(MAKEFILE_LIST)))
SRCS += $(addprefix $(d), test-client.cc)
PROTOS += $(addprefix $(d), test-client-proto.proto)
$(d)test-client: $(LIB-message) $(LIB-tcptransport) $(LIB-udptransport) $(o)test-client-proto.o $(o)test-client.o
BINS += $(d)test-client
syntax = "proto2";
message TestMessage {
required int32 status = 1;
}
#include "lib/message.h"
#include "lib/configuration.h"
#include "lib/udptransport.h"
#include "lib/tcptransport.h"
#include "test-client/test-client-proto.pb.h"
#include <thread>
#include <fstream>
#include <sstream>
class Worker : public TransportReceiver
{
public:
Worker(const string configPath, int id)
: id(id), n(0), transport(0.0, 0.0, 0, false)
{
std::ifstream configStream(configPath, std::ifstream::in);
fprintf(stderr, "Reading %s %d\n", configPath.c_str());
if (configStream.fail()) {
Debug("Unable to read configuration file: %s\n", configPath.c_str());
}
config = new transport::Configuration(configStream);
transport.Register(this, *config, -1);
transport.Timer(100, [=]() {
Debug("Scheduling SendMessage");
this->SendMessage();
});
}
~Worker() {}
void Run()
{
transport.Run();
}
void SendMessage()
{
n++;
if (n > 10) {
transport.Stop();
return;
}
Debug("Sending Message %d %d", id, n);
TestMessage msg;
msg.set_status(n);
transport.Timer(1000, [=]() {
if (!transport.SendMessageToReplica(this, 0, msg)) {
Debug("Unable to send request");
}
this->SendMessage();
});
}
void ReceiveMessage(const TransportAddress &remote, const string &type, const string &data)
{
Debug("Received reply type: %s", type.c_str());
}
private:
int id, n;
UDPTransport transport;
//TCPTransport transport;
transport::Configuration *config;
};
void test_thread(int id)
{
Worker *w;
std::stringstream buf;
buf << "test" << id;
w = new Worker(buf.str(), id);
w->Run();
}
int main()
{
std::thread t1(test_thread, 1);
std::thread t2(test_thread, 2);
t1.join();
t2.join();
return 0;
}
f 1
replica localhost:51729
replica localhost:51730
replica localhost:51731
f 1
replica localhost:51729
replica localhost:51730
replica localhost:51731
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment