Commit 45d6c0d2 authored by Nickolai Zeldovich's avatar Nickolai Zeldovich
Browse files

update

parents
# This is the Makefile helping you submit the labs.
# Just create 6.824/api.key with your API key in it,
# and submit your lab with the following command:
# $ make [lab1|lab2a|lab2b|lab3a|lab3b|lab4a|lab4b]
KEY=$(shell cat api.key)
LABS="lab1 lab2a lab2b lab3a lab3b lab4a lab4b"
%:
@if echo $(LABS) | grep -q "$@ " ; then \
tar cvzf $@-handin.tar.gz src; \
if test -z $(KEY) ; then \
echo "Missing $(PWD)/api.key. Please create the file with your key in it or submit the $@-handin.tar.gz via the web interface."; \
else \
curl -F file=@$@-handin.tar.gz -F key=$(KEY) http://ydmao.scripts.mit.edu/6.824/handin.py/upload; \
fi; \
else \
echo "Bad target $@. Usage: make [$(LABS)]"; \
fi
mrtmp.*
/main/diff.out
/mapreduce/x.txt
/pbservice/x.txt
/kvpaxos/x.txt
package kvpaxos
import "net/rpc"
import "time"
import "crypto/rand"
import "math/big"
type Clerk struct {
servers []string
// You will have to modify this struct.
}
func MakeClerk(servers []string) *Clerk {
ck := new(Clerk)
ck.servers = servers
// You'll have to add code here.
return ck
}
//
// call() sends an RPC to the rpcname handler on server srv
// with arguments args, waits for the reply, and leaves the
// reply in reply. the reply argument should be a pointer
// to a reply structure.
//
// the return value is true if the server responded, and false
// if call() was not able to contact the server. in particular,
// the reply's contents are only valid if call() returned true.
//
// you should assume that call() will time out and return an
// error after a while if it doesn't get a reply from the server.
//
// please use call() to send all RPCs, in client.go and server.go.
// please don't change this function.
//
func call(srv string, rpcname string,
args interface{}, reply interface{}) bool {
c, errx := rpc.Dial("unix", srv)
if errx != nil {
return false
}
defer c.Close()
err := c.Call(rpcname, args, reply)
if err == nil {
return true
}
return false
}
//
// fetch the current value for a key.
// returns "" if the key does not exist.
// keeps trying forever in the face of all other errors.
//
func (ck *Clerk) Get(key string) string {
// You will have to modify this function.
return ""
}
//
// set the value for a key.
// keeps trying until it succeeds.
//
func (ck *Clerk) PutExt(key string, value string, dohash bool) string {
// You will have to modify this function.
}
func (ck *Clerk) Put(key string, value string) {
ck.PutExt(key, value, false)
}
func (ck *Clerk) PutHash(key string, value string) string {
v := ck.PutExt(key, value, true)
return v
}
package kvpaxos
import "hash/fnv"
const (
OK = "OK"
ErrNoKey = "ErrNoKey"
)
type Err string
type PutArgs struct {
// You'll have to add definitions here.
Key string
Value string
DoHash bool // For PutHash
// You'll have to add definitions here.
// Field names must start with capital letters,
// otherwise RPC will break.
}
type PutReply struct {
Err Err
PreviousValue string // For PutHash
}
type GetArgs struct {
Key string
// You'll have to add definitions here.
}
type GetReply struct {
Err Err
Value string
}
func hash(s string) uint32 {
h := fnv.New32a()
h.Write([]byte(s))
return h.Sum32()
}
package kvpaxos
import "net"
import "fmt"
import "net/rpc"
import "log"
import "paxos"
import "sync"
import "os"
import "syscall"
import "encoding/gob"
import "math/rand"
const Debug=0
func DPrintf(format string, a ...interface{}) (n int, err error) {
if Debug > 0 {
log.Printf(format, a...)
}
return
}
type Op struct {
// Your definitions here.
// Field names must start with capital letters,
// otherwise RPC will break.
}
type KVPaxos struct {
mu sync.Mutex
l net.Listener
me int
dead bool // for testing
unreliable bool // for testing
px *paxos.Paxos
// Your definitions here.
}
func (kv *KVPaxos) Get(args *GetArgs, reply *GetReply) error {
// Your code here.
return nil
}
func (kv *KVPaxos) Put(args *PutArgs, reply *PutReply) error {
// Your code here.
return nil
}
// tell the server to shut itself down.
// please do not change this function.
func (kv *KVPaxos) kill() {
DPrintf("Kill(%d): die\n", kv.me)
kv.dead = true
kv.l.Close()
kv.px.Kill()
}
//
// servers[] contains the ports of the set of
// servers that will cooperate via Paxos to
// form the fault-tolerant key/value service.
// me is the index of the current server in servers[].
//
func StartServer(servers []string, me int) *KVPaxos {
// this call is all that's needed to persuade
// Go's RPC library to marshall/unmarshall
// struct Op.
gob.Register(Op{})
kv := new(KVPaxos)
kv.me = me
// Your initialization code here.
rpcs := rpc.NewServer()
rpcs.Register(kv)
kv.px = paxos.Make(servers, me, rpcs)
os.Remove(servers[me])
l, e := net.Listen("unix", servers[me]);
if e != nil {
log.Fatal("listen error: ", e);
}
kv.l = l
// please do not change any of the following code,
// or do anything to subvert it.
go func() {
for kv.dead == false {
conn, err := kv.l.Accept()
if err == nil && kv.dead == false {
if kv.unreliable && (rand.Int63() % 1000) < 100 {
// discard the request.
conn.Close()
} else if kv.unreliable && (rand.Int63() % 1000) < 200 {
// process the request but force discard of reply.
c1 := conn.(*net.UnixConn)
f, _ := c1.File()
err := syscall.Shutdown(int(f.Fd()), syscall.SHUT_WR)
if err != nil {
fmt.Printf("shutdown: %v\n", err)
}
go rpcs.ServeConn(conn)
} else {
go rpcs.ServeConn(conn)
}
} else if err == nil {
conn.Close()
}
if err != nil && kv.dead == false {
fmt.Printf("KVPaxos(%v) accept: %v\n", me, err.Error())
kv.kill()
}
}
}()
return kv
}
package kvpaxos
import "testing"
import "runtime"
import "strconv"
import "os"
import "time"
import "fmt"
import "math/rand"
func check(t *testing.T, ck *Clerk, key string, value string) {
v := ck.Get(key)
if v != value {
t.Fatalf("Get(%v) -> %v, expected %v", key, v, value)
}
}
func port(tag string, host int) string {
s := "/var/tmp/824-"
s += strconv.Itoa(os.Getuid()) + "/"
os.Mkdir(s, 0777)
s += "kv-"
s += strconv.Itoa(os.Getpid()) + "-"
s += tag + "-"
s += strconv.Itoa(host)
return s
}
func cleanup(kva []*KVPaxos) {
for i := 0; i < len(kva); i++ {
if kva[i] != nil {
kva[i].kill()
}
}
}
func NextValue(hprev string, val string) string {
h := hash(hprev + val)
return strconv.Itoa(int(h))
}
func TestBasic(t *testing.T) {
runtime.GOMAXPROCS(4)
const nservers = 3
var kva []*KVPaxos = make([]*KVPaxos, nservers)
var kvh []string = make([]string, nservers)
defer cleanup(kva)
for i := 0; i < nservers; i++ {
kvh[i] = port("basic", i)
}
for i := 0; i < nservers; i++ {
kva[i] = StartServer(kvh, i)
}
ck := MakeClerk(kvh)
var cka [nservers]*Clerk
for i := 0; i < nservers; i++ {
cka[i] = MakeClerk([]string{kvh[i]})
}
fmt.Printf("Test: Basic put/puthash/get ...\n")
pv := ck.PutHash("a", "x")
ov := ""
if ov != pv {
t.Fatalf("wrong value; expected %s got %s", ov, pv)
}
ck.Put("a", "aa")
check(t, ck, "a", "aa")
cka[1].Put("a", "aaa")
check(t, cka[2], "a", "aaa")
check(t, cka[1], "a", "aaa")
check(t, ck, "a", "aaa")
fmt.Printf(" ... Passed\n")
fmt.Printf("Test: Concurrent clients ...\n")
for iters := 0; iters < 20; iters++ {
const npara = 15
var ca [npara]chan bool
for nth := 0; nth < npara; nth++ {
ca[nth] = make(chan bool)
go func(me int) {
defer func() { ca[me] <- true }()
ci := (rand.Int() % nservers)
myck := MakeClerk([]string{kvh[ci]})
if (rand.Int() % 1000) < 500 {
myck.Put("b", strconv.Itoa(rand.Int()))
} else {
myck.Get("b")
}
}(nth)
}
for nth := 0; nth < npara; nth++ {
<- ca[nth]
}
var va [nservers]string
for i := 0; i < nservers; i++ {
va[i] = cka[i].Get("b")
if va[i] != va[0] {
t.Fatalf("mismatch")
}
}
}
fmt.Printf(" ... Passed\n")
time.Sleep(1 * time.Second)
}
func TestDone(t *testing.T) {
runtime.GOMAXPROCS(4)
const nservers = 3
var kva []*KVPaxos = make([]*KVPaxos, nservers)
var kvh []string = make([]string, nservers)
defer cleanup(kva)
for i := 0; i < nservers; i++ {
kvh[i] = port("done", i)
}
for i := 0; i < nservers; i++ {
kva[i] = StartServer(kvh, i)
}
ck := MakeClerk(kvh)
var cka [nservers]*Clerk
for pi := 0; pi < nservers; pi++ {
cka[pi] = MakeClerk([]string{kvh[pi]})
}
fmt.Printf("Test: server frees Paxos log memory...\n")
ck.Put("a", "aa")
check(t, ck, "a", "aa")
runtime.GC()
var m0 runtime.MemStats
runtime.ReadMemStats(&m0)
// rtm's m0.Alloc is 2 MB
sz := 1000000
items := 10
for iters := 0; iters < 2; iters++ {
for i := 0; i < items; i++ {
key := strconv.Itoa(i)
value := make([]byte, sz)
for j := 0; j < len(value); j++ {
value[j] = byte((rand.Int() % 100) + 1)
}
ck.Put(key, string(value))
check(t, cka[i % nservers], key, string(value))
}
}
// Put and Get to each of the replicas, in case
// the Done information is piggybacked on
// the Paxos proposer messages.
for iters := 0; iters < 2; iters++ {
for pi := 0; pi < nservers; pi++ {
cka[pi].Put("a", "aa")
check(t, cka[pi], "a", "aa")
}
}
time.Sleep(1 * time.Second)
runtime.GC()
var m1 runtime.MemStats
runtime.ReadMemStats(&m1)
// rtm's m1.Alloc is 45 MB
// fmt.Printf(" Memory: before %v, after %v\n", m0.Alloc, m1.Alloc)
allowed := m0.Alloc + uint64(nservers * items * sz * 2)
if m1.Alloc > allowed {
fmt.Printf(" You would fail this test if it were enabled (%v vs %v).\n", m1.Alloc, allowed)
}
fmt.Printf(" ... Passed\n")
}
func pp(tag string, src int, dst int) string {
s := "/var/tmp/824-"
s += strconv.Itoa(os.Getuid()) + "/"
s += "kv-" + tag + "-"
s += strconv.Itoa(os.Getpid()) + "-"
s += strconv.Itoa(src) + "-"
s += strconv.Itoa(dst)
return s
}
func cleanpp(tag string, n int) {
for i := 0; i < n; i++ {
for j := 0; j < n; j++ {
ij := pp(tag, i, j)
os.Remove(ij)
}
}
}
func part(t *testing.T, tag string, npaxos int, p1 []int, p2 []int, p3 []int) {
cleanpp(tag, npaxos)
pa := [][]int{p1, p2, p3}
for pi := 0; pi < len(pa); pi++ {
p := pa[pi]
for i := 0; i < len(p); i++ {
for j := 0; j < len(p); j++ {
ij := pp(tag, p[i], p[j])
pj := port(tag, p[j])
err := os.Link(pj, ij)
if err != nil {
t.Fatalf("os.Link(%v, %v): %v\n", pj, ij, err)
}
}
}
}
}
func TestPartition(t *testing.T) {
runtime.GOMAXPROCS(4)
tag := "partition"
const nservers = 5
var kva []*KVPaxos = make([]*KVPaxos, nservers)
defer cleanup(kva)
defer cleanpp(tag, nservers)
for i := 0; i < nservers; i++ {
var kvh []string = make([]string, nservers)
for j := 0; j < nservers; j++ {
if j == i {
kvh[j] = port(tag, i)
} else {
kvh[j] = pp(tag, i, j)
}
}
kva[i] = StartServer(kvh, i)
}
defer part(t, tag, nservers, []int{}, []int{}, []int{})
var cka [nservers]*Clerk
for i := 0; i < nservers; i++ {
cka[i] = MakeClerk([]string{port(tag, i)})
}
fmt.Printf("Test: No partition ...\n")
part(t, tag, nservers, []int{0,1,2,3,4}, []int{}, []int{})
cka[0].Put("1", "12")
cka[2].Put("1", "13")
check(t, cka[3], "1", "13")
fmt.Printf(" ... Passed\n")
fmt.Printf("Test: Progress in majority ...\n")
part(t, tag, nservers, []int{2,3,4}, []int{0,1}, []int{})
cka[2].Put("1", "14")
check(t, cka[4], "1", "14")
fmt.Printf(" ... Passed\n")
fmt.Printf("Test: No progress in minority ...\n")
done0 := false
done1 := false
go func() {
cka[0].Put("1", "15")
done0 = true
}()
go func() {
cka[1].Get("1")
done1 = true
}()
time.Sleep(time.Second)
if done0 {
t.Fatalf("Put in minority completed")
}
if done1 {
t.Fatalf("Get in minority completed")
}
check(t, cka[4], "1", "14")
cka[3].Put("1", "16")
check(t, cka[4], "1", "16")
fmt.Printf(" ... Passed\n")
fmt.Printf("Test: Completion after heal ...\n")
part(t, tag, nservers, []int{0,2,3,4}, []int{1}, []int{})
for iters := 0; iters < 30; iters++ {
if done0 {
break
}
time.Sleep(100 * time.Millisecond)
}
if done0 == false {
t.Fatalf("Put did not complete")
}
if done1 {
t.Fatalf("Get in minority completed")
}
check(t, cka[4], "1", "15")
check(t, cka[0], "1", "15")
part(t, tag, nservers, []int{0,1,2}, []int{3,4}, []int{})
for iters := 0; iters < 100; iters++ {
if done1 {
break
}
time.Sleep(100 * time.Millisecond)
}
if done1 == false {
t.Fatalf("Get did not complete")
}
check(t, cka[1], "1", "15")
fmt.Printf(" ... Passed\n")
}
func TestUnreliable(t *testing.T) {
runtime.GOMAXPROCS(4)
const nservers = 3
var kva []*KVPaxos = make([]*KVPaxos, nservers)
var kvh []string = make([]string, nservers)
defer cleanup(kva)
for i := 0; i < nservers; i++ {