Newer
Older
# http://pymotw.com/2/SocketServer/
import pickle
import socket
import socketserver
import sys
from collections import defaultdict
from typing import Set, DefaultDict, List
from lockmanager import LockManager
from message import *
from paxos_utils import *
from timers import P2ATimer, setTimer, HeartBeatCheckTimer, HeartBeatTimer, LeaderElectionTimer
# Paxos servers
class Paxos(socketserver.UDPServer):
def __init__(self, address, servers) -> None:
self.address: Address = address # Our address tuple (ip, port)
self.servers: List[Address] = servers # All servers addresses
self.n_server: int = len(self.servers)
self.is_leader: List[bool] = [False] * self.n_server # Are we the leader?
self.ballot: BallotNumber = BallotNumber(1, self.address) # Our ballot number
self.highest_ballot_seen: List[BallotNumber] = [] # Highest ballot number seen per slot
self.proposals: Dict[int, BallotValuePair] = dict() # Used by leader to send P2A to acceptors, slot number -> proposal
self.accepted: Dict[int, BallotValuePair] = dict() # Used by acceptors in P2A and also P1B, slot number -> proposal
self.log: Dict[int, BallotValuePair] = dict() # The log
self.leader_recent_ping: List[bool] = [False] * self.n_server # True if we think leader is alive
self.slot_in: int = 0 # First empty slot that the server owns
self.slot_out: int = 0 # First non-executed proposal slot (+1 last executed slot)
self.slot_to_acceptors: DefaultDict[int, Set[Address]] = defaultdict(set) # Used by leader to decide majority for each slot after P2B, Multimap<Integer, Address>
# for leader election
self.voters: List[Set[Address]] = [set()] * self.n_server # Yes votes for leader election, set of addresses
self.p1b_replies: Dict[int, BallotValuePair] = dict() # Accepted values for each slot by acceptors that voted yes, slot number -> proposal
# lock manager app
self.lock_manager: LockManager = LockManager()
# ...
# Garbage collection
self.replicasLastExecuted: Dict[Address, int] = dict() # Replicas to last executed slot
self.lastGarbageCollected: int = 0 # Our last garbage collected slot
for i in range(self.n_server):
if self.address == self.servers[i]:
self.is_leader[i] = True
# Change starting slot
self.slot_in = i
self.leader_recent_ping[i] = True
self.highest_ballot_seen.append(BallotNumber(1, self.servers[i]))
# to simulate latencies between servers
self.latencies = dict()
with open("config.json", encoding='utf-8') as f:
json_dict = json.loads(f.read())
distances = json_dict["distances"]
self.simulate_wan = json_dict["simulate_wan"]
for i in range(0, len(distances), 2):
curr_addr = (distances[i][0], distances[i][1])
if curr_addr == self.address:
for j in range(0, len(distances[i + 1]), 3):
dest_addr = (distances[i + 1][j], distances[i + 1][j+1])
latency = distances[i + 1][j+2]
self.latencies[dest_addr] = latency
print(f"latencies: {self.latencies}")
print("Finished init paxos", file=sys.stdout)
print(f"servers: {self.servers}", file=sys.stdout)
print(f"address: {self.address}", file=sys.stdout)
print(f"highest ballot seen: {[str(i) for i in self.highest_ballot_seen]}", file=sys.stdout)
print(f"is_leader: {self.is_leader}\n", file=sys.stdout)
socketserver.UDPServer.__init__(self, address, PaxosHandler)
setTimer(HeartBeatTimer(), HeartBeatTimer.HEARTBEAT_RETRY_MILLIS, self.onHeartBeatTimer)
setTimer(HeartBeatCheckTimer(self.highest_ballot_seen), HeartBeatCheckTimer.HEARTBEAT_CHECK_RETRY_MILLIS, self.onHeartBeatCheckTimer)
def handlePaxosRequest(self, paxos_req, sender):
# print(f"{self.address} Got Paxos Request from {sender}", file=sys.stdout)
# if not self.is_leader:
# # Broadcast to all, leader will receive and propose
# if sender not in self.servers:
# for acceptor in self.servers:
# if acceptor != self.address:
# self.send_msg(paxos_req, acceptor)
# pass
# return
# Change: All replicas can accept client request, no need to forward and return
print(f"Leader at {self.address} Handling paxos request", file=sys.stdout)
bvp = BallotValuePair(BallotNumber(self.ballot.seq_num, self.address), paxos_req)
self.proposals[self.slot_in] = bvp
self.accepted[self.slot_out] = bvp
p2a = P2A(self.address, BallotNumber(self.ballot.seq_num, self.address), self.slot_in, paxos_req)
# increase slot_in
# self.slot_in += 1
self.incrementSlotIn()
# P2A
for acceptor in self.servers:
if acceptor != self.address:
self.send_msg(p2a, acceptor)
# accept our own proposal
self.slot_to_acceptors[p2a.slot_num].add(self.address)
setTimer(P2ATimer(p2a), P2ATimer.P2A_RETRY_MILLIS, self.onP2ATimer)
if len(self.slot_to_acceptors[p2a.slot_num]) > (len(self.servers) / 2):
# Majority accepted, can put into log
self.log[p2a.slot_num] = bvp
self.executeLog("handlePaxosRequest")
def handleP1A(self, p1a, sender):
leader_slot = p1a.leader_slot
if self.highest_ballot_seen[leader_slot] < p1a.ballot_num:
self.highest_ballot_seen[leader_slot] = p1a.ballot_num
# Update the ballot for other highest ballot seen from the same address
for i in range(self.n_server):
if p1a.ballot_num.addr == self.highest_ballot_seen[i].addr:
if p1a.ballot_num.seq_num > self.highest_ballot_seen[i].seq_num:
self.highest_ballot_seen[i] = p1a.ballot_num
# If we are leader, make this a follower (since the one sending P1A thinks they are leader)
if self.is_leader[leader_slot]:
print(f"{self.address} demotes itself from leader from slot {leader_slot}")
self.is_leader[leader_slot] = False
p1b = P1B(self.address, self.highest_ballot_seen[leader_slot], self.accepted, leader_slot)
setTimer(HeartBeatCheckTimer(self.highest_ballot_seen), HeartBeatCheckTimer.HEARTBEAT_CHECK_RETRY_MILLIS * 2, self.onHeartBeatCheckTimer)
self.leader_recent_ping[leader_slot] = True
leader_slot = p1b.leader_slot
if self.is_leader[leader_slot]:
# If receive majority of response from acceptors with its ballot, becomes leader
if self.ballot == p1b.accepted_ballot and self.ballot == self.highest_ballot_seen[leader_slot]:
self.voters[leader_slot].update(sender)
for slot in p1b.accepted:
new_bvp = p1b.accepted[slot]
if slot not in self.p1b_replies:
self.p1b_replies[slot] = new_bvp
else:
cur_bvp = self.p1b_replies[slot]
if new_bvp.ballot_num >= cur_bvp.ballot_num:
self.p1b_replies[slot] = new_bvp
if len([self.voters[leader_slot]]) > (len(self.servers) / 2 - 1):
if not self.is_leader[leader_slot]:
print(f"{self.address} becomes leader for slot {leader_slot}")
self.is_leader[leader_slot] = True
# Must update its state with accepted values from acceptors
for slot in self.p1b_replies:
value = self.p1b_replies[slot]
if self.status(slot) != PaxosLogSlotStatus.CHOSEN:
self.proposals[slot] = BallotValuePair(BallotNumber(self.ballot.seq_num, self.address), value.value)
# self.slot_in = max(self.slot_in, slot + 1)
# Changed: Instead of doing max of slot_in, do while loop:
while self.slot_in <= slot:
# This chunk of code is not needed, we just need to increment slot_in, executeLog will be the one taking care of proposing SKIP
# # Propose no-op(SKIP message) and send it straight away for the slot we skip
# skip = P2A(self.address, BallotNumber(self.ballot.seq_num, self.address), self.slot_in, None)
# for acceptor in self.servers:
# if acceptor != self.address:
# self.send_msg(skip, acceptor)
# bvp = BallotValuePair(BallotNumber(self.ballot.seq_num, self.address), None)
# self.log[self.slot_in] = bvp
self.incrementSlotIn()
bvp = self.proposals[slot]
p2a = P2A(self.address, bvp.ballot_num, slot, bvp.value)
for acceptor in self.servers:
if acceptor != self.address:
self.send_msg(p2a, acceptor)
self.accepted[slot] = BallotValuePair(BallotNumber(self.ballot.seq_num, self.address), value.value)
self.slot_to_acceptors[p2a.slot_num].add(self.address)
setTimer(P2ATimer(p2a), P2ATimer.P2A_RETRY_MILLIS, self.onP2ATimer)
def handleP2A(self, p2a, sender):
if p2a.slot_num <= self.lastGarbageCollected:
return
# print("p2a - 0\n", file=sys.stdout)
# print(f"highest_ballot_seen: {self.highest_ballot_seen}, p2a.ballot_num: {p2a.ballot_num}\n", file=sys.stdout)
# No longer drop immediately if not leader
# if self.highest_ballot_seen != p2a.ballot_num:
# # It's not the leader, drop it
# return
# Skip messages are learned imediately
if p2a.value is not None and self.highest_ballot_seen[self.slotToLeaderIndex(p2a.slot_num)] != p2a.ballot_num:
# Drop if value is not Skip and is proposed by non-coordinator for that slot
print(f"{self.address} Dropped because {p2a.ballot_num} is not from {self.highest_ballot_seen[self.slotToLeaderIndex(p2a.slot_num)]}")
print(f"{self.address} highest ballot seen is {[str(i) for i in self.highest_ballot_seen]}")
return
# If it is a skip and from the coordinator, immediately learns it (put it in their log)
if p2a.value is None and self.highest_ballot_seen[self.slotToLeaderIndex(p2a.slot_num)] == p2a.ballot_num:
# Learn it immediately if value is None and proposed by the leader
bvp = BallotValuePair(BallotNumber(p2a.ballot_num.seq_num, p2a.addr), None)
self.log[p2a.slot_num] = bvp
self.slot_to_acceptors[p2a.slot_num].add(self.address)
self.proposals[p2a.slot_num] = bvp
self.accepted[p2a.slot_num] = bvp
print(f"Learned skip for {p2a}")
return
if p2a.slot_num in self.accepted:
bvp = self.accepted[p2a.slot_num]
if bvp.ballot_num <= p2a.ballot_num:
# p2a ballot is higher or equal
bvp = BallotValuePair(p2a.ballot_num, p2a.value)
self.accepted[p2a.slot_num] = bvp
else:
# Don't do anything
return
else:
# Have not accepted anything, then accept it
bvp = BallotValuePair(p2a.ballot_num, p2a.value)
self.accepted[p2a.slot_num] = bvp
# print("p2a - 2\n")
# self.slot_in = max(self.slot_in, p2a.slot_num + 1)
# Changed: Instead of doing max of slot_in, do while loop:
while self.slot_in <= p2a.slot_num:
skip = P2A(self.address, BallotNumber(self.ballot.seq_num, self.address), self.slot_in, None)
for acceptor in self.servers:
if acceptor != self.address:
self.send_msg(skip, acceptor)
bvp = BallotValuePair(BallotNumber(self.ballot.seq_num, self.address), None)
self.log[self.slot_in] = bvp
self.incrementSlotIn()
p2b = P2B(self.address, p2a.ballot_num, p2a.slot_num)
self.send_msg(p2b, sender)
# print("p2a - 3\n")
def handleP2B(self, p2b, sender):
if p2b.slot_num <= self.lastGarbageCollected:
return
slot = self.slotToLeaderIndex(p2b.slot_num)
if not self.is_leader[slot]:
# Not leader, drop message
return
# check if it is still consistent with our proposal
bvp = self.proposals[p2b.slot_num]
if bvp.ballot_num != p2b.ballot_num:
# No longer in proposal
return
# Keep track of who have accepted
self.slot_to_acceptors[p2b.slot_num].add(sender)
if len(self.slot_to_acceptors[p2b.slot_num]) > (len(self.servers) / 2):
# Majority accepted, can put into log
self.log[p2b.slot_num] = bvp
self.executeLog("P2B")
def handleLeaderHeartbeat(self, heartbeat, address):
for i in range(len(heartbeat.leader_slot)):
if not self.is_leader[i] and heartbeat.leader_slot[i]:
# print(f"{self.address} got heartbeat from {heartbeat.ballot_num} at index {i}")
if self.highest_ballot_seen[i] > heartbeat.ballot_num:
# It's not "leader" heartbeat
# print(f"{self.address} drops heartbeat from {heartbeat.ballot_num} at index {i} because of highest is {self.highest_ballot_seen[i]}")
return
newLeaderSeen = False
if self.highest_ballot_seen[i] < heartbeat.ballot_num:
self.is_leader[i] = False
self.highest_ballot_seen[i] = heartbeat.ballot_num
newLeaderSeen = True
# Replace log with the bigger log slot
for slot in heartbeat.log:
if slot not in self.log:
self.log[slot] = heartbeat.log[slot]
else:
bvp = self.log[slot]
new_bvp = heartbeat.log[slot]
if new_bvp > bvp:
self.log[slot] = new_bvp
lastExecuted = self.replicasLastExecuted[heartbeat.addr]
lastExecuted = max(lastExecuted, heartbeat.lastExecutedSlot)
self.replicasLastExecuted[heartbeat.addr] = lastExecuted
# TODO: Uncomment below
self.executeLog("Handle Leader Heartbeat")
self.garbageCollect(heartbeat.garbageCollectUntil)
self.leader_recent_ping[i] = True
if newLeaderSeen:
# Exponential backoff
setTimer(HeartBeatCheckTimer(self.highest_ballot_seen), HeartBeatCheckTimer.HEARTBEAT_CHECK_RETRY_MILLIS * 2, self.onHeartBeatCheckTimer)
"""
Timer Handlers
Argument 1 needs to be a Timer
"""
def onP2ATimer(self, p2a_timer: P2ATimer):
# print(f"{p2a_timer}: Callback", file=sys.stdout)
# If not leader then stop timer
if p2a_timer.p2a.slot_num <= self.lastGarbageCollected:
return
leader_slot = self.slotToLeaderIndex(p2a_timer.p2a.slot_num)
if self.is_leader[leader_slot] and not self.status(p2a_timer.p2a.slot_num) == PaxosLogSlotStatus.CHOSEN:
for acceptor_addr in self.servers:
if acceptor_addr != self.address:
self.send_msg(p2a_timer.p2a, acceptor_addr)
setTimer(p2a_timer, P2ATimer.P2A_RETRY_MILLIS, self.onP2ATimer)
def onHeartBeatTimer(self, heartbeat_timer: HeartBeatTimer):
# print(f"{heartbeat_timer}: Callback", file=sys.stdout)
self.executeLog("HB timer")
# Changed: Heartbeat is done for all servers, since they are all leader
for acceptor_addr in self.servers:
if acceptor_addr != self.address:
lh = LeaderHeartbeat(self.address, self.log, self.ballot, self.is_leader, self.garbageCollectSlot(), self.slot_out - 1)
self.send_msg(lh, acceptor_addr)
setTimer(heartbeat_timer, HeartBeatTimer.HEARTBEAT_RETRY_MILLIS, self.onHeartBeatTimer)
def onHeartBeatCheckTimer(self, heartbeat_check_timer: HeartBeatCheckTimer):
# Change: Since everyone is leader, heartbeat is now being sent from all leader, we
# need to keep track which heartbeat is last heard, and if one leader is dead, we take
# over that leader slot.
# Drop if this is not the latest heartbeat check
if heartbeat_check_timer.ballot_num != self.highest_ballot_seen:
return
# print(f"{heartbeat_check_timer}: Callback", file=sys.stdout)
# print(f"{self.address}: Recent ping {self.leader_recent_ping}", file=sys.stdout)
# print(f"{self.address}: Leader {self.is_leader}", file=sys.stdout)
# print(f"{self.address}: Ballot {[str(i) for i in self.highest_ballot_seen]}", file=sys.stdout)
# print("\n")
for i in range(len(heartbeat_check_timer.ballot_num)):
if not self.is_leader[i]:
if heartbeat_check_timer.ballot_num[i] == self.highest_ballot_seen[i]:
# Check if the leader alive or not
if not self.leader_recent_ping[i]:
# Leader is dead
# Just for randomization for contention issue
# TODO: Uncomment below for the electing leader, will need to add index
# to know which slot are we trying to become the leader of.
self.__electLeader(i)
return
self.leader_recent_ping[i] = False
setTimer(heartbeat_check_timer, HeartBeatCheckTimer.HEARTBEAT_CHECK_RETRY_MILLIS, self.onHeartBeatCheckTimer)
def onLeaderElectionTimer(self, leader_election_timer: LeaderElectionTimer):
# print(f"{leader_election_timer}: Callback", file=sys.stdout)
leader_slot = leader_election_timer.p1a.leader_slot
if self.highest_ballot_seen[leader_slot] == self.ballot and not self.is_leader[leader_slot]:
for acceptor_addr in self.servers:
if acceptor_addr != self.address:
self.send_msg(leader_election_timer.p1a, acceptor_addr)
setTimer(leader_election_timer, LeaderElectionTimer.LEADER_ELECTION_TIMER, self.onLeaderElectionTimer)
def __electLeader(self, leader_slot):
print(f"{self.address} Detected leader {leader_slot} is dead, try to get ourself to become leader")
# Try to elect ourself as the leader
# Try to get elected as leader at the beginning of time
self.voters[leader_slot].clear()
self.p1b_replies.clear()
self.p1b_replies.update(self.accepted)
# Increase ballot until higher than the highest we saw before electing
while self.ballot < self.highest_ballot_seen[leader_slot]:
for i in range(self.n_server):
if self.is_leader[i]:
self.highest_ballot_seen[i] = self.ballot
p1a: P1A = P1A(self.address, BallotNumber(self.ballot.seq_num, self.address), leader_slot)
self.highest_ballot_seen[leader_slot] = p1a.ballot_num
# P1A
for acceptor_addr in self.servers:
if acceptor_addr != self.address:
self.send_msg(p1a, acceptor_addr)
setTimer(LeaderElectionTimer(p1a), LeaderElectionTimer.LEADER_ELECTION_TIMER, self.onLeaderElectionTimer)
def status(self, log_slot_num) -> PaxosLogSlotStatus:
if log_slot_num in self.log:
return PaxosLogSlotStatus.CHOSEN
return PaxosLogSlotStatus.EMPTY
def garbageCollectSlot(self) -> int:
if len(self.replicasLastExecuted) != self.n_server:
return 0
slot = self.slot_out - 1
for val in self.replicasLastExecuted.values():
slot = min(val, slot)
return slot
# print(f"{self.address} executes log {[str(i) + str(self.log[i]) for i in self.log]}")
foundEmpty = False
for j in range(self.slot_out, self.slot_in):
status = self.status(j)
if status == PaxosLogSlotStatus.CHOSEN:
if foundEmpty:
continue
bvp = self.log[j]
# execute and reply to client, skips no-op
if bvp.value is not None:
lock_res = self.lock_manager.execute(bvp.value.lock_command, bvp.value.addr)
# print(f"{context}, executeLog - sending resp back to client\n")
self.socket.sendto(pickle.dumps(PaxosResult(self.address, lock_res, bvp.value.lock_command)), bvp.value.addr)
if self.is_leader[self.slotToLeaderIndex(j)]:
self.lock_manager.lockstatus()
self.replicasLastExecuted[self.address] = self.slot_out - 1
else:
foundEmpty = True
if status == PaxosLogSlotStatus.EMPTY:
if self.is_leader[self.slotToLeaderIndex(j)]:
p2a = P2A(self.address, BallotNumber(self.ballot.seq_num, self.address), j, None)
for tmp_server in self.servers:
if tmp_server != self.address:
self.send_msg(p2a, tmp_server)
bvp = BallotValuePair(BallotNumber(self.ballot.seq_num, self.address), None)
self.log[self.slot_in] = bvp
self.proposals[j] = bvp
self.accepted[j] = bvp
def garbageCollect(self, until: int):
if until <= self.lastGarbageCollected:
# No need to do anything its already garbage collected
return
# Clean up the slots
for i in range(self.lastGarbageCollected, until + 1):
if i in self.log:
del self.log[i]
if i in self.accepted:
del self.accepted[i]
if i in self.slot_to_acceptors:
del self.slot_to_acceptors[i]
if i in self.proposals:
del self.proposals[i]
if i in self.p1b_replies:
del self.p1b_replies[i]
# print(f"{self.address} finished garbage collecting until {until}")
# print(f"{self.accepted}")
# print(f"{self.slot_to_acceptors}")
# print(f"{self.proposals}")
# print(f"{self.p1b_replies}")
# print(f"")
self.lastGarbageCollected = until
def slotToLeaderIndex(self, slot) -> int:
return slot % self.n_server
def incrementSlotIn(self):
# Increment it into the first number greater than slot_out that we own
# TODO: Can probably be optimized
for i in range(1, self.n_server + 1):
temp = self.slot_in + i
if self.is_leader[self.slotToLeaderIndex(temp)]:
self.slot_in = temp
return
# Serialize obj, and send the message.
# This function will not wait for reply (communication between paxos nodes)
def send_msg(self, obj, dest_address: Address):
# print(f"Sending {obj} to {dest_addr}", file=sys.stdout)
if self.simulate_wan:
if dest_address in self.latencies:
latency = self.latencies[dest_address]
#print(f"simulating latency {latency / 1000}")
time.sleep(latency / 1000.0)
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
data = pickle.dumps(obj)
self.socket.sendto(data, dest_address)
class PaxosHandler(socketserver.BaseRequestHandler):
"""
The request handler class for our server.
It is instantiated once per connection to the server, and must
override the handle() method to implement communication to the
client.
This will receive lock() / unlock() command from client
Handler for proposals and leader election will be a different class (I think, probably using different port?)
"""
def handle(self):
# Use self.arg to get servers fields
# Note that we guarantee communication client to server is exactly once,
# no need to worry about duplicate request and proposing two slot.
data = self.request[0].strip()
# data = self.request.recv(1024).strip() # DEBUG LOG
# print(f"{self.client_address} wrote: {data}", file=sys.stdout) # DEBUG LOG
# test deserialize
message = pickle.loads(data)
# print(message, "\n", file=sys.stdout) # debug
if isinstance(message, PaxosRequest):
print(f"got paxos request {message}", file=sys.stdout) # debug
self.server.handlePaxosRequest(message, self.client_address)
elif isinstance(message, P1A):
print(f"got p1a {message}", file=sys.stdout) # debug
self.server.handleP1A(message, self.client_address)
elif isinstance(message, P1B):
print(f"got p1b {message}", file=sys.stdout) # debug
self.server.handleP1B(message, self.client_address)
elif isinstance(message, P2A):
print(f"{self.server.server_address} got p2a {message}", file=sys.stdout) # debug
self.server.handleP2A(message, message.addr)
elif isinstance(message, P2B):
print(f"{self.server.server_address} got p2b {message}", file=sys.stdout) # debug
self.server.handleP2B(message, self.client_address)
elif isinstance(message, LeaderHeartbeat):
# print(f"got heartbeat {message}", file=sys.stdout) # debug
self.server.handleLeaderHeartbeat(message, self.client_address)
else:
# prob just ignore message
print("unrecognized message", file=sys.stdout) # debug
if __name__ == "__main__":
HOST, PORT = sys.argv[1], int(sys.argv[2])
# addresses = ((HOST, PORT),)
addresses = []
for i in range(3, len(sys.argv), 2):
addresses.append((sys.argv[i], int(sys.argv[i + 1])))
# Create the server, binding to localhost on port 9999
server = Paxos((HOST, PORT), addresses)
# Activate the server; this will keep running until you
# interrupt the program with Ctrl-C
server.serve_forever()