Skip to content
Snippets Groups Projects
Commit 47c56bd6 authored by Mengqi Chen's avatar Mengqi Chen
Browse files

p2a/p2b works with 3 servers

parent 48b748ad
No related branches found
No related tags found
No related merge requests found
......@@ -27,10 +27,10 @@ from message import * # Can change this after testing done
if __name__ == "__main__":
# create a UDP socket
HOST, PORT = "localhost", 9000
HOST, PORT = "localhost", 8000
# TODO: Uncomment below for multiple nodes paxos
serv_addresses = ((HOST, 9000), (HOST, 9001))
serv_addresses = ((HOST, 8000), (HOST, 8001))
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('localhost', 0))
address = sock.getsockname()
......
......@@ -45,6 +45,9 @@ class Paxos(socketserver.UDPServer):
# Default leader during setup
if self.address == self.servers[0]:
self.is_leader = True
self.ballot.increaseBallot()
else:
self.highest_ballot_seen = BallotNumber(2, self.servers[0])
print("Finished init paxos")
print("servers: " + str(self.servers))
......@@ -59,8 +62,9 @@ class PaxosHandler(socketserver.BaseRequestHandler):
Init Function to initialize Heartbeat Timers
"""
def init(self):
setTimer(HeartBeatTimer(), HeartBeatTimer.HEARTBEAT_RETRY_MILLIS, self.onHeartBeatTimer)
setTimer(HeartBeatCheckTimer(self.server.highest_ballot_seen), HeartBeatTimer.HEARTBEAT_RETRY_MILLIS, self.onHeartBeatCheckTimer)
pass
#setTimer(HeartBeatTimer(), HeartBeatTimer.HEARTBEAT_RETRY_MILLIS, self.onHeartBeatTimer)
#setTimer(HeartBeatCheckTimer(self.server.highest_ballot_seen), HeartBeatTimer.HEARTBEAT_RETRY_MILLIS, self.onHeartBeatCheckTimer)
"""
The request handler class for our server.
......@@ -87,8 +91,8 @@ class PaxosHandler(socketserver.BaseRequestHandler):
print(message, "\n") # debug
if isinstance(message, PaxosRequest):
self.handlePaxosRequest(message, self.client_address)
print("got paxos request") # debug
self.handlePaxosRequest(message, self.client_address)
elif isinstance(message, P1A):
print("got p1a") # debug
......@@ -100,7 +104,7 @@ class PaxosHandler(socketserver.BaseRequestHandler):
elif isinstance(message, P2A):
print("got p2a") # debug
self.handleP2A(message, self.client_address)
self.handleP2A(message, message.addr)
elif isinstance(message, P2B):
print("got p2b") # debug
......@@ -154,16 +158,19 @@ class PaxosHandler(socketserver.BaseRequestHandler):
# TODO: TIMERS: set(new P2ATimer(p2a), P2ATimer.P2A_RETRY_MILLIS);
# setTimer(P2ATimer(p2a), P2ATimer.P2A_RETRY_MILLIS, self.onP2ATimer)
print("1\n")
if len(self.server.slot_to_acceptors[p2a.slot_num]) > (len(self.server.servers)/2):
#Majority accepted, can put into log
self.server.log[p2a.slot_num] = bvp
print("2\n")
# TODO: Executelog() function
# executeLog();
# Testing code to return the PaxosResult (TODO: Put this inside executeLog instead of here)
lock_res = self.server.lock_manager.execute(paxos_req.lock_command, self.client_address)
lock_res = self.server.lock_manager.execute(paxos_req.lock_command, bvp.value.addr)
sock = self.request[1]
print("sending resp back to client\n")
sock.sendto(pickle.dumps(PaxosResult(sock.getsockname(), lock_res)), sender)
......@@ -225,6 +232,8 @@ class PaxosHandler(socketserver.BaseRequestHandler):
# }
def handleP2A(self, p2a, sender):
print("p2a - 0\n")
print(f"highest_ballot_seen: {self.server.highest_ballot_seen}, p2a.ballot_num: {p2a.ballot_num}\n")
if self.server.highest_ballot_seen != p2a.ballot_num:
# It's not the leader, drop it
return
......@@ -232,24 +241,31 @@ class PaxosHandler(socketserver.BaseRequestHandler):
# if (status(p2a.slotNum()) == PaxosLogSlotStatus.CLEARED) {
# return;
# }
print("p2a - 1\n")
if p2a.slot_num in self.server.accepted:
bvp = self.server.accepted[p2a.slot_num]
if bvp.ballot_num <= p2a.ballot_num:
# p2a ballot is higher or equal
bvp = BallotValuePair(p2a.ballot_num, p2a.value)
self.server.accepted[p2a.slot_num, bvp]
#self.server.accepted[p2a.slot_num, bvp]
self.server.accepted[p2a.slot_num] = bvp
lock_res = self.server.lock_manager.execute(bvp.value.lock_command, bvp.value.addr)
else:
# Don't do anything
return
else:
# Have not accepted anything, then accept it
bvp = BallotValuePair(p2a.ballot_num, p2a.value)
self.server.accepted[p2a.slot_num, bvp]
#self.server.accepted[p2a.slot_num, bvp]
self.server.accepted[p2a.slot_num] = bvp
lock_res = self.server.lock_manager.execute(bvp.value.lock_command, bvp.value.addr)
print("p2a - 2\n")
self.server.slot_in = max(self.server.slot_in, p2a.slot_num + 1)
p2b = P2B(p2a.ballot_num, p2a.slot_num)
p2b = P2B(self.server.address, p2a.ballot_num, p2a.slot_num)
send_msg(p2b, sender)
print("p2a - 3\n")
def handleP2B(self, p2b, sender):
if not self.server.is_leader:
......@@ -262,7 +278,7 @@ class PaxosHandler(socketserver.BaseRequestHandler):
# }
# check if it is still consistent with our proposal
bvp = self.server.proposals(p2b.slot_num)
bvp = self.server.proposals[p2b.slot_num]
if bvp.ballot_num != p2b.ballot_num:
# No longer in proposal
return
......@@ -273,6 +289,11 @@ class PaxosHandler(socketserver.BaseRequestHandler):
# Majority accepted, can put into log
self.server.log[p2b.slot_num] = bvp
# TODO: Create executeLog() and uncomment below
lock_res = self.server.lock_manager.execute(bvp.value.lock_command, bvp.value.addr)
sock = self.request[1]
print("P2b - sending resp back to client\n")
sock.sendto(pickle.dumps(PaxosResult(self.server.address, lock_res)), bvp.value.addr)
# executeLog()
"""
......@@ -432,7 +453,7 @@ def send_msg(obj, dest_addr: Address):
if __name__ == "__main__":
HOST, PORT = sys.argv[1], int(sys.argv[2])
# addresses = ((HOST, PORT),)
addresses = ((HOST, 9000), (HOST, 9001))
addresses = ((HOST, 8000), (HOST, 8001), (HOST, 8002))
# Create the server, binding to localhost on port 9999
server = Paxos((HOST, PORT), addresses)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment