Skip to content
Snippets Groups Projects
Commit e9771255 authored by WinJ's avatar WinJ
Browse files

small fixes

parent 2303e7e1
No related branches found
No related tags found
No related merge requests found
// Your code here...
import dslabs.framework.Message;
import dslabs.paxos.PaxosServer.BallotNumber;
import dslabs.paxos.PaxosServer.BallotValuePair;
import java.util.Map;
import lombok.Data;
@Data
class LeaderHeartbeat implements Message {
private final Map<Integer, BallotValuePair> log;
private final int garbageCollectUntil;
private final BallotNumber ballotNumber;
}
@Data
class HeartbeatResponse implements Message {
private final int lastExecutedSlot;
private final BallotNumber highestBallotSeen;
}
@Data
class P1A implements Message {
private final BallotNumber ballotNumber;
}
@Data
class P1B implements Message {
private final BallotNumber acceptedBallot;
private final Map<Integer, BallotValuePair> accepted;
}
@Data
class P2A implements Message {
// Slot proposal
private final BallotNumber ballot;
private final int slotNum;
private final PaxosRequest value;
}
@Data
class P2B implements Message {
// Slot accepted
private final BallotNumber ballot;
private final int slotNum;
}
import dslabs.atmostonce.AMOCommand;
import dslabs.atmostonce.AMOResult;
import dslabs.framework.Address;
import dslabs.framework.Client;
import dslabs.framework.Command;
import dslabs.framework.Node;
import dslabs.framework.Result;
import java.util.Objects;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import static dslabs.paxos.ClientTimer.CLIENT_RETRY_MILLIS;
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
public final class PaxosClient extends Node implements Client {
private final Address[] servers;
// Your code here...
// Client sends request to all servers and just wait for response
private AMOCommand amoCommand;
private AMOResult amoResult;
private int sequenceNum = -1;
/* -------------------------------------------------------------------------
Construction and Initialization
-----------------------------------------------------------------------*/
public PaxosClient(Address address, Address[] servers) {
super(address);
this.servers = servers;
}
@Override
public synchronized void init() {
// No need to initialize
}
/* -------------------------------------------------------------------------
Public methods
-----------------------------------------------------------------------*/
@Override
public synchronized void sendCommand(Command operation) {
// Your code here...
amoCommand = new AMOCommand(operation, ++sequenceNum, this.address());
amoResult = null;
for (Address address : servers) {
send(new PaxosRequest(amoCommand), address);
}
set(new ClientTimer(amoCommand), CLIENT_RETRY_MILLIS);
}
@Override
public synchronized boolean hasResult() {
// Your code here...
return amoResult != null;
}
@Override
public synchronized Result getResult() throws InterruptedException {
// Your code here...
while (amoResult == null) {
wait();
}
return amoResult.result();
}
/* -------------------------------------------------------------------------
Message Handlers
-----------------------------------------------------------------------*/
private synchronized void handlePaxosReply(PaxosReply m, Address sender) {
// Your code here...
if (sequenceNum == m.amoResult().sequenceNum()) {
amoResult = m.amoResult();
notify();
}
}
/* -------------------------------------------------------------------------
Timer Handlers
-----------------------------------------------------------------------*/
private synchronized void onClientTimer(ClientTimer t) {
// Your code here...
if (Objects.equals(amoCommand, t.amoCommand()) && amoResult == null) {
for (Address address : servers) {
send(new PaxosRequest(t.amoCommand()), address);
}
set(t, CLIENT_RETRY_MILLIS);
}
}
}
public enum PaxosLogSlotStatus {
EMPTY, // no command is known by the server for this slot
ACCEPTED, // a command has been tentatively accepted by this server
CHOSEN, // the server knows a command to be permanently chosen for this slot
CLEARED // the command in this slot has been garbage-collected at the server
}
\ No newline at end of file
import dslabs.atmostonce.AMOResult;
import dslabs.framework.Message;
import lombok.Data;
@Data
public final class PaxosReply implements Message {
// Your code here...
private final AMOResult amoResult;
}
import dslabs.atmostonce.AMOCommand;
import dslabs.framework.Message;
import lombok.Data;
@Data
public final class PaxosRequest implements Message {
// Your code here...
private final AMOCommand value;
}
This diff is collapsed.
import dslabs.atmostonce.AMOCommand;
import dslabs.framework.Timer;
import dslabs.paxos.PaxosServer.BallotNumber;
import lombok.Data;
@Data
final class ClientTimer implements Timer {
static final int CLIENT_RETRY_MILLIS = 100;
// Your code here...
private final AMOCommand amoCommand;
}
// Your code here...
@Data
final class P2ATimer implements Timer {
static final int P2A_RETRY_MILLIS = 100;
private final P2A p2a;
}
@Data
final class HeartBeatTimer implements Timer {
static final int HEARTBEAT_RETRY_MILLIS = 20;
}
@Data
final class HeartBeatCheckTimer implements Timer {
static final int HEARTBEAT_CHECK_RETRY_MILLIS = 100;
private final BallotNumber ballotNumber;
}
@Data
final class LeaderElectionTimer implements Timer {
static final int LEADER_ELECTION_TIMER = 50;
private final P1A p1a;
}
......@@ -8,6 +8,7 @@ import sys
from message import * # Can change this after testing done
from timers import ClientTimer, setTimer
class LockClient():
def __init__(self) -> None:
with open("config.json", encoding='utf-8') as f:
......@@ -71,4 +72,3 @@ class LockClient():
if __name__ == "__main__":
client = LockClient()
client.execute()
......@@ -4,6 +4,7 @@ from message import LockCommand
from paxos_utils import Address
from typing import Dict
class LockManager:
def __init__(self) -> None:
......@@ -21,7 +22,7 @@ class LockManager:
# client already acquired this lock before
if self.locks[value] == client:
return True
# lock is not available
return False
......@@ -31,7 +32,7 @@ class LockManager:
# lock does not exist
if value not in self.locks:
return False
# client is not the owner of the locok
if self.locks[value] != client:
return False
......
import json
import subprocess
import sys
import time
# Can spawn multiple paxos server
if __name__ == "__main__":
......
......@@ -19,6 +19,7 @@ class LeaderHeartbeat(Message):
def __str__(self) -> str:
return f"LeaderHeartbeat({super().__str__()}, {str(self.ballot_num)}, log: {self.log})"
class P1A(Message):
def __init__(self, addr: Address, ballot_num: BallotNumber) -> None:
super().__init__(addr)
......
......@@ -124,27 +124,27 @@ class Paxos(socketserver.UDPServer):
if new_bvp.ballot_num >= cur_bvp.ballot_num:
self.p1b_replies[slot] = new_bvp
if len(self.voters) > (len(self.servers) / 2 - 1):
# This server is elected as leader
if not self.is_leader:
print(f"{self.address} becomes leader")
self.is_leader = True
# Must update its state with accepted values from acceptors
for slot in self.p1b_replies:
value = self.p1b_replies[slot]
if self.status(slot) != PaxosLogSlotStatus.CHOSEN:
self.proposals[slot] = BallotValuePair(BallotNumber(self.ballot.seq_num, self.address), value.value)
self.slot_in = max(self.slot_in, slot + 1)
bvp = self.proposals[slot]
p2a = P2A(self.address, bvp.ballot_num, slot, bvp.value)
for acceptor in self.servers:
if acceptor != self.address:
self.send_msg(p2a, acceptor)
self.accepted[slot] = BallotValuePair(BallotNumber(self.ballot.seq_num, self.address), value.value)
self.slot_to_acceptors[p2a.slot_num].add(self.address)
setTimer(P2ATimer(p2a), P2ATimer.P2A_RETRY_MILLIS, self.onP2ATimer)
# This server is elected as leader
if not self.is_leader:
print(f"{self.address} becomes leader")
self.is_leader = True
# Must update its state with accepted values from acceptors
for slot in self.p1b_replies:
value = self.p1b_replies[slot]
if self.status(slot) != PaxosLogSlotStatus.CHOSEN:
self.proposals[slot] = BallotValuePair(BallotNumber(self.ballot.seq_num, self.address), value.value)
self.slot_in = max(self.slot_in, slot + 1)
bvp = self.proposals[slot]
p2a = P2A(self.address, bvp.ballot_num, slot, bvp.value)
for acceptor in self.servers:
if acceptor != self.address:
self.send_msg(p2a, acceptor)
self.accepted[slot] = BallotValuePair(BallotNumber(self.ballot.seq_num, self.address), value.value)
self.slot_to_acceptors[p2a.slot_num].add(self.address)
setTimer(P2ATimer(p2a), P2ATimer.P2A_RETRY_MILLIS, self.onP2ATimer)
def handleP2A(self, p2a, sender):
# print("p2a - 0\n", file=sys.stdout)
......@@ -198,7 +198,7 @@ class Paxos(socketserver.UDPServer):
return
newLeaderSeen = False
if self.highest_ballot_seen < heartbeat.ballot_num:
self.isLeader = False
self.is_leader = False
self.highest_ballot_seen = heartbeat.ballot_num
newLeaderSeen = True
# Replace log with the bigger log slot
......@@ -211,9 +211,9 @@ class Paxos(socketserver.UDPServer):
new_bvp = heartbeat.log[slot]
if new_bvp > bvp:
self.log[slot] = new_bvp
self.executeLog("Handle Leader Heartbeat");
self.executeLog("Handle Leader Heartbeat")
self.leader_recent_ping = True;
self.leader_recent_ping = True
if newLeaderSeen:
# Exponential backoff
setTimer(HeartBeatCheckTimer(self.highest_ballot_seen), HeartBeatCheckTimer.HEARTBEAT_CHECK_RETRY_MILLIS * 2, self.onHeartBeatCheckTimer)
......@@ -288,19 +288,17 @@ class Paxos(socketserver.UDPServer):
def status(self, log_slot_num) -> PaxosLogSlotStatus:
if log_slot_num in self.log:
return PaxosLogSlotStatus.CHOSEN
if log_slot_num in self.accepted:
return PaxosLogSlotStatus.ACCEPTED
return PaxosLogSlotStatus.EMPTY
def executeLog(self, context):
foundEmpty = False
for i in range(self.slot_out, self.slot_in):
status = self.status(i)
for j in range(self.slot_out, self.slot_in):
status = self.status(j)
if status == PaxosLogSlotStatus.CHOSEN:
if foundEmpty:
continue
bvp = self.log[i]
bvp = self.log[j]
# execute and reply to client
lock_res = self.lock_manager.execute(bvp.value.lock_command, bvp.value.addr)
......@@ -314,21 +312,20 @@ class Paxos(socketserver.UDPServer):
foundEmpty = True
if status == PaxosLogSlotStatus.EMPTY:
if self.is_leader:
p2a = P2A(self.address, BallotNumber(self.ballot.seq_num, self.address), i, PaxosRequest(self.address, None))
for server in self.servers:
if server != self.address:
self.send_msg(p2a, server)
self.slot_to_acceptors[i].add(self.address)
self.proposals[i] = BallotValuePair(BallotNumber(self.ballot.seq_num, self.address), p2a.value)
self.accepted[i] = BallotValuePair(BallotNumber(self.ballot.seq_num, self.address), p2a.value)
p2a = P2A(self.address, BallotNumber(self.ballot.seq_num, self.address), j, PaxosRequest(self.address, None))
for tmp_server in self.servers:
if tmp_server != self.address:
self.send_msg(p2a, tmp_server)
self.slot_to_acceptors[j].add(self.address)
self.proposals[j] = BallotValuePair(BallotNumber(self.ballot.seq_num, self.address), p2a.value)
self.accepted[j] = BallotValuePair(BallotNumber(self.ballot.seq_num, self.address), p2a.value)
# Serialize obj, and send the message.
# This function will not wait for reply (communication between paxos nodes)
def send_msg(self, obj, dest_addr: Address):
def send_msg(self, obj, dest_address: Address):
# print(f"Sending {obj} to {dest_addr}", file=sys.stdout)
data = pickle.dumps(obj)
self.socket.sendto(data, dest_addr)
self.socket.sendto(data, dest_address)
class PaxosHandler(socketserver.BaseRequestHandler):
......@@ -342,6 +339,7 @@ class PaxosHandler(socketserver.BaseRequestHandler):
This will receive lock() / unlock() command from client
Handler for proposals and leader election will be a different class (I think, probably using different port?)
"""
def handle(self):
# Use self.arg to get servers fields
# Note that we guarantee communication client to server is exactly once,
......@@ -382,6 +380,7 @@ class PaxosHandler(socketserver.BaseRequestHandler):
# prob just ignore message
print("unrecognized message", file=sys.stdout) # debug
if __name__ == "__main__":
HOST, PORT = sys.argv[1], int(sys.argv[2])
# addresses = ((HOST, PORT),)
......
......@@ -4,11 +4,12 @@ from typing import Tuple
Address = Tuple[str, int] # Type declaration for what an address is
@total_ordering
class BallotNumber:
def __init__(self, seq_num, addr) -> None:
self.seq_num = seq_num # Sequence number of the proposal
self.addr = addr # Address of the server proposing
self.seq_num = seq_num # Sequence number of the proposal
self.addr = addr # Address of the server proposing
def increaseBallot(self):
self.seq_num += 1
......@@ -28,7 +29,7 @@ class BallotNumber:
else:
# Same seq_num, use address for break
return self.addr[1] < other.addr[1]
def __eq__(self, other) -> bool:
if not self._is_valid_operand(other):
return NotImplemented
......@@ -36,12 +37,13 @@ class BallotNumber:
def __str__(self) -> str:
return "BallotNumber(seq_num: " + str(self.seq_num) + ", addr: " + str(self.addr) + ")"
@total_ordering
class BallotValuePair:
def __init__(self, ballot_num, value) -> None:
self.ballot_num = ballot_num # ballot number
self.value = value # paxos request
self.ballot_num = ballot_num # ballot number
self.value = value # paxos request
def _is_valid_operand(self, other):
return (hasattr(other, "ballot_num") and
......@@ -51,9 +53,9 @@ class BallotValuePair:
def __lt__(self, other) -> bool:
if not self._is_valid_operand(other):
return NotImplemented
return self.ballot_num < other.ballot_num
def __eq__(self, other) -> bool:
if not self._is_valid_operand(other):
return NotImplemented
......@@ -62,14 +64,7 @@ class BallotValuePair:
def __str__(self) -> str:
return "BallotValuePair(ballot_num: " + str(self.ballot_num) + ", value: " + str(self.value) + ")"
class PaxosLogSlotStatus(Enum):
EMPTY = 0
ACCEPTED = 1
CHOSEN = 2
#public enum PaxosLogSlotStatus {
#EMPTY, // no command is known by the server for this slot
#ACCEPTED, // a command has been tentatively accepted by this server
#CHOSEN, // the server knows a command to be permanently chosen for this slot
#CLEARED // the command in this slot has been garbage-collected at the server
#}
\ No newline at end of file
EMPTY = 0 # no command is known by the server for this slot
CHOSEN = 1 # the server knows a command to be permanently chosen for this slot
......@@ -8,6 +8,8 @@ import threading
"""
Timers for retrying certain operations (e.g. resending messages)
"""
class Timer:
def __init__(self) -> None:
pass
......@@ -15,6 +17,7 @@ class Timer:
def __str__(self) -> str:
return f"timer: {self.__class__.__name__}\n"
class ClientTimer(Timer):
CLIENT_RETRY_MILLIS = 5000
......@@ -23,6 +26,7 @@ class ClientTimer(Timer):
self.paxos_req: PaxosRequest = request
self.req_num: int = req_num
class P2ATimer(Timer):
P2A_RETRY_MILLIS = 500
......@@ -57,6 +61,8 @@ class LeaderElectionTimer(Timer):
"""
Setting a timer with callback
"""
def setTimer(timer: Timer, timerLengthMillis: int, callback: Callable):
# threading.Timer takes in time (in s), a callback function, and arguments
# (timer, ) is a list of positional arguments to the callback function
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment