Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
C
cse550
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
Winston Jodjana
cse550
Commits
18e2d612
Commit
18e2d612
authored
2 years ago
by
WinJ
Browse files
Options
Downloads
Plain Diff
Merge remote-tracking branch 'origin/test'
# Conflicts: # assignment2/paxos.py
parents
b7be2a1e
2215ef04
No related branches found
Branches containing commit
No related tags found
No related merge requests found
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
assignment2/client.py
+2
-2
2 additions, 2 deletions
assignment2/client.py
assignment2/paxos.py
+94
-56
94 additions, 56 deletions
assignment2/paxos.py
assignment2/paxos_utils.py
+13
-0
13 additions, 0 deletions
assignment2/paxos_utils.py
with
109 additions
and
58 deletions
assignment2/client.py
+
2
−
2
View file @
18e2d612
...
...
@@ -6,10 +6,10 @@ from message import * # Can change this after testing done
if
__name__
==
"
__main__
"
:
# create a UDP socket
HOST
,
PORT
=
"
localhost
"
,
9
000
HOST
,
PORT
=
"
localhost
"
,
8
000
# TODO: Uncomment below for multiple nodes paxos
serv_addresses
=
((
HOST
,
9
000
),
(
HOST
,
9
001
))
serv_addresses
=
((
HOST
,
8
000
),
(
HOST
,
8
001
))
sock
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_DGRAM
)
sock
.
bind
((
'
localhost
'
,
0
))
address
=
sock
.
getsockname
()
...
...
This diff is collapsed.
Click to expand it.
assignment2/paxos.py
+
94
−
56
View file @
18e2d612
...
...
@@ -43,6 +43,9 @@ class Paxos(socketserver.UDPServer):
# Default leader during setup
if
self
.
address
==
self
.
servers
[
0
]:
self
.
is_leader
=
True
self
.
ballot
.
increaseBallot
()
else
:
self
.
highest_ballot_seen
=
BallotNumber
(
2
,
self
.
servers
[
0
])
print
(
"
Finished init paxos
"
,
file
=
sys
.
stderr
)
print
(
f
"
servers:
{
self
.
servers
}
"
,
file
=
sys
.
stderr
)
...
...
@@ -85,8 +88,8 @@ class PaxosHandler(socketserver.BaseRequestHandler):
print
(
message
,
"
\n
"
,
file
=
sys
.
stderr
)
# debug
if
isinstance
(
message
,
PaxosRequest
):
print
(
"
got paxos request
"
,
file
=
sys
.
stderr
)
# debug
self
.
handlePaxosRequest
(
message
,
self
.
client_address
)
print
(
"
got paxos request
"
,
file
=
sys
.
stderr
)
# debug
elif
isinstance
(
message
,
P1A
):
print
(
"
got p1a
"
,
file
=
sys
.
stderr
)
# debug
...
...
@@ -97,8 +100,8 @@ class PaxosHandler(socketserver.BaseRequestHandler):
self
.
handleP1B
(
message
,
self
.
client_address
)
elif
isinstance
(
message
,
P2A
):
print
(
"
got p2a
"
,
file
=
sys
.
stderr
)
# debug
self
.
handleP2A
(
message
,
self
.
client_
addr
ess
)
print
(
"
got p2a
"
,
file
=
sys
.
stderr
)
# debug
self
.
handleP2A
(
message
,
message
.
addr
)
elif
isinstance
(
message
,
P2B
):
print
(
"
got p2b
"
,
file
=
sys
.
stderr
)
# debug
...
...
@@ -134,20 +137,14 @@ class PaxosHandler(socketserver.BaseRequestHandler):
# TODO: translate java code
# requestInProcess.add(p2a.value()); No need, this is for AMO
# TODO: TIMERS: set(new P2ATimer(p2a), P2ATimer.P2A_RETRY_MILLIS);
# setTimer(P2ATimer(p2a), P2ATimer.P2A_RETRY_MILLIS, self.onP2ATimer)
setTimer
(
P2ATimer
(
p2a
),
P2ATimer
.
P2A_RETRY_MILLIS
,
self
.
onP2ATimer
)
if
len
(
self
.
server
.
slot_to_acceptors
[
p2a
.
slot_num
])
>
(
len
(
self
.
server
.
servers
)
/
2
):
#
Majority accepted, can put into log
if
len
(
self
.
server
.
slot_to_acceptors
[
p2a
.
slot_num
])
>
(
len
(
self
.
server
.
servers
)
/
2
):
#Majority accepted, can put into log
self
.
server
.
log
[
p2a
.
slot_num
]
=
bvp
# TODO: Executelog() function
# executeLog();
# Testing code to return the PaxosResult (TODO: Put this inside executeLog instead of here)
lock_res
=
self
.
server
.
lock_manager
.
execute
(
paxos_req
.
lock_command
,
self
.
client_address
)
self
.
executeLog
(
"
handlePaxosRequest
"
)
sock
=
self
.
request
[
1
]
sock
.
sendto
(
pickle
.
dumps
(
PaxosResult
(
sock
.
getsockname
(),
lock_res
)),
sender
)
def
handleP1A
(
self
,
p1a
,
sender
):
if
self
.
server
.
highest_ballot_seen
<
p1a
.
ballot_num
:
...
...
@@ -177,58 +174,59 @@ class PaxosHandler(socketserver.BaseRequestHandler):
self
.
server
.
p1b_replies
[
slot
]
=
new_bvp
else
:
if
new_bvp
.
ballot_num
>=
cur_bvp
.
ballot_num
:
self
.
server
.
p1b_repli
s
e
[
slot
]
=
new_bvp
if
len
(
self
.
server
.
voters
)
>
(
len
(
self
.
server
.
servers
)
/
2
-
1
):
self
.
server
.
p1b_replie
s
[
slot
]
=
new_bvp
if
len
(
self
.
server
.
voters
)
>
(
len
(
self
.
server
.
servers
)
/
2
-
1
):
# This server is elected as leader
self
.
server
.
is_leader
=
True
# Must update its state with accepted values from acceptors
for
slot
in
self
.
server
.
p1b_replies
:
pass
# TODO Translate java code
# if (status(slot) != PaxosLogSlotStatus.CHOSEN) {
# proposals.put(slot, new BallotValuePair(new BallotNumber(ballot.sequenceNum(), address()), p1bReplies.get(slot).value()));
# slotIn = Math.max(slotIn, slot + 1);
# BallotValuePair bvp = proposals.get(slot);
# P2A p2a = new P2A(bvp.ballotNumber(), slot, bvp.value());
# for (Address acceptor : servers) {
# if (!acceptor.equals(address())) {
# send(p2a, acceptor);
# }
# }
# accepted.put(slot, new BallotValuePair(new BallotNumber(ballot.sequenceNum(), address()), p1bReplies.get(slot).value()));
# // Accept our own proposal
# slotsToAcceptors.put(p2a.slotNum(), address());
# requestInProcess.add(p2a.value());
# set(new P2ATimer(p2a), P2ATimer.P2A_RETRY_MILLIS);
# TODO: TIMERS: set(new P2ATimer(p2a), P2ATimer.P2A_RETRY_MILLIS);
setTimer
(
P2ATimer
(
p2a
),
P2ATimer
.
P2A_RETRY_MILLIS
,
self
.
onP2ATimer
)
# }
for
slot
,
value
in
self
.
server
.
p1b_replies
:
if
self
.
status
(
slot
)
!=
PaxosLogSlotStatus
.
CHOSEN
:
self
.
server
.
proposals
[
slot
]
=
BallotValuePair
(
BallotNumber
(
self
.
server
.
ballot
.
seq_num
,
self
.
server
.
address
),
value
.
value
)
self
.
server
.
slot_in
=
max
(
self
.
server
.
slot_in
,
slot
+
1
)
bvp
=
self
.
server
.
proposals
[
slot
]
p2a
=
P2A
(
self
.
server
.
address
,
bvp
.
ballot_num
,
slot
,
bvp
.
value
)
for
acceptor
in
self
.
server
.
addresses
:
if
acceptor
!=
self
.
server
.
address
:
send_msg
(
p2a
,
acceptor
)
self
.
server
.
accepted
[
slot
]
=
BallotValuePair
(
BallotNumber
(
self
.
server
.
ballot
.
seq_num
,
self
.
server
.
address
),
value
.
value
)
self
.
server
.
slots_to_acceptors
[
p2a
.
slot_num
].
add
(
self
.
server
.
address
)
# requestInProcess.add(p2a.value());
setTimer
(
P2ATimer
(
p2a
),
P2ATimer
.
P2A_RETRY_MILLIS
,
self
.
onP2ATimer
)
def
handleP2A
(
self
,
p2a
,
sender
):
print
(
"
p2a - 0
\n
"
,
file
=
sys
.
stderr
)
print
(
f
"
highest_ballot_seen:
{
self
.
server
.
highest_ballot_seen
}
, p2a.ballot_num:
{
p2a
.
ballot_num
}
\n
"
,
file
=
sys
.
stderr
)
if
self
.
server
.
highest_ballot_seen
!=
p2a
.
ballot_num
:
# It's not the leader, drop it
return
# TODO: Not sure if needed but translate Java code
# if (status(p2a.slotNum()) == PaxosLogSlotStatus.CLEARED) {
# return;
# }
print
(
"
p2a - 1
\n
"
,
file
=
sys
.
stderr
)
if
p2a
.
slot_num
in
self
.
server
.
accepted
:
bvp
=
self
.
server
.
accepted
[
p2a
.
slot_num
]
if
bvp
.
ballot_num
<=
p2a
.
ballot_num
:
# p2a ballot is higher or equal
bvp
=
BallotValuePair
(
p2a
.
ballot_num
,
p2a
.
value
)
self
.
server
.
accepted
[
p2a
.
slot_num
,
bvp
]
self
.
server
.
accepted
[
p2a
.
slot_num
]
=
bvp
else
:
# Don't do anything
return
else
:
# Have not accepted anything, then accept it
bvp
=
BallotValuePair
(
p2a
.
ballot_num
,
p2a
.
value
)
self
.
server
.
accepted
[
p2a
.
slot_num
,
bvp
]
self
.
server
.
accepted
[
p2a
.
slot_num
]
=
bvp
print
(
"
p2a - 2
\n
"
)
self
.
server
.
slot_in
=
max
(
self
.
server
.
slot_in
,
p2a
.
slot_num
+
1
)
p2b
=
P2B
(
p2a
.
ballot_num
,
p2a
.
slot_num
)
p2b
=
P2B
(
self
.
server
.
address
,
p2a
.
ballot_num
,
p2a
.
slot_num
)
send_msg
(
p2b
,
sender
)
print
(
"
p2a - 3
\n
"
)
def
handleP2B
(
self
,
p2b
,
sender
):
if
not
self
.
server
.
is_leader
:
...
...
@@ -236,7 +234,7 @@ class PaxosHandler(socketserver.BaseRequestHandler):
return
# check if it is still consistent with our proposal
bvp
=
self
.
server
.
proposals
(
p2b
.
slot_num
)
bvp
=
self
.
server
.
proposals
[
p2b
.
slot_num
]
if
bvp
.
ballot_num
!=
p2b
.
ballot_num
:
# No longer in proposal
return
...
...
@@ -246,8 +244,7 @@ class PaxosHandler(socketserver.BaseRequestHandler):
if
len
(
self
.
server
.
slot_to_acceptors
[
p2b
.
slot_num
])
>
(
len
(
self
.
server
.
servers
)
/
2
):
# Majority accepted, can put into log
self
.
server
.
log
[
p2b
.
slot_num
]
=
bvp
# TODO: Create executeLog() and uncomment below
# executeLog()
self
.
executeLog
(
"
P2B
"
)
"""
Timer Handlers
...
...
@@ -257,20 +254,22 @@ class PaxosHandler(socketserver.BaseRequestHandler):
def
onP2ATimer
(
self
,
p2a_timer
:
P2ATimer
):
print
(
f
"
{
p2a_timer
}
: Callback
"
,
file
=
sys
.
stderr
)
# If not leader then stop timer
if
self
.
server
.
is_leader
:
# TODO: log status
if
self
.
server
.
is_leader
and
not
self
.
status
(
p2a_timer
.
p2a
.
slot_num
)
==
PaxosLogSlotStatus
.
CHOSEN
:
# TODO: log status
for
acceptor_addr
in
self
.
server
.
servers
:
if
acceptor_addr
!=
self
.
server
.
address
:
send_msg
(
p2a_timer
.
p2a
,
acceptor_addr
)
setTimer
(
p2a_timer
,
P2ATimer
.
P2A_RETRY_MILLIS
,
self
.
onP2ATimer
)
def
onHeartBeatTimer
(
self
,
hearbeat_timer
:
HeartBeatTimer
):
print
(
f
"
{
hearbeat_timer
}
: Callback
"
,
file
=
sys
.
stderr
)
def
onHeartBeatTimer
(
self
,
heartbeat_timer
:
HeartBeatTimer
):
print
(
f
"
{
heartbeat_timer
}
: Callback
"
,
file
=
sys
.
stderr
)
self
.
executeLog
(
"
HB timer
"
)
if
self
.
server
.
is_leader
:
for
acceptor_addr
in
self
.
server
.
servers
:
if
acceptor_addr
!=
self
.
server
.
address
:
lh
=
LeaderHeartbeat
(
self
.
server
.
log
,
self
.
server
.
ballot
)
send_msg
(
lh
,
acceptor_addr
)
setTimer
(
hearbeat_timer
,
HeartBeatTimer
.
HEARTBEAT_RETRY_MILLIS
,
self
.
onHeartBeatTimer
)
setTimer
(
hear
t
beat_timer
,
HeartBeatTimer
.
HEARTBEAT_RETRY_MILLIS
,
self
.
onHeartBeatTimer
)
def
onHeartBeatCheckTimer
(
self
,
hearbeat_check_timer
:
HeartBeatCheckTimer
):
print
(
f
"
{
hearbeat_check_timer
}
: Callback
"
,
file
=
sys
.
stderr
)
...
...
@@ -292,6 +291,8 @@ class PaxosHandler(socketserver.BaseRequestHandler):
if
acceptor_addr
!=
self
.
server
.
address
:
send_msg
(
leader_election_timer
.
p1a
,
acceptor_addr
)
setTimer
(
leader_election_timer
,
LeaderElectionTimer
.
LEADER_ELECTION_TIMER
,
self
.
onLeaderElectionTimer
)
def
__electLeader
(
self
):
# Try to elect ourself as the leader
# Try to get elected as leader at the beginning of time
...
...
@@ -312,6 +313,47 @@ class PaxosHandler(socketserver.BaseRequestHandler):
send_msg
(
p1a
,
acceptor_addr
)
setTimer
(
LeaderElectionTimer
(
p1a
),
LeaderElectionTimer
.
LEADER_ELECTION_TIMER
,
self
.
onLeaderElectionTimer
)
def
status
(
self
,
log_slot_num
)
->
PaxosLogSlotStatus
:
if
log_slot_num
in
self
.
server
.
log
:
return
PaxosLogSlotStatus
.
CHOSEN
if
log_slot_num
in
self
.
server
.
accepted
:
return
PaxosLogSlotStatus
.
ACCEPTED
return
PaxosLogSlotStatus
.
EMPTY
def
executeLog
(
self
,
context
):
foundEmpty
=
False
for
i
in
range
(
self
.
server
.
slot_out
,
self
.
server
.
slot_in
):
status
=
self
.
status
(
i
)
if
status
==
PaxosLogSlotStatus
.
CHOSEN
:
if
foundEmpty
:
continue
bvp
=
self
.
server
.
log
[
i
]
#cmd = bvp.value.lock_command
# if cmd != null ?
# execute and reply to client
lock_res
=
self
.
server
.
lock_manager
.
execute
(
bvp
.
value
.
lock_command
,
bvp
.
value
.
addr
)
sock
=
self
.
request
[
1
]
print
(
f
"
{
context
}
, executeLog - sending resp back to client
\n
"
)
sock
.
sendto
(
pickle
.
dumps
(
PaxosResult
(
self
.
server
.
address
,
lock_res
)),
bvp
.
value
.
addr
)
self
.
server
.
slot_out
+=
1
else
:
foundEmpty
=
True
if
status
==
PaxosLogSlotStatus
.
EMPTY
:
if
self
.
server
.
is_leader
:
p2a
=
P2A
(
self
.
server
.
address
,
BallotNumber
(
self
.
server
.
ballot
.
seq_num
,
self
.
server
.
address
),
i
,
PaxosRequest
(
self
.
server
.
add
,
None
))
for
server
in
self
.
server
.
addresses
:
if
server
!=
self
.
server
.
address
:
send_msg
(
p2a
,
server
)
self
.
server
.
slot_to_acceptors
[
i
].
add
(
self
.
server
.
address
)
self
.
server
.
proposals
[
i
]
=
BallotValuePair
(
BallotNumber
(
self
.
server
.
ballot
.
seq_num
,
self
.
server
.
address
),
p2a
.
value
)
self
.
server
.
accepted
[
i
]
=
BallotValuePair
(
BallotNumber
(
self
.
server
.
ballot
.
seq_num
,
self
.
server
.
address
),
p2a
.
value
)
# Serialize obj, and send the message.
# This function will not wait for reply (communication between paxos nodes)
...
...
@@ -330,12 +372,8 @@ def send_msg(obj, dest_addr: Address):
if
__name__
==
"
__main__
"
:
HOST
,
PORT
=
sys
.
argv
[
1
],
int
(
sys
.
argv
[
2
])
PORTSTART
,
PORTEND
=
int
(
sys
.
argv
[
3
]),
int
(
sys
.
argv
[
4
])
addresses
=
()
for
i
in
range
(
PORTSTART
,
PORTEND
+
1
):
addresses
+=
((
HOST
,
i
),)
# addresses = ((HOST, PORT),)
addresses
=
((
HOST
,
9000
),
(
HOST
,
9001
))
# Create the server, binding to localhost on port 9999
server
=
Paxos
((
HOST
,
PORT
),
addresses
)
# Activate the server; this will keep running until you
...
...
This diff is collapsed.
Click to expand it.
assignment2/paxos_utils.py
+
13
−
0
View file @
18e2d612
from
functools
import
total_ordering
from
enum
import
Enum
from
typing
import
Tuple
Address
=
Tuple
[
str
,
int
]
# Type declaration for what an address is
...
...
@@ -60,3 +61,15 @@ class BallotValuePair:
def
__str__
(
self
)
->
str
:
return
"
BallotValuePair(ballot_num:
"
+
str
(
self
.
ballot_num
)
+
"
, value:
"
+
str
(
self
.
value
)
+
"
)
"
class
PaxosLogSlotStatus
(
Enum
):
EMPTY
=
0
ACCEPTED
=
1
CHOSEN
=
2
#public enum PaxosLogSlotStatus {
#EMPTY, // no command is known by the server for this slot
#ACCEPTED, // a command has been tentatively accepted by this server
#CHOSEN, // the server knows a command to be permanently chosen for this slot
#CLEARED // the command in this slot has been garbage-collected at the server
#}
\ No newline at end of file
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment