Commit d2fd3c80 authored by Ellis Michael's avatar Ellis Michael

Update to 2015 labs

parent b8b54cb6
api.key
/pkg/ /pkg/
/bin/
mrtmp.*
/main/diff.out
/mapreduce/x.txt
/pbservice/x.txt
/kvpaxos/x.txt
...@@ -789,6 +789,177 @@ func Test5AppendUse(t *testing.T) { ...@@ -789,6 +789,177 @@ func Test5AppendUse(t *testing.T) {
fmt.Printf(" ... Passed\n") fmt.Printf(" ... Passed\n")
} }
//
// recovery if a single replica loses disk content.
//
func Test5OneLostDisk(t *testing.T) {
tc := setup(t, "onelostdisk", 1, 3, false)
defer tc.cleanup()
fmt.Printf("Test: One server loses disk and restarts ...\n")
tc.join(0)
ck := tc.clerk()
g0 := tc.groups[0]
k1 := randstring(10)
k1v := ""
k2 := randstring(10)
k2v := ""
for i := 0; i < 7+(rand.Int()%7); i++ {
x := randstring(10)
ck.Append(k1, x)
k1v += x
k2v = randstring(10)
ck.Put(k2, k2v)
}
time.Sleep(300 * time.Millisecond)
ck.Get(k1)
time.Sleep(300 * time.Millisecond)
ck.Get(k2)
for i := 0; i < len(g0.servers); i++ {
k1x := ck.Get(k1)
if k1x != k1v {
t.Fatalf("wrong value for k1, i=%v, wanted=%v, got=%v", i, k1v, k1x)
}
k2x := ck.Get(k2)
if k2x != k2v {
t.Fatalf("wrong value for k2")
}
tc.kill1(0, i, true)
time.Sleep(1 * time.Second)
{
z := randstring(10)
k1v += z
ck.Append(k1, z)
k2v = randstring(10)
ck.Put(k2, k2v)
}
tc.start1(0, i)
{
z := randstring(10)
k1v += z
ck.Append(k1, z)
time.Sleep(10 * time.Millisecond)
z = randstring(10)
k1v += z
ck.Append(k1, z)
}
time.Sleep(2 * time.Second)
}
if ck.Get(k1) != k1v {
t.Fatalf("wrong value for k1")
}
if ck.Get(k2) != k2v {
t.Fatalf("wrong value for k2")
}
fmt.Printf(" ... Passed\n")
}
//
// one disk lost while another replica is merely down.
//
func Test5OneLostOneDown(t *testing.T) {
tc := setup(t, "onelostonedown", 1, 5, false)
defer tc.cleanup()
fmt.Printf("Test: One server down, another loses disk ...\n")
tc.join(0)
ck := tc.clerk()
g0 := tc.groups[0]
k1 := randstring(10)
k1v := ""
k2 := randstring(10)
k2v := ""
for i := 0; i < 7+(rand.Int()%7); i++ {
x := randstring(10)
ck.Append(k1, x)
k1v += x
k2v = randstring(10)
ck.Put(k2, k2v)
}
time.Sleep(300 * time.Millisecond)
ck.Get(k1)
time.Sleep(300 * time.Millisecond)
ck.Get(k2)
tc.kill1(0, 0, false)
for i := 1; i < len(g0.servers); i++ {
k1x := ck.Get(k1)
if k1x != k1v {
t.Fatalf("wrong value for k1, i=%v, wanted=%v, got=%v", i, k1v, k1x)
}
k2x := ck.Get(k2)
if k2x != k2v {
t.Fatalf("wrong value for k2")
}
tc.kill1(0, i, true)
time.Sleep(1 * time.Second)
{
z := randstring(10)
k1v += z
ck.Append(k1, z)
k2v = randstring(10)
ck.Put(k2, k2v)
}
tc.start1(0, i)
{
z := randstring(10)
k1v += z
ck.Append(k1, z)
time.Sleep(10 * time.Millisecond)
z = randstring(10)
k1v += z
ck.Append(k1, z)
}
time.Sleep(2 * time.Second)
}
if ck.Get(k1) != k1v {
t.Fatalf("wrong value for k1")
}
if ck.Get(k2) != k2v {
t.Fatalf("wrong value for k2")
}
tc.start1(0, 0)
ck.Put("a", "b")
time.Sleep(1 * time.Second)
ck.Put("a", "c")
if ck.Get(k1) != k1v {
t.Fatalf("wrong value for k1")
}
if ck.Get(k2) != k2v {
t.Fatalf("wrong value for k2")
}
fmt.Printf(" ... Passed\n")
}
// check that all known appends are present in a value, // check that all known appends are present in a value,
// and are in order for each concurrent client. // and are in order for each concurrent client.
func checkAppends(t *testing.T, v string, counts []int) { func checkAppends(t *testing.T, v string, counts []int) {
...@@ -858,7 +1029,7 @@ func doConcurrentCrash(t *testing.T, unreliable bool) { ...@@ -858,7 +1029,7 @@ func doConcurrentCrash(t *testing.T, unreliable bool) {
} }
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
tc.kill1(0, i%3, false) tc.kill1(0, i%3, true)
time.Sleep(1000 * time.Millisecond) time.Sleep(1000 * time.Millisecond)
ck.Get(k1) ck.Get(k1)
tc.start1(0, i%3) tc.start1(0, i%3)
...@@ -943,7 +1114,7 @@ func Test5Simultaneous(t *testing.T) { ...@@ -943,7 +1114,7 @@ func Test5Simultaneous(t *testing.T) {
if (rand.Int() % 1000) < 500 { if (rand.Int() % 1000) < 500 {
tc.kill1(0, i%3, false) tc.kill1(0, i%3, false)
} else { } else {
tc.kill1(0, i%3, false) tc.kill1(0, i%3, true)
} }
time.Sleep(1000 * time.Millisecond) time.Sleep(1000 * time.Millisecond)
vx := ck.Get(k1) vx := ck.Get(k1)
...@@ -960,3 +1131,150 @@ func Test5Simultaneous(t *testing.T) { ...@@ -960,3 +1131,150 @@ func Test5Simultaneous(t *testing.T) {
fmt.Printf(" ... Passed\n") fmt.Printf(" ... Passed\n")
} }
//
// recovery with mixture of lost disks and simple reboot.
// does a replica that loses its disk wait for majority?
//
func Test5RejoinMix1(t *testing.T) {
tc := setup(t, "rejoinmix1", 1, 5, false)
defer tc.cleanup()
fmt.Printf("Test: replica waits correctly after disk loss ...\n")
tc.join(0)
ck := tc.clerk()
k1 := randstring(10)
k1v := ""
for i := 0; i < 7+(rand.Int()%7); i++ {
x := randstring(10)
ck.Append(k1, x)
k1v += x
}
time.Sleep(300 * time.Millisecond)
ck.Get(k1)
tc.kill1(0, 0, false)
for i := 0; i < 2; i++ {
x := randstring(10)
ck.Append(k1, x)
k1v += x
}
time.Sleep(300 * time.Millisecond)
ck.Get(k1)
time.Sleep(300 * time.Millisecond)
tc.kill1(0, 1, true)
tc.kill1(0, 2, true)
tc.kill1(0, 3, false)
tc.kill1(0, 4, false)
tc.start1(0, 0)
tc.start1(0, 1)
tc.start1(0, 2)
time.Sleep(300 * time.Millisecond)
// check that requests are not executed.
ch := make(chan string)
go func() {
ck1 := tc.clerk()
v := ck1.Get(k1)
ch <- v
}()
select {
case <-ch:
t.Fatalf("Get should not have succeeded.")
case <-time.After(3 * time.Second):
// this is what we hope for.
}
tc.start1(0, 3)
tc.start1(0, 4)
{
x := randstring(10)
ck.Append(k1, x)
k1v += x
}
v := ck.Get(k1)
if v != k1v {
t.Fatalf("Get returned wrong value")
}
fmt.Printf(" ... Passed\n")
}
//
// does a replica that loses its state avoid
// changing its mind about Paxos agreements?
//
func Test5RejoinMix3(t *testing.T) {
tc := setup(t, "rejoinmix3", 1, 5, false)
defer tc.cleanup()
fmt.Printf("Test: replica Paxos resumes correctly after disk loss ...\n")
tc.join(0)
ck := tc.clerk()
k1 := randstring(10)
k1v := ""
for i := 0; i < 7+(rand.Int()%7); i++ {
x := randstring(10)
ck.Append(k1, x)
k1v += x
}
time.Sleep(300 * time.Millisecond)
ck.Get(k1)
// kill R1, R2.
tc.kill1(0, 1, false)
tc.kill1(0, 2, false)
// R0, R3, and R4 are up.
for i := 0; i < 100+(rand.Int()%7); i++ {
x := randstring(10)
ck.Append(k1, x)
k1v += x
}
// kill R0, lose disk.
tc.kill1(0, 0, true)
time.Sleep(50 * time.Millisecond)
// restart R1, R2, R0.
tc.start1(0, 1)
tc.start1(0, 2)
time.Sleep(1 * time.Millisecond)
tc.start1(0, 0)
chx := make(chan bool)
x1 := randstring(10)
x2 := randstring(10)
go func() { ck.Append(k1, x1); chx <- true }()
time.Sleep(10 * time.Millisecond)
go func() { ck.Append(k1, x2); chx <- true }()
<-chx
<-chx
xv := ck.Get(k1)
if xv == k1v+x1+x2 || xv == k1v+x2+x1 {
// ok
} else {
t.Fatalf("wrong value")
}
fmt.Printf(" ... Passed\n")
}
package kvpaxos package kvpaxos
import "net/rpc" import "net/rpc"
import "crypto/rand"
import "math/big"
import "fmt" import "fmt"
type Clerk struct { type Clerk struct {
...@@ -8,6 +11,13 @@ type Clerk struct { ...@@ -8,6 +11,13 @@ type Clerk struct {
// You will have to modify this struct. // You will have to modify this struct.
} }
func nrand() int64 {
max := big.NewInt(int64(1) << 62)
bigx, _ := rand.Int(rand.Reader, max)
x := bigx.Int64()
return x
}
func MakeClerk(servers []string) *Clerk { func MakeClerk(servers []string) *Clerk {
ck := new(Clerk) ck := new(Clerk)
ck.servers = servers ck.servers = servers
...@@ -25,8 +35,9 @@ func MakeClerk(servers []string) *Clerk { ...@@ -25,8 +35,9 @@ func MakeClerk(servers []string) *Clerk {
// if call() was not able to contact the server. in particular, // if call() was not able to contact the server. in particular,
// the reply's contents are only valid if call() returned true. // the reply's contents are only valid if call() returned true.
// //
// you should assume that call() will time out and return an // you should assume that call() will return an
// error after a while if it doesn't get a reply from the server. // error after a while if the server is dead.
// don't provide your own time-out mechanism.
// //
// please use call() to send all RPCs, in client.go and server.go. // please use call() to send all RPCs, in client.go and server.go.
// please don't change this function. // please don't change this function.
...@@ -59,18 +70,15 @@ func (ck *Clerk) Get(key string) string { ...@@ -59,18 +70,15 @@ func (ck *Clerk) Get(key string) string {
} }
// //
// set the value for a key. // shared by Put and Append.
// keeps trying until it succeeds.
// //
func (ck *Clerk) PutExt(key string, value string, dohash bool) string { func (ck *Clerk) PutAppend(key string, value string, op string) {
// You will have to modify this function. // You will have to modify this function.
return ""
} }
func (ck *Clerk) Put(key string, value string) { func (ck *Clerk) Put(key string, value string) {
ck.PutExt(key, value, false) ck.PutAppend(key, value, "Put")
} }
func (ck *Clerk) PutHash(key string, value string) string { func (ck *Clerk) Append(key string, value string) {
v := ck.PutExt(key, value, true) ck.PutAppend(key, value, "Append")
return v
} }
package kvpaxos package kvpaxos
import "hash/fnv"
const ( const (
OK = "OK" OK = "OK"
ErrNoKey = "ErrNoKey" ErrNoKey = "ErrNoKey"
...@@ -9,19 +7,19 @@ const ( ...@@ -9,19 +7,19 @@ const (
type Err string type Err string
type PutArgs struct { // Put or Append
type PutAppendArgs struct {
// You'll have to add definitions here. // You'll have to add definitions here.
Key string Key string
Value string Value string
DoHash bool // For PutHash Op string // "Put" or "Append"
// You'll have to add definitions here. // You'll have to add definitions here.
// Field names must start with capital letters, // Field names must start with capital letters,
// otherwise RPC will break. // otherwise RPC will break.
} }
type PutReply struct { type PutAppendReply struct {
Err Err Err Err
PreviousValue string // For PutHash
} }
type GetArgs struct { type GetArgs struct {
...@@ -33,9 +31,3 @@ type GetReply struct { ...@@ -33,9 +31,3 @@ type GetReply struct {
Err Err Err Err
Value string Value string
} }
func hash(s string) uint32 {
h := fnv.New32a()
h.Write([]byte(s))
return h.Sum32()
}
...@@ -6,6 +6,7 @@ import "net/rpc" ...@@ -6,6 +6,7 @@ import "net/rpc"
import "log" import "log"
import "paxos" import "paxos"
import "sync" import "sync"
import "sync/atomic"
import "os" import "os"
import "syscall" import "syscall"
import "encoding/gob" import "encoding/gob"
...@@ -30,8 +31,8 @@ type KVPaxos struct { ...@@ -30,8 +31,8 @@ type KVPaxos struct {
mu sync.Mutex mu sync.Mutex
l net.Listener l net.Listener
me int me int
dead bool // for testing dead int32 // for testing
unreliable bool // for testing unreliable int32 // for testing
px *paxos.Paxos px *paxos.Paxos
// Your definitions here. // Your definitions here.
...@@ -42,21 +43,39 @@ func (kv *KVPaxos) Get(args *GetArgs, reply *GetReply) error { ...@@ -42,21 +43,39 @@ func (kv *KVPaxos) Get(args *GetArgs, reply *GetReply) error {
return nil return nil
} }
func (kv *KVPaxos) Put(args *PutArgs, reply *PutReply) error { func (kv *KVPaxos) PutAppend(args *PutAppendArgs, reply *PutAppendReply) error {
// Your code here. // Your code here.
return nil return nil
} }
// tell the server to shut itself down. // tell the server to shut itself down.
// please do not change this function. // please do not change these two functions.
func (kv *KVPaxos) kill() { func (kv *KVPaxos) kill() {
DPrintf("Kill(%d): die\n", kv.me) DPrintf("Kill(%d): die\n", kv.me)
kv.dead = true atomic.StoreInt32(&kv.dead, 1)
kv.l.Close() kv.l.Close()
kv.px.Kill() kv.px.Kill()
} }
// call this to find out if the server is dead.
func (kv *KVPaxos) isdead() bool {
return atomic.LoadInt32(&kv.dead) != 0
}
// please do not change these two functions.
func (kv *KVPaxos) setunreliable(what bool) {
if what {
atomic.StoreInt32(&kv.unreliable, 1)
} else {
atomic.StoreInt32(&kv.unreliable, 0)
}
}
func (kv *KVPaxos) isunreliable() bool {
return atomic.LoadInt32(&kv.unreliable) != 0
}
// //
// servers[] contains the ports of the set of // servers[] contains the ports of the set of
// servers that will cooperate via Paxos to // servers that will cooperate via Paxos to
...@@ -89,13 +108,13 @@ func StartServer(servers []string, me int) *KVPaxos { ...@@ -89,13 +108,13 @@ func StartServer(servers []string, me int) *KVPaxos {
// or do anything to subvert it. // or do anything to subvert it.
go func() { go func() {
for kv.dead == false { for kv.isdead() == false {
conn, err := kv.l.Accept() conn, err := kv.l.Accept()
if err == nil && kv.dead == false { if err == nil && kv.isdead() == false {
if kv.unreliable && (rand.Int63()%1000) < 100 { if kv.isunreliable() && (rand.Int63()%1000) < 100 {
// discard the request. // discard the request.
conn.Close() conn.Close()
} else if kv.unreliable && (rand.Int63()%1000) < 200 { } else if kv.isunreliable() && (rand.Int63()%1000) < 200 {
// process the request but force discard of reply. // process the request but force discard of reply.
c1 := conn.(*net.UnixConn) c1 := conn.(*net.UnixConn)
f, _ := c1.File() f, _ := c1.File()
...@@ -110,7 +129,7 @@ func StartServer(servers []string, me int) *KVPaxos { ...@@ -110,7 +129,7 @@ func StartServer(servers []string, me int) *KVPaxos {