Skip to content
Snippets Groups Projects
Commit cb3e6a9f authored by Dixon Tirtayadi's avatar Dixon Tirtayadi
Browse files

Mencius is working when a majority of server is alive

parent 9b22d3f8
No related branches found
No related tags found
No related merge requests found
No preview for this file type
No preview for this file type
......@@ -34,7 +34,7 @@ class Paxos(socketserver.UDPServer):
self.slot_to_acceptors: DefaultDict[int, Set[Address]] = defaultdict(set) # Used by leader to decide majority for each slot after P2B, Multimap<Integer, Address>
# for leader election
self.voters: Set[Address] = set() # Yes votes for leader election, set of addresses
self.voters: List[Set[Address]] = [set()] * self.n_server # Yes votes for leader election, set of addresses
self.p1b_replies: Dict[int, BallotValuePair] = dict() # Accepted values for each slot by acceptors that voted yes, slot number -> proposal
# lock manager app
......@@ -47,6 +47,7 @@ class Paxos(socketserver.UDPServer):
self.is_leader[i] = True
# Change starting slot
self.slot_in = i
self.leader_recent_ping[i] = True
self.highest_ballot_seen.append(BallotNumber(1, self.servers[i]))
print("Finished init paxos", file=sys.stdout)
......@@ -58,7 +59,7 @@ class Paxos(socketserver.UDPServer):
socketserver.UDPServer.__init__(self, address, PaxosHandler)
setTimer(HeartBeatTimer(), HeartBeatTimer.HEARTBEAT_RETRY_MILLIS, self.onHeartBeatTimer)
setTimer(HeartBeatCheckTimer(self.highest_ballot_seen), HeartBeatTimer.HEARTBEAT_RETRY_MILLIS, self.onHeartBeatCheckTimer)
setTimer(HeartBeatCheckTimer(self.highest_ballot_seen), HeartBeatCheckTimer.HEARTBEAT_CHECK_RETRY_MILLIS, self.onHeartBeatCheckTimer)
def handlePaxosRequest(self, paxos_req, sender):
# print(f"{self.address} Got Paxos Request from {sender}", file=sys.stdout)
......@@ -100,26 +101,35 @@ class Paxos(socketserver.UDPServer):
self.executeLog("handlePaxosRequest")
def handleP1A(self, p1a, sender):
if self.highest_ballot_seen < p1a.ballot_num:
self.highest_ballot_seen = p1a.ballot_num
leader_slot = p1a.leader_slot
if self.highest_ballot_seen[leader_slot] < p1a.ballot_num:
self.highest_ballot_seen[leader_slot] = p1a.ballot_num
# Update the ballot for other highest ballot seen from the same address
for i in range(self.n_server):
if p1a.ballot_num.addr == self.highest_ballot_seen[i].addr:
if p1a.ballot_num.seq_num > self.highest_ballot_seen[i].seq_num:
self.highest_ballot_seen[i] = p1a.ballot_num
# If we are leader, make this a follower (since the one sending P1A thinks they are leader)
if self.is_leader:
print(f"{self.address} demotes itself from leader")
self.is_leader = False
if self.is_leader[leader_slot]:
print(f"{self.address} demotes itself from leader from slot {leader_slot}")
self.is_leader[leader_slot] = False
p1b = P1B(self.address, self.highest_ballot_seen, self.accepted)
p1b = P1B(self.address, self.highest_ballot_seen[leader_slot], self.accepted, leader_slot)
self.send_msg(p1b, sender)
setTimer(HeartBeatCheckTimer(p1a.ballot_num), HeartBeatCheckTimer.HEARTBEAT_CHECK_RETRY_MILLIS * 2, self.onHeartBeatCheckTimer)
self.leader_recent_ping = True
setTimer(HeartBeatCheckTimer(self.highest_ballot_seen), HeartBeatCheckTimer.HEARTBEAT_CHECK_RETRY_MILLIS * 2, self.onHeartBeatCheckTimer)
self.leader_recent_ping[leader_slot] = True
def handleP1B(self, p1b, sender):
if self.is_leader:
leader_slot = p1b.leader_slot
if self.is_leader[leader_slot]:
return
# If receive majority of response from acceptors with its ballot, becomes leader
if self.ballot == p1b.accepted_ballot and self.ballot == self.highest_ballot_seen:
self.voters.update(sender)
if self.ballot == p1b.accepted_ballot and self.ballot == self.highest_ballot_seen[leader_slot]:
self.voters[leader_slot].update(sender)
for slot in p1b.accepted:
new_bvp = p1b.accepted[slot]
if slot not in self.p1b_replies:
......@@ -128,11 +138,11 @@ class Paxos(socketserver.UDPServer):
cur_bvp = self.p1b_replies[slot]
if new_bvp.ballot_num >= cur_bvp.ballot_num:
self.p1b_replies[slot] = new_bvp
if len(self.voters) > (len(self.servers) / 2 - 1):
if len([self.voters[leader_slot]]) > (len(self.servers) / 2 - 1):
# This server is elected as leader
if not self.is_leader:
print(f"{self.address} becomes leader")
self.is_leader = True
if not self.is_leader[leader_slot]:
print(f"{self.address} becomes leader for slot {leader_slot}")
self.is_leader[leader_slot] = True
# Must update its state with accepted values from acceptors
for slot in self.p1b_replies:
value = self.p1b_replies[slot]
......@@ -141,13 +151,14 @@ class Paxos(socketserver.UDPServer):
# self.slot_in = max(self.slot_in, slot + 1)
# Changed: Instead of doing max of slot_in, do while loop:
while self.slot_in <= slot:
# Propose no-op(SKIP message) and send it straight away for the slot we skip
skip = P2A(self.address, BallotNumber(self.ballot.seq_num, self.address), self.slot_in, None)
for acceptor in self.servers:
if acceptor != self.address:
self.send_msg(skip, acceptor)
bvp = BallotValuePair(BallotNumber(self.ballot.seq_num, self.address), None)
self.log[self.slot_in] = bvp
# This chunk of code is not needed, we just need to increment slot_in, executeLog will be the one taking care of proposing SKIP
# # Propose no-op(SKIP message) and send it straight away for the slot we skip
# skip = P2A(self.address, BallotNumber(self.ballot.seq_num, self.address), self.slot_in, None)
# for acceptor in self.servers:
# if acceptor != self.address:
# self.send_msg(skip, acceptor)
# bvp = BallotValuePair(BallotNumber(self.ballot.seq_num, self.address), None)
# self.log[self.slot_in] = bvp
self.incrementSlotIn()
bvp = self.proposals[slot]
......@@ -181,6 +192,9 @@ class Paxos(socketserver.UDPServer):
# Learn it immediately if value is None and proposed by the leader
bvp = BallotValuePair(BallotNumber(p2a.ballot_num.seq_num, p2a.addr), None)
self.log[p2a.slot_num] = bvp
self.slot_to_acceptors[p2a.slot_num].add(self.address)
self.proposals[p2a.slot_num] = bvp
self.accepted[p2a.slot_num] = bvp
print(f"Learned skip for {p2a}")
return
......@@ -215,7 +229,8 @@ class Paxos(socketserver.UDPServer):
# print("p2a - 3\n")
def handleP2B(self, p2b, sender):
if not self.is_leader:
slot = self.slotToLeaderIndex(p2b.slot_num)
if not self.is_leader[slot]:
# Not leader, drop message
return
......@@ -235,8 +250,10 @@ class Paxos(socketserver.UDPServer):
def handleLeaderHeartbeat(self, heartbeat, address):
for i in range(len(heartbeat.leader_slot)):
if not self.is_leader[i] and heartbeat.leader_slot[i]:
# print(f"{self.address} got heartbeat from {heartbeat.ballot_num} at index {i}")
if self.highest_ballot_seen[i] > heartbeat.ballot_num:
# It's not "leader" heartbeat
# print(f"{self.address} drops heartbeat from {heartbeat.ballot_num} at index {i} because of highest is {self.highest_ballot_seen[i]}")
return
newLeaderSeen = False
if self.highest_ballot_seen[i] < heartbeat.ballot_num:
......@@ -252,8 +269,8 @@ class Paxos(socketserver.UDPServer):
new_bvp = heartbeat.log[slot]
if new_bvp > bvp:
self.log[slot] = new_bvp
self.executeLog("Handle Leader Heartbeat")
# TODO: Uncomment below
# self.executeLog("Handle Leader Heartbeat")
self.leader_recent_ping[i] = True
if newLeaderSeen:
# Exponential backoff
......@@ -267,7 +284,8 @@ class Paxos(socketserver.UDPServer):
def onP2ATimer(self, p2a_timer: P2ATimer):
# print(f"{p2a_timer}: Callback", file=sys.stdout)
# If not leader then stop timer
if self.is_leader and not self.status(p2a_timer.p2a.slot_num) == PaxosLogSlotStatus.CHOSEN:
leader_slot = self.slotToLeaderIndex(p2a_timer.p2a.slot_num)
if self.is_leader[leader_slot] and not self.status(p2a_timer.p2a.slot_num) == PaxosLogSlotStatus.CHOSEN:
for acceptor_addr in self.servers:
if acceptor_addr != self.address:
self.send_msg(p2a_timer.p2a, acceptor_addr)
......@@ -287,8 +305,15 @@ class Paxos(socketserver.UDPServer):
# Change: Since everyone is leader, heartbeat is now being sent from all leader, we
# need to keep track which heartbeat is last heard, and if one leader is dead, we take
# over that leader slot.
# Drop if this is not the latest heartbeat check
if heartbeat_check_timer.ballot_num != self.highest_ballot_seen:
return
# print(f"{heartbeat_check_timer}: Callback", file=sys.stdout)
print(f"{heartbeat_check_timer}: Callback", file=sys.stdout)
print(f"{self.address}: Recent ping {self.leader_recent_ping}", file=sys.stdout)
print(f"{self.address}: Leader {self.is_leader}", file=sys.stdout)
print(f"{self.address}: Ballot {[str(i) for i in self.highest_ballot_seen]}", file=sys.stdout)
print("\n")
for i in range(len(heartbeat_check_timer.ballot_num)):
if not self.is_leader[i]:
if heartbeat_check_timer.ballot_num[i] == self.highest_ballot_seen[i]:
......@@ -298,34 +323,38 @@ class Paxos(socketserver.UDPServer):
# Just for randomization for contention issue
# TODO: Uncomment below for the electing leader, will need to add index
# to know which slot are we trying to become the leader of.
# self.__electLeader()
self.__electLeader(i)
return
self.leader_recent_ping[i] = False
setTimer(heartbeat_check_timer, HeartBeatCheckTimer.HEARTBEAT_CHECK_RETRY_MILLIS, self.onHeartBeatCheckTimer)
setTimer(heartbeat_check_timer, HeartBeatCheckTimer.HEARTBEAT_CHECK_RETRY_MILLIS, self.onHeartBeatCheckTimer)
def onLeaderElectionTimer(self, leader_election_timer: LeaderElectionTimer):
# print(f"{leader_election_timer}: Callback", file=sys.stdout)
if self.highest_ballot_seen == self.ballot and not self.is_leader:
leader_slot = leader_election_timer.p1a.leader_slot
if self.highest_ballot_seen[leader_slot] == self.ballot and not self.is_leader[leader_slot]:
for acceptor_addr in self.servers:
if acceptor_addr != self.address:
self.send_msg(leader_election_timer.p1a, acceptor_addr)
setTimer(leader_election_timer, LeaderElectionTimer.LEADER_ELECTION_TIMER, self.onLeaderElectionTimer)
def __electLeader(self):
print(f"{self.address} Detected leader is dead, try to get ourself to become leader")
def __electLeader(self, leader_slot):
print(f"{self.address} Detected leader {leader_slot} is dead, try to get ourself to become leader")
# Try to elect ourself as the leader
# Try to get elected as leader at the beginning of time
self.voters.clear()
self.voters[leader_slot].clear()
self.p1b_replies.clear()
self.p1b_replies.update(self.accepted)
# Increase ballot until higher than the highest we saw before electing
while self.ballot < self.highest_ballot_seen:
while self.ballot < self.highest_ballot_seen[leader_slot]:
self.ballot.increaseBallot()
p1a: P1A = P1A(self.address, BallotNumber(self.ballot.seq_num, self.address))
self.highest_ballot_seen = p1a.ballot_num
for i in range(self.n_server):
if self.is_leader[i]:
self.highest_ballot_seen[i] = self.ballot
p1a: P1A = P1A(self.address, BallotNumber(self.ballot.seq_num, self.address), leader_slot)
self.highest_ballot_seen[leader_slot] = p1a.ballot_num
# P1A
for acceptor_addr in self.servers:
......@@ -366,9 +395,12 @@ class Paxos(socketserver.UDPServer):
for tmp_server in self.servers:
if tmp_server != self.address:
self.send_msg(p2a, tmp_server)
bvp = BallotValuePair(BallotNumber(self.ballot.seq_num, self.address), None)
self.log[self.slot_in] = bvp
self.slot_to_acceptors[j].add(self.address)
self.proposals[j] = BallotValuePair(BallotNumber(self.ballot.seq_num, self.address), p2a.value)
self.accepted[j] = BallotValuePair(BallotNumber(self.ballot.seq_num, self.address), p2a.value)
self.proposals[j] = bvp
self.accepted[j] = bvp
def slotToLeaderIndex(self, slot) -> int:
return slot % self.n_server
......
......@@ -12,7 +12,7 @@ class Message:
class LeaderHeartbeat(Message):
def __init__(self, addr: Address, log: Dict[int, BallotValuePair], ballot_num: BallotNumber, leader_slot: int) -> None:
def __init__(self, addr: Address, log: Dict[int, BallotValuePair], ballot_num: BallotNumber, leader_slot: List[bool]) -> None:
super().__init__(addr)
self.log: Dict[int, BallotValuePair] = log
self.ballot_num: BallotNumber = ballot_num
......
......@@ -43,7 +43,7 @@ class HeartBeatTimer(Timer):
class HeartBeatCheckTimer(Timer):
HEARTBEAT_CHECK_RETRY_MILLIS = 10000000000
HEARTBEAT_CHECK_RETRY_MILLIS = 5000
def __init__(self, ballot_num: List[BallotNumber]) -> None:
super().__init__()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment