Skip to content
Snippets Groups Projects
Commit d147632c authored by WinJ's avatar WinJ
Browse files

timer handlers, cleaning up type declaration

parent 2336d9d6
No related branches found
No related tags found
No related merge requests found
from typing import Dict
from assignment2.paxos import Address, BallotValuePair, BallotNumber
class Message:
def __init__(self, addr) -> None:
self.addr = addr # address of sender
def __init__(self, addr: Address) -> None:
self.addr: Address = addr # address of sender
def __str__(self) -> str:
return "addr: " + str(self.addr)
return f"addr: {str(self.addr)}"
class LeaderHeartbeat(Message):
def __init__(self, addr: Address, log: Dict[int, BallotValuePair], ballot_num: BallotNumber) -> None:
super().__init__(addr)
self.log: Dict[int, BallotValuePair] = log
self.ballot_num: BallotNumber = ballot_num
def __str__(self) -> str:
return f"LeaderHeartbeat({super().__str__()}, {str(self.ballot_num)}, log: {self.log})"
class HeartbeatResponse(Message):
def __init__(self, addr: Address, last_executed_slot: int, highest_ballot_seen: BallotNumber) -> None:
super().__init__(addr)
self.last_executed_slot: int = last_executed_slot
self.highest_ballot_seen: BallotNumber = highest_ballot_seen
def __str__(self) -> str:
return f"HeartbeatResponse({super().__str__()}, last_executed_slot: {str(self.last_executed_slot)}, highest_ballot_seen: {self.highest_ballot_seen})"
class P1A(Message):
def __init__(self, addr, ballot_num) -> None:
def __init__(self, addr: Address, ballot_num: BallotNumber) -> None:
super().__init__(addr)
# ballot_num is a BallotNumber
self.ballot_num = ballot_num
self.ballot_num: BallotNumber = ballot_num
def __str__(self) -> str:
return "P1A(" + super().__str__() + ", " + str(self.ballot_num) + ")\n"
return f"P1A({super().__str__()}, {str(self.ballot_num)})\n"
class P1B(Message):
def __init__(self, addr, accepted_ballot, accepted) -> None:
def __init__(self, addr: Address, accepted_ballot: BallotNumber, accepted: Dict[int, BallotValuePair]) -> None:
super().__init__(addr)
# accepted_ballot is a BallotNumber
# accepted is a dict from int to BallotValuePair
self.accepted_ballot = accepted_ballot
self.accepted = accepted
self.accepted_ballot: BallotNumber = accepted_ballot
self.accepted: Dict[int, BallotValuePair] = accepted
def __str__(self) -> str:
return "P1B(" + super().__str__() + ", " + str(self.accepted_ballot) + ", " + str(self.accepted) + ")\n"
return f"P1B({super().__str__()}, {str(self.accepted_ballot)}, {str(self.accepted)})\n"
class P2A(Message):
def __init__(self, addr, ballot, slot_num, value) -> None:
def __init__(self, addr: Address, ballot_num: BallotNumber, slot_num: int, value) -> None:
super().__init__(addr)
# ballot is BallotNumber
# slot_num is an int
# value is a PaxosRequest
self.ballot = ballot
self.slot_num = slot_num
self.value = value
self.ballot_num: BallotNumber = ballot_num
self.slot_num: int = slot_num
self.value: PaxosRequest = value
def __str__(self) -> str:
return "P2A(" + super().__str__() + ", " + str(self.ballot) + ", slot_num: " + str(self.slot_num) + ", value: " + str(self.value) + ")\n"
return f"P2A({super().__str__()}, {str(self.ballot_num)}, slot_num: {str(self.slot_num)}, value: {str(self.value)})\n"
class P2B(Message):
def __init__(self, addr, ballot, slot_num) -> None:
def __init__(self, addr: Address, ballot: BallotNumber, slot_num: int) -> None:
super().__init__(addr)
# ballot is BallotNumber
# slot_num is an int
self.ballot = ballot
self.slot_num = slot_num
self.ballot_num: BallotNumber = ballot
self.slot_num: int = slot_num
def __str__(self) -> str:
return "P2B(" + super().__str__() + ", " + str(self.ballot) + ", slot_num: " + str(self.slot_num) + ")\n"
return f"P2B({super().__str__()}, {str(self.ballot_num)}, slot_num: {str(self.slot_num)})\n"
# TODO: need to add heartbeats
## Client messages=, separate into a new class
class PaxosRequest(Message):
def __init__(self, addr, lock_command) -> None:
def __init__(self, addr: Address, lock_command) -> None:
super().__init__(addr)
# lock_command is of type LockCommand
self.lock_command = lock_command
self.lock_command: LockCommand = lock_command
def __str__(self) -> str:
return "PaxosRequest(" + super().__str__() + ", " + str(self.lock_command) + ")\n"
return f"PaxosRequest({super().__str__()}, {str(self.lock_command)})\n"
class PaxosResult(Message):
def __init__(self, addr, value) -> None:
def __init__(self, addr: Address, value: bool) -> None:
super().__init__(addr)
self.value = value
self.value: bool = value
def __str__(self) -> str:
return "PaxosResult(" + super().__str__() + ", value: " + str(self.value) + ")\n"
return f"PaxosResult({super().__str__()}, value: {str(self.value)})\n"
class LockCommand:
def __init__(self, op, value) -> None:
......@@ -79,4 +107,4 @@ class LockCommand:
self.value = value
def __str__(self) -> str:
return "LockCommand(op: " + str(self.op) + ", value: " + str(self.value) + ")"
\ No newline at end of file
return f"LockCommand(op: {str(self.op)}, value: {str(self.value)})"
\ No newline at end of file
# http://pymotw.com/2/SocketServer/
import sys
from functools import total_ordering
from typing import Dict, Set, Tuple, DefaultDict
from typing import Set, Tuple, DefaultDict, List
from assignment2.timers import P2ATimer, setTimer, HeartBeatCheckTimer, HeartBeatTimer, LeaderElectionTimer
from message import *
import socketserver
import pickle
import socket
from collections import defaultdict
Address = Tuple[str, int]
# Paxos servers
class Paxos(socketserver.TCPServer):
def __init__(self, address, servers) -> None:
# address is a tuple of (ip, port)
self.address = address # Our address (ip, port)
self.servers = servers # All servers addresses
self.address: Address = address # Our address tuple (ip, port)
self.servers: List[Address] = servers # All servers addresses TODO: is this a tuple or list of server addresses?
# Sets up other variables
self.is_leader: bool = False # Are we the leader?
self.ballot: BallotNumber = BallotNumber(1, self.address) # Our ballot number
self.highestBallotSeen: BallotNumber = BallotNumber(0, self.address) # Highest ballot number seen
self.proposals: Dict[int, BallotValuePair] = dict() # Used by leader to send P2A to acceptors, slot number -> proposal
self.accepted: Dict[int, BallotValuePair] = dict() # Used by acceptors in P2A and also P1B, slot number -> proposal
self.log: Dict[int, BallotValuePair] = dict() # The log
self.leader_recent_ping: bool = False # True if we think leader is alive
self.is_leader: bool = False # Are we the leader?
self.ballot: BallotNumber = BallotNumber(1, self.address) # Our ballot number
self.highest_ballot_seen: BallotNumber = BallotNumber(0, self.address) # Highest ballot number seen
self.proposals: Dict[int, BallotValuePair] = dict() # Used by leader to send P2A to acceptors, slot number -> proposal
self.accepted: Dict[int, BallotValuePair] = dict() # Used by acceptors in P2A and also P1B, slot number -> proposal
self.log: Dict[int, BallotValuePair] = dict() # The log
self.leader_recent_ping: bool = False # True if we think leader is alive
# for proposal phase
self.slot_in: int = 1 # First non-executed proposal slot (+1 last executed slot)
self.slot_out: int = 1 # First empty slot (+1 last proposed slot)
self.slot_to_acceptors: DefaultDict[int, Set[Tuple[str, int]]] = defaultdict(set) # Used by leader to decide majority for each slot after P2B, Multimap<Integer, Address>
self.slot_in: int = 1 # First non-executed proposal slot (+1 last executed slot)
self.slot_out: int = 1 # First empty slot (+1 last proposed slot)
self.slot_to_acceptors: DefaultDict[int, Set[Address]] = defaultdict(set) # Used by leader to decide majority for each slot after P2B, Multimap<Integer, Address>
# for leader election
self.voters: Set[Tuple[str, int]] = set() # Yes votes for leader election
self.p1b_replies: Dict[int, BallotValuePair] = dict() # Accepted values for each slot by acceptors that voted yes, slot number -> proposal
self.voters: Set[Address] = set() # Yes votes for leader election, set of addresses
self.p1b_replies: Dict[int, BallotValuePair] = dict() # Accepted values for each slot by acceptors that voted yes, slot number -> proposal
# ...
# Default leader during setup
......@@ -47,6 +51,13 @@ class Paxos(socketserver.TCPServer):
class PaxosHandler(socketserver.BaseRequestHandler):
"""
Init Function to initialize Heartbeat Timers
"""
def init(self):
setTimer(HeartBeatTimer(), HeartBeatTimer.HEARTBEAT_RETRY_MILLIS)
setTimer(HeartBeatCheckTimer(self.server.highest_ballot_seen), HeartBeatTimer.HEARTBEAT_RETRY_MILLIS)
"""
The request handler class for our server.
......@@ -134,7 +145,9 @@ class PaxosHandler(socketserver.BaseRequestHandler):
# TODO: translate java code
#requestInProcess.add(p2a.value()); No need, this is for AMO
#set(new P2ATimer(p2a), P2ATimer.P2A_RETRY_MILLIS);
# TODO: TIMERS: set(new P2ATimer(p2a), P2ATimer.P2A_RETRY_MILLIS);
setTimer(P2ATimer(p2a), P2ATimer.P2A_RETRY_MILLIS)
if len(self.server.slot_to_acceptors[p2a.slot_num]) > (len(self.server.servers)/2):
#Majority accepted, can put into log
self.server.log[p2a.slot_num] = bvp
......@@ -155,7 +168,9 @@ class PaxosHandler(socketserver.BaseRequestHandler):
p1b = P1B(self.server.address, self.server.highest_ballot_seen, self.server.accepted)
send_msg(p1b, sender)
#TODO: Set timer for heartbeatcheck
# TODO: TIMERS: set(new HeartBeatCheckTimer(p1a.ballotNumber()), HeartBeatCheckTimer.HEARTBEAT_CHECK_RETRY_MILLIS * 2);
setTimer(HeartBeatCheckTimer(p1a.ballot_num), HeartBeatCheckTimer.HEARTBEAT_CHECK_RETRY_MILLIS * 2)
self.leader_recent_ping = True
def handleP1B(self, p1b, sender):
......@@ -177,6 +192,8 @@ class PaxosHandler(socketserver.BaseRequestHandler):
self.server.is_leader = True
# Must update its state with accepted values from acceptors
for slot in self.server.p1b_replies:
# TODO: TIMERS: set(new P2ATimer(p2a), P2ATimer.P2A_RETRY_MILLIS);
setTimer(P2ATimer(p2a), P2ATimer.P2A_RETRY_MILLIS)
pass
# TODO Translate java code
# if (status(slot) != PaxosLogSlotStatus.CHOSEN) {
......@@ -199,7 +216,7 @@ class PaxosHandler(socketserver.BaseRequestHandler):
# }
def handleP2A(self, p2a, sender):
if self.server.highest_ballot_seen != p2a.ballot:
if self.server.highest_ballot_seen != p2a.ballot_num:
# It's not the leader, drop it
return
# TODO: Not sure if needed but translate Java code
......@@ -208,21 +225,21 @@ class PaxosHandler(socketserver.BaseRequestHandler):
# }
if p2a.slot_num in self.server.accepted:
bvp = self.server.accepted[p2a.slot_num]
if bvp.ballot <= p2a.ballot:
if bvp.ballot_num <= p2a.ballot_num:
# p2a ballot is higher or equal
bvp = BallotValuePair(p2a.ballot, p2a.value)
bvp = BallotValuePair(p2a.ballot_num, p2a.value)
self.server.accepted[p2a.slot_num, bvp]
else:
# Don't do anything
return
else:
# Have not accepted anything, then accept it
bvp = BallotValuePair(p2a.ballot, p2a.value)
bvp = BallotValuePair(p2a.ballot_num, p2a.value)
self.server.accepted[p2a.slot_num, bvp]
self.server.slot_in = max(self.server.slot_in, p2a.slot_num + 1)
p2b = P2B(p2a.ballot, p2a.slot_num)
p2b = P2B(p2a.ballot_num, p2a.slot_num)
send_msg(p2b, sender)
def handleP2B(self, p2b, sender):
......@@ -237,7 +254,7 @@ class PaxosHandler(socketserver.BaseRequestHandler):
# check if it is still consistent with our proposal
bvp = self.server.proposals(p2b.slot_num)
if bvp.ballot_num != p2b.ballot:
if bvp.ballot_num != p2b.ballot_num:
# No longer in proposal
return
......@@ -249,6 +266,144 @@ class PaxosHandler(socketserver.BaseRequestHandler):
# TODO: Create executeLog() and uncomment below
# executeLog()
"""
Timer Handlers
Argument 1 needs to be a Timer
"""
def onP2ATimer(self, p2a_timer: P2ATimer):
print(f"{p2a_timer}: Callback", file=sys.stderr)
# TODO:
# // If not leader then stop doing it
# if (isLeader && status(t.p2a().slotNum()) != PaxosLogSlotStatus.CHOSEN
# && status(t.p2a().slotNum()) != PaxosLogSlotStatus.CLEARED) {
# for (Address acceptor : servers) {
# if (!acceptor.equals(address())) {
# send(t.p2a(), acceptor);
# }
# }
# set(t, P2ATimer.P2A_RETRY_MILLIS);
# }
# If not leader then stop timer
if self.server.is_leader: # TODO: log status
for acceptor_addr in self.server.servers:
if acceptor_addr != self.server.address:
send_msg(p2a_timer.p2a, acceptor_addr)
setTimer(p2a_timer, P2ATimer.P2A_RETRY_MILLIS, self.onP2ATimer)
def onHeartBeatTimer(self, hearbeat_timer: HeartBeatTimer):
print(f"{hearbeat_timer}: Callback", file=sys.stderr)
# TODO:
# executeLog();
# if (isLeader) {
# for (Address acceptor : servers) {
# if (!acceptor.equals(address())) {
# LeaderHeartbeat lh = new LeaderHeartbeat(log, garbageCollectUntil(), ballot);
# send(lh, acceptor);
# }
# }
# int lastEx = replicasLastExecuted.getOrDefault(address(), 0);
# replicasLastExecuted.put(address(), Math.max(slotOut - 1, lastEx));
# garbageCollect(garbageCollectUntil());
# }
# set(t, HeartBeatTimer.HEARTBEAT_RETRY_MILLIS);
if self.server.is_leader:
for acceptor_addr in self.server.servers:
if acceptor_addr != self.server.address:
lh = LeaderHeartbeat(self.server.log, self.server.ballot)
send_msg(lh, acceptor_addr)
setTimer(hearbeat_timer, HeartBeatTimer.HEARTBEAT_RETRY_MILLIS, self.onHeartBeatTimer)
def onHeartBeatCheckTimer(self, hearbeat_check_timer: HeartBeatCheckTimer):
print(f"{hearbeat_check_timer}: Callback", file=sys.stderr)
# TODO:
# if (isLeader) {
# return;
# }
# if (t.ballotNumber().equals(highestBallotSeen)) {
# // Check if the leader alive or not
# if (!leaderRecentPing) {
# // Leader is dead
# // Just for randomization for contention issue
# electLeader();
# return;
# }
# leaderRecentPing = false;
# set(t, HeartBeatCheckTimer.HEARTBEAT_CHECK_RETRY_MILLIS);
# }
if not self.server.is_leader:
if hearbeat_check_timer.ballot_num == self.server.highest_ballot_seen:
# Check if the leader alive or not
if not self.server.leader_recent_ping:
# Leader is dead
# Just for randomization for contention issue
self.__electLeader()
return
self.server.leader_recent_ping = False
setTimer(hearbeat_check_timer, HeartBeatCheckTimer.HEARTBEAT_CHECK_RETRY_MILLIS)
def onLeaderElectionTimer(self, leader_election_timer: LeaderElectionTimer):
print(f"{leader_election_timer}: Callback", file=sys.stderr)
# TODO:
# if (highestBallotSeen.equals(ballot) && !isLeader) {
# for (Address acceptor : servers) {
# if (!acceptor.equals(address())) {
# send(t.p1a(), acceptor);
# }
# }
# set(t, LeaderElectionTimer.LEADER_ELECTION_TIMER);
# }
if self.server.highest_ballot_num == self.server.ballot_num and not self.server.is_leader:
for acceptor_addr in self.server.servers:
if acceptor_addr != self.server.address:
send_msg(leader_election_timer.p1a, acceptor_addr)
def __electLeader(self):
# private void electLeader() {
# // Try to elect ourself as the leader
# // Try to get elected as leader at the beginning of time
# voters.clear();
# p1bReplies.clear();
# p1bReplies.putAll(accepted);
#
# // Increase ballot until higher than the highest we saw before electing
# while (ballot.compareTo(highestBallotSeen) < 0) {
# ballot.increaseBallot();
# }
#
# P1A p1a = new P1A(new BallotNumber(ballot.sequenceNum(), address()));
# highestBallotSeen = p1a.ballotNumber();
#
# // P1A
# for (Address acceptor : servers) {
# if (!acceptor.equals(address())) {
# send(p1a, acceptor);
# }
# }
#
# set(new LeaderElectionTimer(p1a), LeaderElectionTimer.LEADER_ELECTION_TIMER);
# }
# Try to elect ourself as the leader
# Try to get elected as leader at the beginning of time
self.server.voters.clear()
self.server.p1b_replies.clear()
self.server.p1b_replies.add(self.server.accepted)
# Increase ballot until higher than the highest we saw before electing
while self.server.ballot < self.server.highest_ballot_seen:
self.server.ballot.increaseBallot()
p1a: P1A = P1A(self.server.address, BallotNumber(self.server.ballot.seq_num, self.server.address))
self.server.highest_ballot_seen = p1a.ballot_num
# P1A
for acceptor_addr in self.server.servers:
if acceptor_addr != self.server.address:
send_msg(p1a, acceptor_addr)
setTimer(LeaderElectionTimer(p1a), LeaderElectionTimer.LEADER_ELECTION_TIMER)
@total_ordering
class BallotNumber:
......@@ -310,7 +465,7 @@ class BallotValuePair:
# Serialize obj, and send the message.
# This function will not wait for reply (communication between paxos nodes)
def send_msg(obj, dest_addr):
def send_msg(obj, dest_addr: Address):
HOST, PORT = dest_addr
data = pickle.dumps(obj)
......
import sys
from typing import Callable
from assignment2.message import P1A, P2A
from assignment2.paxos import BallotNumber
import threading
"""
Timers for retrying certain operations (e.g. resending messages)
"""
class Timer:
def __init__(self) -> None:
pass
def __str__(self) -> str:
return f"timer: {self.__class__.__name__}"
return f"timer: {self.__class__.__name__}\n"
class P2ATimer(Timer):
......@@ -28,9 +34,9 @@ class HeartBeatTimer(Timer):
class HeartBeatCheckTimer(Timer):
HEARTBEAT_CHECK_RETRY_MILLIS = 100
def __init__(self, ballot: BallotNumber) -> None:
def __init__(self, ballot_num: BallotNumber) -> None:
super().__init__()
self.ballot: BallotNumber = ballot
self.ballot_num: BallotNumber = ballot_num
class LeaderElectionTimer(Timer):
......@@ -40,3 +46,12 @@ class LeaderElectionTimer(Timer):
super().__init__()
self.p1a: P1A = p1a
"""
Setting a timer with callback
"""
def setTimer(timer: Timer, timerLengthMillis: int, callback: Callable):
# threading.Timer takes in time (in s), a callback function, and arguments
# (timer, ) is a list of positional arguments to the callback function
S = threading.Timer(timerLengthMillis / 1000.0, callback, (timer,))
S.start()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment