Commit 70dce50f authored by Irene Y Zhang's avatar Irene Y Zhang

updating with lab 5 files

parent e6960dd3
package diskv
import "shardmaster"
import "net/rpc"
import "time"
import "sync"
import "fmt"
import "crypto/rand"
import "math/big"
type Clerk struct {
mu sync.Mutex // one RPC at a time
sm *shardmaster.Clerk
config shardmaster.Config
// You'll have to modify Clerk.
}
func nrand() int64 {
max := big.NewInt(int64(1) << 62)
bigx, _ := rand.Int(rand.Reader, max)
x := bigx.Int64()
return x
}
func MakeClerk(shardmasters []string) *Clerk {
ck := new(Clerk)
ck.sm = shardmaster.MakeClerk(shardmasters)
// You'll have to modify MakeClerk.
return ck
}
//
// call() sends an RPC to the rpcname handler on server srv
// with arguments args, waits for the reply, and leaves the
// reply in reply. the reply argument should be a pointer
// to a reply structure.
//
// the return value is true if the server responded, and false
// if call() was not able to contact the server. in particular,
// the reply's contents are only valid if call() returned true.
//
// you should assume that call() will return an
// error after a while if the server is dead.
// don't provide your own time-out mechanism.
//
// please use call() to send all RPCs, in client.go and server.go.
// please don't change this function.
//
func call(srv string, rpcname string,
args interface{}, reply interface{}) bool {
c, errx := rpc.Dial("unix", srv)
if errx != nil {
return false
}
defer c.Close()
err := c.Call(rpcname, args, reply)
if err == nil {
return true
}
fmt.Println(err)
return false
}
//
// which shard is a key in?
// please use this function,
// and please do not change it.
//
func key2shard(key string) int {
shard := 0
if len(key) > 0 {
shard = int(key[0])
}
shard %= shardmaster.NShards
return shard
}
//
// fetch the current value for a key.
// returns "" if the key does not exist.
// keeps trying forever in the face of all other errors.
//
func (ck *Clerk) Get(key string) string {
ck.mu.Lock()
defer ck.mu.Unlock()
// You'll have to modify Get().
for {
shard := key2shard(key)
gid := ck.config.Shards[shard]
servers, ok := ck.config.Groups[gid]
if ok {
// try each server in the shard's replication group.
for _, srv := range servers {
args := &GetArgs{}
args.Key = key
var reply GetReply
ok := call(srv, "DisKV.Get", args, &reply)
if ok && (reply.Err == OK || reply.Err == ErrNoKey) {
return reply.Value
}
if ok && (reply.Err == ErrWrongGroup) {
break
}
}
}
time.Sleep(100 * time.Millisecond)
// ask master for a new configuration.
ck.config = ck.sm.Query(-1)
}
}
// send a Put or Append request.
func (ck *Clerk) PutAppend(key string, value string, op string) {
ck.mu.Lock()
defer ck.mu.Unlock()
// You'll have to modify PutAppend().
for {
shard := key2shard(key)
gid := ck.config.Shards[shard]
servers, ok := ck.config.Groups[gid]
if ok {
// try each server in the shard's replication group.
for _, srv := range servers {
args := &PutAppendArgs{}
args.Key = key
args.Value = value
args.Op = op
var reply PutAppendReply
ok := call(srv, "DisKV.PutAppend", args, &reply)
if ok && reply.Err == OK {
return
}
if ok && (reply.Err == ErrWrongGroup) {
break
}
}
}
time.Sleep(100 * time.Millisecond)
// ask master for a new configuration.
ck.config = ck.sm.Query(-1)
}
}
func (ck *Clerk) Put(key string, value string) {
ck.PutAppend(key, value, "Put")
}
func (ck *Clerk) Append(key string, value string) {
ck.PutAppend(key, value, "Append")
}
package diskv
//
// Sharded key/value server.
// Lots of replica groups, each running op-at-a-time paxos.
// Shardmaster decides which group serves each shard.
// Shardmaster may change shard assignment from time to time.
//
// You will have to modify these definitions.
//
const (
OK = "OK"
ErrNoKey = "ErrNoKey"
ErrWrongGroup = "ErrWrongGroup"
)
type Err string
type PutAppendArgs struct {
Key string
Value string
Op string // "Put" or "Append"
// You'll have to add definitions here.
// Field names must start with capital letters,
// otherwise RPC will break.
}
type PutAppendReply struct {
Err Err
}
type GetArgs struct {
Key string
// You'll have to add definitions here.
}
type GetReply struct {
Err Err
Value string
}
package diskv
import "net"
import "fmt"
import "net/rpc"
import "log"
import "time"
import "paxos"
import "sync"
import "sync/atomic"
import "os"
import "syscall"
import "encoding/gob"
import "encoding/base32"
import "math/rand"
import "shardmaster"
import "io/ioutil"
import "strconv"
const Debug = 0
func DPrintf(format string, a ...interface{}) (n int, err error) {
if Debug > 0 {
log.Printf(format, a...)
}
return
}
type Op struct {
// Your definitions here.
}
type DisKV struct {
mu sync.Mutex
l net.Listener
me int
dead int32 // for testing
unreliable int32 // for testing
sm *shardmaster.Clerk
px *paxos.Paxos
dir string // each replica has its own data directory
gid int64 // my replica group ID
// Your definitions here.
}
//
// these are handy functions that might be useful
// for reading and writing key/value files, and
// for reading and writing entire shards.
// puts the key files for each shard in a separate
// directory.
//
func (kv *DisKV) shardDir(shard int) string {
d := kv.dir + "/shard-" + strconv.Itoa(shard) + "/"
// create directory if needed.
_, err := os.Stat(d)
if err != nil {
if err := os.Mkdir(d, 0777); err != nil {
log.Fatalf("Mkdir(%v): %v", d, err)
}
}
return d
}
// cannot use keys in file names directly, since
// they might contain troublesome characters like /.
// base32-encode the key to get a file name.
// base32 rather than base64 b/c Mac has case-insensitive
// file names.
func (kv *DisKV) encodeKey(key string) string {
return base32.StdEncoding.EncodeToString([]byte(key))
}
func (kv *DisKV) decodeKey(filename string) (string, error) {
key, err := base32.StdEncoding.DecodeString(filename)
return string(key), err
}
// read the content of a key's file.
func (kv *DisKV) fileGet(shard int, key string) (string, error) {
fullname := kv.shardDir(shard) + "/key-" + kv.encodeKey(key)
content, err := ioutil.ReadFile(fullname)
return string(content), err
}
// replace the content of a key's file.
// uses rename() to make the replacement atomic with
// respect to crashes.
func (kv *DisKV) filePut(shard int, key string, content string) error {
fullname := kv.shardDir(shard) + "/key-" + kv.encodeKey(key)
tempname := kv.shardDir(shard) + "/temp-" + kv.encodeKey(key)
if err := ioutil.WriteFile(tempname, []byte(content), 0666); err != nil {
return err
}
if err := os.Rename(tempname, fullname); err != nil {
return err
}
return nil
}
// return content of every key file in a given shard.
func (kv *DisKV) fileReadShard(shard int) map[string]string {
m := map[string]string{}
d := kv.shardDir(shard)
files, err := ioutil.ReadDir(d)
if err != nil {
log.Fatalf("fileReadShard could not read %v: %v", d, err)
}
for _, fi := range files {
n1 := fi.Name()
if n1[0:4] == "key-" {
key, err := kv.decodeKey(n1[4:])
if err != nil {
log.Fatalf("fileReadShard bad file name %v: %v", n1, err)
}
content, err := kv.fileGet(shard, key)
if err != nil {
log.Fatalf("fileReadShard fileGet failed for %v: %v", key, err)
}
m[key] = content
}
}
return m
}
// replace an entire shard directory.
func (kv *DisKV) fileReplaceShard(shard int, m map[string]string) {
d := kv.shardDir(shard)
os.RemoveAll(d) // remove all existing files from shard.
for k, v := range m {
kv.filePut(shard, k, v)
}
}
func (kv *DisKV) Get(args *GetArgs, reply *GetReply) error {
// Your code here.
return nil
}
// RPC handler for client Put and Append requests
func (kv *DisKV) PutAppend(args *PutAppendArgs, reply *PutAppendReply) error {
// Your code here.
return nil
}
//
// Ask the shardmaster if there's a new configuration;
// if so, re-configure.
//
func (kv *DisKV) tick() {
// Your code here.
}
// tell the server to shut itself down.
// please don't change these two functions.
func (kv *DisKV) kill() {
atomic.StoreInt32(&kv.dead, 1)
kv.l.Close()
kv.px.Kill()
}
// call this to find out if the server is dead.
func (kv *DisKV) isdead() bool {
return atomic.LoadInt32(&kv.dead) != 0
}
// please do not change these two functions.
func (kv *DisKV) Setunreliable(what bool) {
if what {
atomic.StoreInt32(&kv.unreliable, 1)
} else {
atomic.StoreInt32(&kv.unreliable, 0)
}
}
func (kv *DisKV) isunreliable() bool {
return atomic.LoadInt32(&kv.unreliable) != 0
}
//
// Start a shardkv server.
// gid is the ID of the server's replica group.
// shardmasters[] contains the ports of the
// servers that implement the shardmaster.
// servers[] contains the ports of the servers
// in this replica group.
// Me is the index of this server in servers[].
// dir is the directory name under which this
// replica should store all its files.
// each replica is passed a different directory.
// restart is false the very first time this server
// is started, and true to indicate a re-start
// after a crash or after a crash with disk loss.
//
func StartServer(gid int64, shardmasters []string,
servers []string, me int, dir string, restart bool) *DisKV {
kv := new(DisKV)
kv.me = me
kv.gid = gid
kv.sm = shardmaster.MakeClerk(shardmasters)
kv.dir = dir
// Your initialization code here.
// Don't call Join().
// log.SetOutput(ioutil.Discard)
gob.Register(Op{})
rpcs := rpc.NewServer()
rpcs.Register(kv)
kv.px = paxos.Make(servers, me, rpcs)
// log.SetOutput(os.Stdout)
os.Remove(servers[me])
l, e := net.Listen("unix", servers[me])
if e != nil {
log.Fatal("listen error: ", e)
}
kv.l = l
// please do not change any of the following code,
// or do anything to subvert it.
go func() {
for kv.isdead() == false {
conn, err := kv.l.Accept()
if err == nil && kv.isdead() == false {
if kv.isunreliable() && (rand.Int63()%1000) < 100 {
// discard the request.
conn.Close()
} else if kv.isunreliable() && (rand.Int63()%1000) < 200 {
// process the request but force discard of reply.
c1 := conn.(*net.UnixConn)
f, _ := c1.File()
err := syscall.Shutdown(int(f.Fd()), syscall.SHUT_WR)
if err != nil {
fmt.Printf("shutdown: %v\n", err)
}
go rpcs.ServeConn(conn)
} else {
go rpcs.ServeConn(conn)
}
} else if err == nil {
conn.Close()
}
if err != nil && kv.isdead() == false {
fmt.Printf("DisKV(%v) accept: %v\n", me, err.Error())
kv.kill()
}
}
}()
go func() {
for kv.isdead() == false {
kv.tick()
time.Sleep(250 * time.Millisecond)
}
}()
return kv
}
This diff is collapsed.
...@@ -109,7 +109,7 @@ func (ck *Clerk) Move(shard int, gid int64) { ...@@ -109,7 +109,7 @@ func (ck *Clerk) Move(shard int, gid int64) {
args := &MoveArgs{} args := &MoveArgs{}
args.Shard = shard args.Shard = shard
args.GID = gid args.GID = gid
var reply LeaveReply var reply MoveReply
ok := call(srv, "ShardMaster.Move", args, &reply) ok := call(srv, "ShardMaster.Move", args, &reply)
if ok { if ok {
return return
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment