import sys from message import LockCommand from paxos_utils import Address from typing import Dict class LockManager: def __init__(self) -> None: self.locks: Dict[int, Address] = dict() # returns true if lock is acquired or the same client tries to acquire # a locks it already has, false otherwise def lock(self, value: int, client: Address) -> bool: # acquire lock # print(f"Locking {value} for {client}", file=sys.stdout) if value not in self.locks: self.locks[value] = client return True # client already acquired this lock before if self.locks[value] == client: return True # lock is not available return False # returns true if lock is unlocked, false otherwise def unlock(self, value: int, client: Address) -> bool: # print(f"Unlocking {value} for {client}", file=sys.stdout) # lock does not exist if value not in self.locks: return False # client is not the owner of the locok if self.locks[value] != client: return False # unlock del self.locks[value] return True def lockstatus(self): print(f"\nCurrent Locks: {self.locks}\n", file=sys.stdout) def execute(self, lock_command: LockCommand, client_address: Address) -> bool: if lock_command is not None: lock_res = False if lock_command.op == "lock": lock_res = self.lock(int(lock_command.value), client_address) elif lock_command.op == "unlock": lock_res = self.unlock(int(lock_command.value), client_address) return lock_res