diff --git a/.check-build b/.check-build index 036b722..493e19a 100755 --- a/.check-build +++ b/.check-build @@ -46,12 +46,10 @@ REFERENCE_FILES=( src/kvraft1/kvraft_test.go src/kvraft1/test.go - # lab 5a - # src/shardkv1/test.go - - # lab 5b - # src/shardkv1/shardkv_test.go - # src/shardkv1/shardcfg/shardcfg_test.go + # lab 5 + src/shardkv1/test.go + src/shardkv1/shardkv_test.go + src/shardkv1/shardcfg/shardcfg_test.go ) main() { @@ -79,8 +77,7 @@ main() { "lab2") check_lab2;; "lab3a"|"lab3b"|"lab3c"|"lab3d") check_lab3;; "lab4a"|"lab4b"|"lab4c") check_lab4;; - "lab5a") check_lab5a;; - "lab5b") check_lab5b;; + "lab5a"|"lab5b"|"lab5c") check_lab5;; *) die "unknown lab: $labnum";; esac @@ -117,18 +114,11 @@ check_lab4() { check_cmd go test -c } -check_lab5a() { - # check_cmd cd src/shardctrler1 - # check_cmd go test -c -} - -check_lab5b() { - # check_cmd cd src/shardkv1 - # check_cmd go test -c +check_lab5() { + check_cmd cd src/shardkv1 + check_cmd go test -c # also check other labs/parts cd "$tmpdir" - check_lab5a - cd "$tmpdir" check_lab4 cd "$tmpdir" check_lab3 diff --git a/Makefile b/Makefile index 6863063..74b62cc 100644 --- a/Makefile +++ b/Makefile @@ -1,8 +1,8 @@ # This is the Makefile helping you submit the labs. # Submit your lab with the following command: -# $ make [lab1|lab2|lab3a|lab3b|lab3c|lab3d|lab4a|lab4b|lab4c|lab5a|lab5b] +# $ make [lab1|lab2|lab3a|lab3b|lab3c|lab3d|lab4a|lab4b|lab4c|lab5a|lab5b|lab5c] -LABS=" lab1 lab2 lab3a lab3b lab3c lab3d lab4a lab4b lab4c lab5a lab5b " +LABS=" lab1 lab2 lab3a lab3b lab3c lab3d lab4a lab4b lab4c lab5a lab5b lab5c " %: check-% @echo "Preparing $@-handin.tar.gz" diff --git a/src/shardkv1/client.go b/src/shardkv1/client.go new file mode 100644 index 0000000..02ba37c --- /dev/null +++ b/src/shardkv1/client.go @@ -0,0 +1,51 @@ +package shardkv + +// +// client code to talk to a sharded key/value service. 
+// +// the client uses the shardctrler to query for the current +// configuration and find the assignment of shards (keys) to groups, +// and then talks to the group that holds the key's shard. +// + +import ( + + "6.5840/kvsrv1/rpc" + "6.5840/kvtest1" + "6.5840/shardkv1/shardctrler" + "6.5840/tester1" +) + +type Clerk struct { + clnt *tester.Clnt + sck *shardctrler.ShardCtrler + // You will have to modify this struct. +} + +// The tester calls MakeClerk and passes in a shardctrler so that +// client can call it's Query method +func MakeClerk(clnt *tester.Clnt, sck *shardctrler.ShardCtrler) kvtest.IKVClerk { + ck := &Clerk{ + clnt: clnt, + sck: sck, + } + // You'll have to add code here. + return ck +} + + +// Get a key from a shardgrp. You can use shardcfg.Key2Shard(key) to +// find the shard responsible for the key and ck.sck.Query() to read +// the current configuration and lookup the servers in the group +// responsible for key. You can make a clerk for that group by +// calling shardgrp.MakeClerk(ck.clnt, servers). +func (ck *Clerk) Get(key string) (string, rpc.Tversion, rpc.Err) { + // You will have to modify this function. + return "", 0, "" +} + +// Put a key to a shard group. +func (ck *Clerk) Put(key string, value string, version rpc.Tversion) rpc.Err { + // You will have to modify this function. + return "" +} diff --git a/src/shardkv1/kvsrv1/client.go b/src/shardkv1/kvsrv1/client.go new file mode 100644 index 0000000..73f1d29 --- /dev/null +++ b/src/shardkv1/kvsrv1/client.go @@ -0,0 +1,56 @@ +package kvsrv + +import ( + "6.5840/kvsrv1/rpc" + "6.5840/kvtest1" + "6.5840/tester1" +) + + +type Clerk struct { + clnt *tester.Clnt + server string +} + +func MakeClerk(clnt *tester.Clnt, server string) kvtest.IKVClerk { + ck := &Clerk{clnt: clnt, server: server} + // You may add code here. + return ck +} + +// Get fetches the current value and version for a key. It returns +// ErrNoKey if the key does not exist. 
It keeps trying forever in the +// face of all other errors. +// +// You can send an RPC with code like this: +// ok := ck.clnt.Call(ck.server, "KVServer.Get", &args, &reply) +// +// The types of args and reply (including whether they are pointers) +// must match the declared types of the RPC handler function's +// arguments. Additionally, reply must be passed as a pointer. +func (ck *Clerk) Get(key string) (string, rpc.Tversion, rpc.Err) { + // You will have to modify this function. + return "", 0, rpc.ErrNoKey +} + +// Put updates key with value only if the version in the +// request matches the version of the key at the server. If the +// versions numbers don't match, the server should return +// ErrVersion. If Put receives an ErrVersion on its first RPC, Put +// should return ErrVersion, since the Put was definitely not +// performed at the server. If the server returns ErrVersion on a +// resend RPC, then Put must return ErrMaybe to the application, since +// its earlier RPC might have been processed by the server successfully +// but the response was lost, and the the Clerk doesn't know if +// the Put was performed or not. +// +// You can send an RPC with code like this: +// ok := ck.clnt.Call(ck.server, "KVServer.Put", &args, &reply) +// +// The types of args and reply (including whether they are pointers) +// must match the declared types of the RPC handler function's +// arguments. Additionally, reply must be passed as a pointer. +func (ck *Clerk) Put(key, value string, version rpc.Tversion) rpc.Err { + // You will have to modify this function. 
+ return rpc.ErrNoKey +} diff --git a/src/shardkv1/kvsrv1/kvsrv_test.go b/src/shardkv1/kvsrv1/kvsrv_test.go new file mode 100644 index 0000000..68c1c79 --- /dev/null +++ b/src/shardkv1/kvsrv1/kvsrv_test.go @@ -0,0 +1,162 @@ +package kvsrv + +import ( + // "log" + "runtime" + "testing" + "time" + + "6.5840/kvsrv1/rpc" + "6.5840/kvtest1" +) + +// Test Put with a single client and a reliable network +func TestReliablePut(t *testing.T) { + const Val = "6.5840" + const Ver = 0 + + ts := MakeTestKV(t, true) + defer ts.Cleanup() + + ts.Begin("One client and reliable Put") + + ck := ts.MakeClerk() + if err := ck.Put("k", Val, Ver); err != rpc.OK { + t.Fatalf("Put err %v", err) + } + + if val, ver, err := ck.Get("k"); err != rpc.OK { + t.Fatalf("Get err %v; expected OK", err) + } else if val != Val { + t.Fatalf("Get value err %v; expected %v", val, Val) + } else if ver != Ver+1 { + t.Fatalf("Get wrong version %v; expected %v", ver, Ver+1) + } + + if err := ck.Put("k", Val, 0); err != rpc.ErrVersion { + t.Fatalf("expected Put to fail with ErrVersion; got err=%v", err) + } + + if err := ck.Put("y", Val, rpc.Tversion(1)); err != rpc.ErrNoKey { + t.Fatalf("expected Put to fail with ErrNoKey; got err=%v", err) + } + + if _, _, err := ck.Get("y"); err != rpc.ErrNoKey { + t.Fatalf("expected Get to fail with ErrNoKey; got err=%v", err) + } +} + +// Many clients putting on same key. 
+func TestPutConcurrentReliable(t *testing.T) { + const ( + PORCUPINETIME = 10 * time.Second + NCLNT = 10 + NSEC = 1 + ) + + ts := MakeTestKV(t, true) + defer ts.Cleanup() + + ts.Begin("Test: many clients racing to put values to the same key") + + rs := ts.SpawnClientsAndWait(NCLNT, NSEC*time.Second, func(me int, ck kvtest.IKVClerk, done chan struct{}) kvtest.ClntRes { + return ts.OneClientPut(me, ck, []string{"k"}, done) + }) + ck := ts.MakeClerk() + ts.CheckPutConcurrent(ck, "k", rs, &kvtest.ClntRes{}) + ts.CheckPorcupineT(PORCUPINETIME) +} + +// Check if memory used on server is reasonable +func TestMemPutManyClientsReliable(t *testing.T) { + const ( + NCLIENT = 100_000 + MEM = 1000 + ) + + ts := MakeTestKV(t, true) + defer ts.Cleanup() + + v := kvtest.RandValue(MEM) + + cks := make([]kvtest.IKVClerk, NCLIENT) + for i, _ := range cks { + cks[i] = ts.MakeClerk() + } + + // force allocation of ends for server in each client + for i := 0; i < NCLIENT; i++ { + if err := cks[i].Put("k", "", 1); err != rpc.ErrNoKey { + t.Fatalf("Put failed %v", err) + } + } + + ts.Begin("Test: memory use many put clients") + + // allow threads started by labrpc to start + time.Sleep(1 * time.Second) + + runtime.GC() + runtime.GC() + + var st runtime.MemStats + runtime.ReadMemStats(&st) + m0 := st.HeapAlloc + + for i := 0; i < NCLIENT; i++ { + if err := cks[i].Put("k", v, rpc.Tversion(i)); err != rpc.OK { + t.Fatalf("Put failed %v", err) + } + } + + runtime.GC() + time.Sleep(1 * time.Second) + runtime.GC() + + runtime.ReadMemStats(&st) + m1 := st.HeapAlloc + f := (float64(m1) - float64(m0)) / NCLIENT + if m1 > m0+(NCLIENT*200) { + t.Fatalf("error: server using too much memory %d %d (%.2f per client)\n", m0, m1, f) + } +} + +// Test with one client and unreliable network. If Clerk.Put returns +// ErrMaybe, the Put must have happened, since the test uses only one +// client. 
+func TestUnreliableNet(t *testing.T) { + const NTRY = 100 + + ts := MakeTestKV(t, false) + defer ts.Cleanup() + + ts.Begin("One client") + + ck := ts.MakeClerk() + + retried := false + for try := 0; try < NTRY; try++ { + for i := 0; true; i++ { + if err := ts.PutJson(ck, "k", i, rpc.Tversion(try), 0); err != rpc.ErrMaybe { + if i > 0 && err != rpc.ErrVersion { + t.Fatalf("Put shouldn't have happen more than once %v", err) + } + break + } + // Try put again; it should fail with ErrVersion + retried = true + } + v := 0 + if ver := ts.GetJson(ck, "k", 0, &v); ver != rpc.Tversion(try+1) { + t.Fatalf("Wrong version %d expect %d", ver, try+1) + } + if v != 0 { + t.Fatalf("Wrong value %d expect %d", v, 0) + } + } + if !retried { + t.Fatalf("Clerk.Put never returned ErrMaybe") + } + + ts.CheckPorcupine() +} diff --git a/src/shardkv1/kvsrv1/lock/lock.go b/src/shardkv1/kvsrv1/lock/lock.go new file mode 100644 index 0000000..f788304 --- /dev/null +++ b/src/shardkv1/kvsrv1/lock/lock.go @@ -0,0 +1,31 @@ +package lock + +import ( + + "6.5840/kvsrv1/rpc" + "6.5840/shardkv1/kvsrv1" + "6.5840/shardkv1/shardctrler/param" +) + + +type Lock struct { + ck *kvsrv.Clerk + +} + +// Use l as the key to store the "lock state" (you would have to decide +// precisely what the lock state is). +func MakeLock(ck kvtest.IKVClerk, l string) *Lock { + lk := &Lock{ck: ck.(*kvsrv.Clerk)} + // You may add code here + return lk +} + + +func (lk *Lock) Acquire() { + // You may add code here. +} + +func (lk *Lock) Release() { + // You may add code here. 
+} diff --git a/src/shardkv1/kvsrv1/lock/lock_test.go b/src/shardkv1/kvsrv1/lock/lock_test.go new file mode 100644 index 0000000..92c9e67 --- /dev/null +++ b/src/shardkv1/kvsrv1/lock/lock_test.go @@ -0,0 +1,89 @@ +package lock + +import ( + "fmt" + // "log" + "strconv" + "testing" + "time" + + "6.5840/kvsrv1" + "6.5840/kvsrv1/rpc" + "6.5840/kvtest1" +) + +const ( + NACQUIRE = 10 + NCLNT = 10 + NSEC = 2 +) + +func oneClient(t *testing.T, me int, ck kvtest.IKVClerk, done chan struct{}) kvtest.ClntRes { + lk := MakeLock(ck, "l") + ck.Put("l0", "", 0) + for i := 1; true; i++ { + select { + case <-done: + return kvtest.ClntRes{i, 0} + default: + lk.Acquire() + + // log.Printf("%d: acquired lock", me) + + b := strconv.Itoa(me) + val, ver, err := ck.Get("l0") + if err == rpc.OK { + if val != "" { + t.Fatalf("%d: two clients acquired lock %v", me, val) + } + } else { + t.Fatalf("%d: get failed %v", me, err) + } + + err = ck.Put("l0", string(b), ver) + if !(err == rpc.OK || err == rpc.ErrMaybe) { + t.Fatalf("%d: put failed %v", me, err) + } + + time.Sleep(10 * time.Millisecond) + + err = ck.Put("l0", "", ver+1) + if !(err == rpc.OK || err == rpc.ErrMaybe) { + t.Fatalf("%d: put failed %v", me, err) + } + + // log.Printf("%d: release lock", me) + + lk.Release() + } + } + return kvtest.ClntRes{} +} + +// Run test clients +func runClients(t *testing.T, nclnt int, reliable bool) { + ts := kvsrv.MakeTestKV(t, reliable) + defer ts.Cleanup() + + ts.Begin(fmt.Sprintf("Test: %d lock clients", nclnt)) + + ts.SpawnClientsAndWait(nclnt, NSEC*time.Second, func(me int, myck kvtest.IKVClerk, done chan struct{}) kvtest.ClntRes { + return oneClient(t, me, myck, done) + }) +} + +func TestOneClientReliable(t *testing.T) { + runClients(t, 1, true) +} + +func TestManyClientsReliable(t *testing.T) { + runClients(t, NCLNT, true) +} + +func TestOneClientUnreliable(t *testing.T) { + runClients(t, 1, false) +} + +func TestManyClientsUnreliable(t *testing.T) { + runClients(t, NCLNT, false) +} diff 
--git a/src/shardkv1/kvsrv1/server.go b/src/shardkv1/kvsrv1/server.go new file mode 100644 index 0000000..83c256a --- /dev/null +++ b/src/shardkv1/kvsrv1/server.go @@ -0,0 +1,48 @@ +package kvsrv + +import ( + "sync" + + + "6.5840/kvsrv1/rpc" + "6.5840/labrpc" + "6.5840/tester1" +) + + +type KVServer struct { + mu sync.Mutex + + // Your definitions here. +} + +func MakeKVServer() *KVServer { + kv := &KVServer{} + // Your code here. + return kv +} + +// Get returns the value and version for args.Key, if args.Key +// exists. Otherwise, Get returns ErrNoKey. +func (kv *KVServer) Get(args *rpc.GetArgs, reply *rpc.GetReply) { + // Your code here. +} + +// Update the value for a key if args.Version matches the version of +// the key on the server. If versions don't match, return ErrVersion. +// If the key doesn't exist, Put installs the value if the +// args.Version is 0, and returns ErrNoKey otherwise. +func (kv *KVServer) Put(args *rpc.PutArgs, reply *rpc.PutReply) { + // Your code here. +} + +// You can ignore Kill() for this lab +func (kv *KVServer) Kill() { +} + + +// You can ignore all arguments; they are for replicated KVservers +func StartKVServer(ends []*labrpc.ClientEnd, gid tester.Tgid, srv int, persister *tester.Persister) []tester.IService { + kv := MakeKVServer() + return []tester.IService{kv} +} diff --git a/src/shardkv1/kvsrv1/test.go b/src/shardkv1/kvsrv1/test.go new file mode 100644 index 0000000..bd8269b --- /dev/null +++ b/src/shardkv1/kvsrv1/test.go @@ -0,0 +1,36 @@ +package kvsrv + +import ( + // "log" + "testing" + + "6.5840/kvtest1" + "6.5840/tester1" +) + +type TestKV struct { + *kvtest.Test + t *testing.T + reliable bool +} + +func MakeTestKV(t *testing.T, reliable bool) *TestKV { + cfg := tester.MakeConfig(t, 1, reliable, StartKVServer) + ts := &TestKV{ + t: t, + reliable: reliable, + } + ts.Test = kvtest.MakeTest(t, cfg, false, ts) + return ts +} + +func (ts *TestKV) MakeClerk() kvtest.IKVClerk { + clnt := ts.Config.MakeClient() + ck := 
MakeClerk(clnt, tester.ServerName(tester.GRP0, 0)) + return &kvtest.TestClerk{ck, clnt} +} + +func (ts *TestKV) DeleteClerk(ck kvtest.IKVClerk) { + tck := ck.(*kvtest.TestClerk) + ts.DeleteClient(tck.Clnt) +} diff --git a/src/shardkv1/shardcfg/shardcfg.go b/src/shardkv1/shardcfg/shardcfg.go new file mode 100644 index 0000000..b7f97fa --- /dev/null +++ b/src/shardkv1/shardcfg/shardcfg.go @@ -0,0 +1,284 @@ +package shardcfg + +import ( + "encoding/json" + "hash/fnv" + "log" + "runtime/debug" + "slices" + "testing" + + "6.5840/tester1" +) + +type Tshid int +type Tnum int + +const ( + NShards = 12 // The number of shards. + NumFirst = Tnum(1) +) + +const ( + Gid1 = tester.Tgid(1) +) + +// which shard is a key in? +// please use this function, +// and please do not change it. +func Key2Shard(key string) Tshid { + h := fnv.New32a() + h.Write([]byte(key)) + shard := Tshid(Tshid(h.Sum32()) % NShards) + return shard +} + +// A configuration -- an assignment of shards to groups. +// Please don't change this. 
+type ShardConfig struct { + Num Tnum // config number + Shards [NShards]tester.Tgid // shard -> gid + Groups map[tester.Tgid][]string // gid -> servers[] +} + +func MakeShardConfig() *ShardConfig { + c := &ShardConfig{ + Groups: make(map[tester.Tgid][]string), + } + return c +} + +func (cfg *ShardConfig) String() string { + b, err := json.Marshal(cfg) + if err != nil { + log.Fatalf("Unmarshall err %v", err) + } + return string(b) +} + +func FromString(s string) *ShardConfig { + scfg := &ShardConfig{} + if err := json.Unmarshal([]byte(s), scfg); err != nil { + log.Fatalf("Unmarshall err %v", err) + } + return scfg +} + +func (cfg *ShardConfig) Copy() *ShardConfig { + c := MakeShardConfig() + c.Num = cfg.Num + c.Shards = cfg.Shards + for k, srvs := range cfg.Groups { + s := make([]string, len(srvs)) + copy(s, srvs) + c.Groups[k] = s + } + return c +} + +// mostgroup, mostn, leastgroup, leastn +func analyze(c *ShardConfig) (tester.Tgid, int, tester.Tgid, int) { + counts := map[tester.Tgid]int{} + for _, g := range c.Shards { + counts[g] += 1 + } + + mn := -1 + var mg tester.Tgid = -1 + ln := 257 + var lg tester.Tgid = -1 + // Enforce deterministic ordering, map iteration + // is randomized in go + groups := make([]tester.Tgid, len(c.Groups)) + i := 0 + for k := range c.Groups { + groups[i] = k + i++ + } + slices.Sort(groups) + for _, g := range groups { + if counts[g] < ln { + ln = counts[g] + lg = g + } + if counts[g] > mn { + mn = counts[g] + mg = g + } + } + + return mg, mn, lg, ln +} + +// return GID of group with least number of +// assigned shards. +func least(c *ShardConfig) tester.Tgid { + _, _, lg, _ := analyze(c) + return lg +} + +// balance assignment of shards to groups. +// modifies c. 
+func (c *ShardConfig) Rebalance() { + // if no groups, un-assign all shards + if len(c.Groups) < 1 { + for s, _ := range c.Shards { + c.Shards[s] = 0 + } + return + } + + // assign all unassigned shards + for s, g := range c.Shards { + _, ok := c.Groups[g] + if ok == false { + lg := least(c) + c.Shards[s] = lg + } + } + + // move shards from most to least heavily loaded + for { + mg, mn, lg, ln := analyze(c) + if mn < ln+2 { + break + } + // move 1 shard from mg to lg + for s, g := range c.Shards { + if g == mg { + c.Shards[s] = lg + break + } + } + } +} + +func (cfg *ShardConfig) Join(servers map[tester.Tgid][]string) bool { + changed := false + for gid, servers := range servers { + _, ok := cfg.Groups[gid] + if ok { + log.Printf("re-Join %v", gid) + return false + } + for xgid, xservers := range cfg.Groups { + for _, s1 := range xservers { + for _, s2 := range servers { + if s1 == s2 { + log.Fatalf("Join(%v) puts server %v in groups %v and %v", gid, s1, xgid, gid) + } + } + } + } + // new GID + // modify cfg to reflect the Join() + cfg.Groups[gid] = servers + changed = true + } + if changed == false { + log.Fatalf("Join but no change") + } + cfg.Num += 1 + return true +} + +func (cfg *ShardConfig) Leave(gids []tester.Tgid) bool { + changed := false + for _, gid := range gids { + _, ok := cfg.Groups[gid] + if ok == false { + // already no GID! 
+ log.Printf("Leave(%v) but not in config", gid) + return false + } else { + // modify op.Config to reflect the Leave() + delete(cfg.Groups, gid) + changed = true + } + } + if changed == false { + debug.PrintStack() + log.Fatalf("Leave but no change") + } + cfg.Num += 1 + return true +} + +func (cfg *ShardConfig) JoinBalance(servers map[tester.Tgid][]string) bool { + if !cfg.Join(servers) { + return false + } + cfg.Rebalance() + return true +} + +func (cfg *ShardConfig) LeaveBalance(gids []tester.Tgid) bool { + if !cfg.Leave(gids) { + return false + } + cfg.Rebalance() + return true +} + +func (cfg *ShardConfig) GidServers(sh Tshid) (tester.Tgid, []string, bool) { + gid := cfg.Shards[sh] + srvs, ok := cfg.Groups[gid] + return gid, srvs, ok +} + +func (cfg *ShardConfig) IsMember(gid tester.Tgid) bool { + for _, g := range cfg.Shards { + if g == gid { + return true + } + } + return false +} + +func (cfg *ShardConfig) CheckConfig(t *testing.T, groups []tester.Tgid) { + if len(cfg.Groups) != len(groups) { + fatalf(t, "wanted %v groups, got %v", len(groups), len(cfg.Groups)) + } + + // are the groups as expected? + for _, g := range groups { + _, ok := cfg.Groups[g] + if ok != true { + fatalf(t, "missing group %v", g) + } + } + + // any un-allocated shards? + if len(groups) > 0 { + for s, g := range cfg.Shards { + _, ok := cfg.Groups[g] + if ok == false { + fatalf(t, "shard %v -> invalid group %v", s, g) + } + } + } + + // more or less balanced sharding? + counts := map[tester.Tgid]int{} + for _, g := range cfg.Shards { + counts[g] += 1 + } + min := 257 + max := 0 + for g, _ := range cfg.Groups { + if counts[g] > max { + max = counts[g] + } + if counts[g] < min { + min = counts[g] + } + } + if max > min+1 { + fatalf(t, "max %v too much larger than min %v", max, min) + } +} + +func fatalf(t *testing.T, format string, args ...any) { + debug.PrintStack() + t.Fatalf(format, args...) 
+} diff --git a/src/shardkv1/shardcfg/shardcfg_test.go b/src/shardkv1/shardcfg/shardcfg_test.go new file mode 100644 index 0000000..4d45204 --- /dev/null +++ b/src/shardkv1/shardcfg/shardcfg_test.go @@ -0,0 +1,62 @@ +package shardcfg + +import ( + "testing" + + "6.5840/tester1" +) + +func check_same_config(t *testing.T, c1 ShardConfig, c2 ShardConfig) { + if c1.Num != c2.Num { + t.Fatalf("Num wrong") + } + if c1.Shards != c2.Shards { + t.Fatalf("Shards wrong") + } + if len(c1.Groups) != len(c2.Groups) { + t.Fatalf("number of Groups is wrong") + } + for gid, sa := range c1.Groups { + sa1, ok := c2.Groups[gid] + if ok == false || len(sa1) != len(sa) { + t.Fatalf("len(Groups) wrong") + } + if ok && len(sa1) == len(sa) { + for j := 0; j < len(sa); j++ { + if sa[j] != sa1[j] { + t.Fatalf("Groups wrong") + } + } + } + } +} + +func TestBasic(t *testing.T) { + const ( + Gid1 = 1 + Gid2 = 2 + ) + cfg := MakeShardConfig() + cfg.CheckConfig(t, []tester.Tgid{}) + + cfg.JoinBalance(map[tester.Tgid][]string{Gid1: []string{"x", "y", "z"}}) + cfg.CheckConfig(t, []tester.Tgid{Gid1}) + + cfg.JoinBalance(map[tester.Tgid][]string{Gid2: []string{"a", "b", "c"}}) + cfg.CheckConfig(t, []tester.Tgid{Gid1, Gid2}) + + sa1 := cfg.Groups[Gid1] + if len(sa1) != 3 || sa1[0] != "x" || sa1[1] != "y" || sa1[2] != "z" { + t.Fatalf("wrong servers for gid %v: %v\n", Gid1, sa1) + } + sa2 := cfg.Groups[Gid2] + if len(sa2) != 3 || sa2[0] != "a" || sa2[1] != "b" || sa2[2] != "c" { + t.Fatalf("wrong servers for gid %v: %v\n", Gid2, sa2) + } + + cfg.LeaveBalance([]tester.Tgid{Gid1}) + cfg.CheckConfig(t, []tester.Tgid{Gid2}) + + cfg.LeaveBalance([]tester.Tgid{Gid2}) + cfg.CheckConfig(t, []tester.Tgid{}) +} diff --git a/src/shardkv1/shardctrler/param/param.go b/src/shardkv1/shardctrler/param/param.go new file mode 100644 index 0000000..8df0627 --- /dev/null +++ b/src/shardkv1/shardctrler/param/param.go @@ -0,0 +1,7 @@ +package param + +const ( + // Length of time that a lease is valid. 
If a client doesn't + // refresh the lease within this time, the lease will expire. + LEASETIMESEC = 3 +) diff --git a/src/shardkv1/shardctrler/shardctrler.go b/src/shardkv1/shardctrler/shardctrler.go new file mode 100644 index 0000000..516bdd9 --- /dev/null +++ b/src/shardkv1/shardctrler/shardctrler.go @@ -0,0 +1,87 @@ +package shardctrler + +// +// Shardctrler with InitConfig, Query, and ChangeConfigTo methods +// + +import ( + + "sync/atomic" + + "6.5840/kvsrv1/rpc" + "6.5840/kvtest1" + "6.5840/shardkv1/kvsrv1" + "6.5840/shardkv1/shardcfg" + "6.5840/tester1" +) + + +// ShardCtrler for the controller and kv clerk. +type ShardCtrler struct { + clnt *tester.Clnt + kvtest.IKVClerk + + killed int32 // set by Kill() + leases bool + + // Your data here. +} + +// Make a ShardCltler, which stores its state in a kvsrv. +func MakeShardCtrler(clnt *tester.Clnt, leases bool) *ShardCtrler { + sck := &ShardCtrler{clnt: clnt, leases: leases} + srv := tester.ServerName(tester.GRP0, 0) + sck.IKVClerk = kvsrv.MakeClerk(clnt, srv) + // Your code here. + return sck +} + +// The tester calls InitController() before starting a new +// controller. In this method you can implement recovery (part B) and +// use a lock to become leader (part C). InitController may fail when +// another controller supersedes (e.g., when this controller is +// partitioned during recovery). +func (sck *ShardCtrler) InitController() rpc.Err { + return rpc.ErrNoKey +} + +// The tester calls ExitController to exit a controller. In part B and +// C, release lock. +func (sck *ShardCtrler) ExitController() { +} + +// Called once by the tester to supply the first configuration. You +// can marshal ShardConfig into a string using shardcfg.String(), and +// then Put it in the kvsrv for the controller at version 0. You can +// pick the key to name the configuration. 
+func (sck *ShardCtrler) InitConfig(cfg *shardcfg.ShardConfig) { + // Your code here +} + +// Called by the tester to ask the controller to change the +// configuration from the current one to new. It may return an error +// if this controller is disconnected for a while and another +// controller takes over in the mean time, as in part C. +func (sck *ShardCtrler) ChangeConfigTo(new *shardcfg.ShardConfig) rpc.Err { + return rpc.OK +} + +// Tester "kills" shardctrler by calling Kill(). For your +// convenience, we also supply isKilled() method to test killed in +// loops. +func (sck *ShardCtrler) Kill() { + atomic.StoreInt32(&sck.killed, 1) +} + +func (sck *ShardCtrler) isKilled() bool { + z := atomic.LoadInt32(&sck.killed) + return z == 1 +} + + +// Return the current configuration +func (sck *ShardCtrler) Query() (*shardcfg.ShardConfig, rpc.Tversion) { + // Your code here. + return nil, 0 +} + diff --git a/src/shardkv1/shardgrp/client.go b/src/shardkv1/shardgrp/client.go new file mode 100644 index 0000000..e99325e --- /dev/null +++ b/src/shardkv1/shardgrp/client.go @@ -0,0 +1,42 @@ +package shardgrp + +import ( + + + "6.5840/kvsrv1/rpc" + "6.5840/shardkv1/shardcfg" + "6.5840/tester1" +) + +type Clerk struct { + clnt *tester.Clnt + servers []string + leader int // last successful leader (index into servers[]) +} + +func MakeClerk(clnt *tester.Clnt, servers []string) *Clerk { + ck := &Clerk{clnt: clnt, servers: servers} + return ck +} + +func (ck *Clerk) Get(key string, n shardcfg.Tnum) (string, rpc.Tversion, rpc.Err) { + // Your code here + return "", 0, "" +} + +func (ck *Clerk) Put(key string, value string, version rpc.Tversion, n shardcfg.Tnum) (bool, rpc.Err) { + // Your code here + return false, "" +} + +func (ck *Clerk) Freeze(s shardcfg.Tshid, num shardcfg.Tnum) ([]byte, rpc.Err) { + return nil, "" +} + +func (ck *Clerk) InstallShard(s shardcfg.Tshid, state []byte, num shardcfg.Tnum) rpc.Err { + return "" +} + +func (ck *Clerk) Delete(s shardcfg.Tshid, num 
shardcfg.Tnum) rpc.Err { + return "" +} diff --git a/src/shardkv1/shardgrp/server.go b/src/shardkv1/shardgrp/server.go new file mode 100644 index 0000000..c6a8e38 --- /dev/null +++ b/src/shardkv1/shardgrp/server.go @@ -0,0 +1,102 @@ +package shardgrp + +import ( + "sync/atomic" + + + "6.5840/kvraft1/rsm" + "6.5840/kvsrv1/rpc" + "6.5840/labgob" + "6.5840/labrpc" + "6.5840/shardkv1/shardgrp/shardrpc" + "6.5840/tester1" +) + + +type KVServer struct { + gid tester.Tgid + me int + dead int32 // set by Kill() + rsm *rsm.RSM + frozen bool // for testing purposes + +} + + +func (kv *KVServer) DoOp(req any) any { + // Your code here + return nil +} + + +func (kv *KVServer) Snapshot() []byte { + // Your code here + return nil +} + +func (kv *KVServer) Restore(data []byte) { + // Your code here +} + +func (kv *KVServer) Get(args *shardrpc.GetArgs, reply *rpc.GetReply) { + // Your code here +} + +func (kv *KVServer) Put(args *shardrpc.PutArgs, reply *rpc.PutReply) { + // Your code here +} + +// Freeze the specified shard (i.e., reject future Get/Puts for this +// shard) and return the key/values stored in that shard. +func (kv *KVServer) Freeze(args *shardrpc.FreezeArgs, reply *shardrpc.FreezeReply) { + // Your code here +} + +// Install the supplied state for the specified shard. +func (kv *KVServer) InstallShard(args *shardrpc.InstallShardArgs, reply *shardrpc.InstallShardReply) { + // Your code here +} + +// Delete the specified shard. +func (kv *KVServer) Delete(args *shardrpc.DeleteShardArgs, reply *shardrpc.DeleteShardReply) { + // Your code here +} + +// the tester calls Kill() when a KVServer instance won't +// be needed again. for your convenience, we supply +// code to set rf.dead (without needing a lock), +// and a killed() method to test rf.dead in +// long-running loops. you can also add your own +// code to Kill(). you're not required to do anything +// about this, but it may be convenient (for example) +// to suppress debug output from a Kill()ed instance. 
+func (kv *KVServer) Kill() { + atomic.StoreInt32(&kv.dead, 1) + // Your code here, if desired. +} + +func (kv *KVServer) killed() bool { + z := atomic.LoadInt32(&kv.dead) + return z == 1 +} + +// StartShardServerGrp starts a server for shardgrp `gid`. +// +// StartShardServerGrp() and MakeRSM() must return quickly, so they should +// start goroutines for any long-running work. +func StartServerShardGrp(servers []*labrpc.ClientEnd, gid tester.Tgid, me int, persister *tester.Persister, maxraftstate int) []tester.IService { + // call labgob.Register on structures you want + // Go's RPC library to marshall/unmarshall. + labgob.Register(shardrpc.PutArgs{}) + labgob.Register(shardrpc.GetArgs{}) + labgob.Register(shardrpc.FreezeArgs{}) + labgob.Register(shardrpc.InstallShardArgs{}) + labgob.Register(shardrpc.DeleteShardArgs{}) + labgob.Register(rsm.Op{}) + + kv := &KVServer{gid: gid, me: me} + kv.rsm = rsm.MakeRSM(servers, me, persister, maxraftstate, kv) + + // Your code here + return []tester.IService{kv, kv.rsm.Raft()} +} diff --git a/src/shardkv1/shardgrp/shardrpc/shardrpc.go b/src/shardkv1/shardgrp/shardrpc/shardrpc.go new file mode 100644 index 0000000..03f83a1 --- /dev/null +++ b/src/shardkv1/shardgrp/shardrpc/shardrpc.go @@ -0,0 +1,50 @@ +package shardrpc + +import ( + "6.5840/kvsrv1/rpc" + "6.5840/shardkv1/shardcfg" +) + +// Same as Put in kvsrv1/rpc, but with a configuration number +type PutArgs struct { + Key string + Value string + Version rpc.Tversion + Num shardcfg.Tnum +} + +// Same as Get in kvsrv1/rpc, but with a configuration number. 
+type GetArgs struct { + Key string + Num shardcfg.Tnum +} + +type FreezeArgs struct { + Shard shardcfg.Tshid + Num shardcfg.Tnum +} + +type FreezeReply struct { + State []byte + Num shardcfg.Tnum + Err rpc.Err +} + +type InstallShardArgs struct { + Shard shardcfg.Tshid + State []byte + Num shardcfg.Tnum +} + +type InstallShardReply struct { + Err rpc.Err +} + +type DeleteShardArgs struct { + Shard shardcfg.Tshid + Num shardcfg.Tnum +} + +type DeleteShardReply struct { + Err rpc.Err +} diff --git a/src/shardkv1/shardkv_test.go b/src/shardkv1/shardkv_test.go new file mode 100644 index 0000000..74eac41 --- /dev/null +++ b/src/shardkv1/shardkv_test.go @@ -0,0 +1,818 @@ +package shardkv + +import ( + "log" + "testing" + "time" + + "6.5840/kvsrv1/rpc" + "6.5840/kvtest1" + "6.5840/shardkv1/shardcfg" + "6.5840/shardkv1/shardctrler" + "6.5840/shardkv1/shardctrler/param" + "6.5840/tester1" +) + +const ( + NGRP = 8 + NKEYS = 5 * shardcfg.NShards +) + +// Test shard controller's Init and Query with a key/value server from +// kvsrv1 lab. +func TestInitQuery5A(t *testing.T) { + + // MakeTest starts a key/value server using `kvsrv.StartKVServer`, + // which is defined in shardkv1/kvsrv1. + ts := MakeTest(t, "Test (5A): Init and Query ...", true) + defer ts.Cleanup() + + // Make a shard controller + sck := shardctrler.MakeShardCtrler(ts.Config.MakeClient(), ts.leases) + + // Make an empty shard configuration + scfg := shardcfg.MakeShardConfig() + + // Compute a new shard configuration as if `shardcfg.Gid1` joins the cluster, + // assigning all shards to `shardcfg.Gid1`. 
+ scfg.JoinBalance(map[tester.Tgid][]string{shardcfg.Gid1: []string{"xxx"}}) + + // Invoke the controller to initialize to store the first configuration + sck.InitConfig(scfg) + + // Read the initial configuration and check it + cfg, v := sck.Query() + if v != 1 || cfg.Num != 1 || cfg.Shards[0] != shardcfg.Gid1 { + ts.t.Fatalf("Static wrong %v %v", cfg, v) + } + cfg.CheckConfig(t, []tester.Tgid{shardcfg.Gid1}) +} + +// Test shardkv clerk's Get/Put with 1 shardgrp (without reconfiguration) +func TestStaticOneShardGroup5A(t *testing.T) { + ts := MakeTest(t, "Test (5A): one shard group ...", true) + defer ts.Cleanup() + + // The tester's setupKVService() sets up a kvsrv for the + // controller to store configurations and calls the controller's + // Init() method to create the first configuration with 1 + // shardgrp. + ts.setupKVService() + + ck := ts.MakeClerk() // make a shardkv clerk + ka, va := ts.SpreadPuts(ck, NKEYS) // do some puts + n := len(ka) + for i := 0; i < n; i++ { + ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) // check the puts + } + + // disconnect raft leader of shardgrp and check that keys are + // still avaialable + ts.disconnectClntFromLeader(ck.(*kvtest.TestClerk).Clnt, shardcfg.Gid1) + + for i := 0; i < n; i++ { + ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) // check the puts + } +} + +// test shardctrler's join, which adds a new group Gid2 and must move +// shards to the new group and the old group should reject Get/Puts on +// shards that moved. 
+func TestJoinBasic5A(t *testing.T) { + ts := MakeTest(t, "Test (5A): a group joins...", true) + defer ts.Cleanup() + + gid1 := ts.setupKVService() + ck := ts.MakeClerk() + ka, va := ts.SpreadPuts(ck, NKEYS) + + sck := ts.ShardCtrler() + cfg, _ := sck.Query() + + gid2 := ts.newGid() + err := ts.joinGroups(sck, []tester.Tgid{gid2}) + if err != rpc.OK { + ts.t.Fatalf("joinGroups: err %v", err) + } + + cfg1, _ := sck.Query() + if cfg.Num+1 != cfg1.Num { + ts.t.Fatalf("wrong num %d expected %d ", cfg1.Num, cfg.Num+1) + } + + if !cfg1.IsMember(gid2) { + ts.t.Fatalf("%d isn't a member of %v", gid2, cfg1) + } + + // check shards at shardcfg.Gid2 + ts.checkShutdownSharding(gid1, gid2, ka, va) + + for i := 0; i < len(ka); i++ { + ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) + } + + // check shards at shardcfg.Gid1 + ts.checkShutdownSharding(gid2, gid1, ka, va) + + for i := 0; i < len(ka); i++ { + ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) + } +} + +// test shardgrps delete moved shards +func TestDeleteBasic5A(t *testing.T) { + const ( + MAXRAFTSTATE = 1000 + VALUESIZE = 10000 + ) + + ts := MakeTestMaxRaft(t, "Test (5A): delete ...", true, false, VALUESIZE) + defer ts.Cleanup() + + gid1 := ts.setupKVService() + ck := ts.MakeClerk() + + ka, va := ts.SpreadPutsSize(ck, NKEYS, MAXRAFTSTATE) + + sz := ts.Group(gid1).SnapshotSize() + + sck := ts.ShardCtrler() + gid2 := ts.newGid() + err := ts.joinGroups(sck, []tester.Tgid{gid2}) + if err != rpc.OK { + ts.t.Fatalf("joinGroups: err %v", err) + } + + // push more Get's through so that all peers snapshot + for j := 0; j < 5; j++ { + for i := 0; i < len(ka); i++ { + ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) + } + } + sz1 := ts.Group(gid1).SnapshotSize() + sz2 := ts.Group(gid2).SnapshotSize() + if sz1+sz2 > sz+10000 { + ts.t.Fatalf("gid1 %d + gid2 %d = %d use too much space %d", sz1, sz2, sz1+sz2, sz) + } +} + +// test shardctrler's leave +func TestJoinLeaveBasic5A(t *testing.T) { + ts := MakeTest(t, "Test (5A): basic groups 
join/leave ...", true) + defer ts.Cleanup() + + gid1 := ts.setupKVService() + ck := ts.MakeClerk() + ka, va := ts.SpreadPuts(ck, NKEYS) + + sck := ts.ShardCtrler() + gid2 := ts.newGid() + err := ts.joinGroups(sck, []tester.Tgid{gid2}) + if err != rpc.OK { + ts.t.Fatalf("joinGroups: err %v", err) + } + + // check shards at shardcfg.Gid2 + ts.checkShutdownSharding(gid1, gid2, ka, va) + + for i := 0; i < len(ka); i++ { + ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) + } + + err = ts.leave(sck, shardcfg.Gid1) + if err != rpc.OK { + ts.t.Fatalf("Leave: err %v", err) + } + cfg, _ := sck.Query() + if cfg.IsMember(shardcfg.Gid1) { + ts.t.Fatalf("%d is a member of %v", shardcfg.Gid1, cfg) + } + + ts.Group(shardcfg.Gid1).Shutdown() + + for i := 0; i < len(ka); i++ { + ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) + } + + // bring the crashed shard/group back to life. + ts.Group(shardcfg.Gid1).StartServers() + + // Rejoin + ts.join(sck, shardcfg.Gid1, ts.Group(shardcfg.Gid1).SrvNames()) + + for i := 0; i < len(ka); i++ { + ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) + } + + // check shards at shardcfg.Gid2 + ts.checkShutdownSharding(gid2, gid1, ka, va) +} + +// test many groups joining and leaving, reliable or unreliable +func joinLeave5A(t *testing.T, reliable bool, part string) { + ts := MakeTest(t, "Test (5A): many groups join/leave ...", reliable) + defer ts.Cleanup() + + ts.setupKVService() + ck := ts.MakeClerk() + ka, va := ts.SpreadPuts(ck, NKEYS) + + sck := ts.ShardCtrler() + grps := ts.groups(NGRP) + + ts.joinGroups(sck, grps) + + ts.checkShutdownSharding(grps[0], grps[1], ka, va) + + for i := 0; i < len(ka); i++ { + ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) + } + + ts.leaveGroups(sck, grps) + + for i := 0; i < len(ka); i++ { + ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) + } +} + +func TestManyJoinLeaveReliable5A(t *testing.T) { + joinLeave5A(t, true, "Test (5A): many groups join/leave reliable...") +} + +func TestManyJoinLeaveUnreliable5A(t *testing.T) 
{ + joinLeave5A(t, false, "Test (5A): many groups join/leave unreliable...") +} + +// Test recovery from complete shutdown +func TestShutdown5A(t *testing.T) { + const NJOIN = 2 + const NGRP = 2 + NJOIN + + ts := MakeTest(t, "Test (5A): shutdown ...", true) + defer ts.Cleanup() + + ts.setupKVService() + + ck := ts.MakeClerk() + ka, va := ts.SpreadPuts(ck, NKEYS) + + sck := ts.ShardCtrler() + grps := ts.groups(NJOIN) + ts.joinGroups(sck, grps) + + ts.checkShutdownSharding(grps[0], grps[1], ka, va) + + for i := 0; i < len(ka); i++ { + ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) + } + + for i := shardcfg.Gid1; i < NGRP; i++ { + ts.Group(i).Shutdown() + } + + for i := shardcfg.Gid1; i < NGRP; i++ { + ts.Group(i).StartServers() + } + + for i := 0; i < len(ka); i++ { + ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) + } +} + +// Test that Gets for keys at groups that are alive +// return +func TestProgressShutdown(t *testing.T) { + const ( + NJOIN = 4 + NSEC = 2 + ) + + ts := MakeTest(t, "Test (5A): progress ...", true) + defer ts.Cleanup() + + ts.setupKVService() + + ck := ts.MakeClerk() + ka, va := ts.SpreadPuts(ck, NKEYS) + + sck := ts.ShardCtrler() + grps := ts.groups(NJOIN) + ts.joinGroups(sck, grps) + + end := 2 + for _, g := range grps[0:2] { + //log.Printf("shutdown %d", g) + ts.Group(g).Shutdown() + } + + alive := make(map[tester.Tgid]bool) + for _, g := range grps[end:] { + alive[g] = true + } + + cfg, _ := sck.Query() + + ch := make(chan rpc.Err) + go func() { + for i := 0; i < len(ka); i++ { + s := shardcfg.Key2Shard(ka[i]) + g := cfg.Shards[s] + if _, ok := alive[g]; ok { + //log.Printf("key lookup %v(%d) gid %d", ka[i], s, g) + ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) + } + } + ch <- rpc.OK + }() + + select { + case <-ch: + case <-time.After(NSEC * time.Second): + ts.Fatalf("Gets didn't finish") + } +} + +// Test that Gets from a non-moving shard return quickly +func TestProgressJoin(t *testing.T) { + const ( + NJOIN = 4 + NSEC = 4 + NCNT = 100 + ) 
+ + ts := MakeTest(t, "Test (5A): progress ...", true) + defer ts.Cleanup() + + ts.setupKVService() + + ck := ts.MakeClerk() + ka, va := ts.SpreadPuts(ck, NKEYS) + + sck := ts.ShardCtrler() + grps := ts.groups(NJOIN) + ts.joinGroups(sck, grps) + + cfg, _ := sck.Query() + newcfg := cfg.Copy() + newgid := tester.Tgid(NJOIN + 3) + if ok := newcfg.JoinBalance(map[tester.Tgid][]string{newgid: []string{"xxx"}}); !ok { + t.Fatalf("JoinBalance failed") + } + newcfg1 := newcfg.Copy() + if ok := newcfg1.LeaveBalance([]tester.Tgid{newgid}); !ok { + t.Fatalf("JoinBalance failed") + } + + // compute which shards don't move and which groups are involved + // in moving shards + stable := make(map[shardcfg.Tshid]bool) + participating := make(map[tester.Tgid]bool) + for i, g := range newcfg1.Shards { + if newcfg.Shards[i] == g { + stable[shardcfg.Tshid(i)] = true + } else { + participating[g] = true + } + } + + //log.Printf("groups participating %v stable %v", participating, stable) + //log.Printf("\ncfg %v\n %v\n %v", cfg.Shards, newcfg.Shards, newcfg1.Shards) + + ch0 := make(chan rpc.Err) + go func() { + for true { + select { + case <-ch0: + return + default: + //log.Printf("join/leave %v", newgid) + if err := ts.joinGroups(sck, []tester.Tgid{newgid}); err != rpc.OK { + t.Fatalf("joined err %v", err) + } + if err := ts.leaveGroups(sck, []tester.Tgid{newgid}); err != rpc.OK { + t.Fatalf("leave err %v", err) + } + } + } + }() + + ch1 := make(chan int) + go func() { + // get the keys that are on groups that are involved in the + // join but not in the shards that are moving + t := time.Now().Add(NSEC * time.Second) + nget := 0 + for time.Now().Before(t) { + for i := 0; i < len(ka); i++ { + s := shardcfg.Key2Shard(ka[i]) + if _, ok := stable[s]; ok { + g := newcfg1.Shards[s] + if _, ok := participating[g]; ok { + // log.Printf("key lookup %v(%d) gid %d", ka[i], s, g) + ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) + nget++ + } + } + } + } + ch1 <- nget + }() + + select { + case cnt 
:= <-ch1: + log.Printf("cnt %d", cnt) + if cnt < NCNT { + ts.Fatalf("Two few gets finished %d; expected more than %d", cnt, NCNT) + } + + case <-time.After(2 * NSEC * time.Second): + ts.Fatalf("Gets didn't finish") + } + ch0 <- rpc.OK +} + +// Test linearizability with groups joining/leaving and `nclnt` +// concurrent clerks put/get's in `unreliable` net. +func concurrentClerk(t *testing.T, nclnt int, reliable bool, part string) { + const ( + NSEC = 20 + ) + + ts := MakeTest(t, part, reliable) + defer ts.Cleanup() + + ts.setupKVService() + + ka := kvtest.MakeKeys(NKEYS) + ch := make(chan []kvtest.ClntRes) + + go func(ch chan []kvtest.ClntRes) { + rs := ts.SpawnClientsAndWait(nclnt, NSEC*time.Second, func(me int, ck kvtest.IKVClerk, done chan struct{}) kvtest.ClntRes { + return ts.OneClientPut(me, ck, ka, done) + }) + ch <- rs + }(ch) + + sck := ts.ShardCtrler() + grps := ts.groups(NGRP) + if err := ts.joinGroups(sck, grps); err != rpc.OK { + t.Fatalf("joinGroups err %v", err) + } + + if err := ts.leaveGroups(sck, grps); err != rpc.OK { + t.Fatalf("leaveGroups err %v", err) + } + + <-ch + + ts.CheckPorcupine() +} + +// Test linearizability with groups joining/leaving and 1 concurrent clerks put/get's +func TestOneConcurrentClerkReliable5A(t *testing.T) { + concurrentClerk(t, 1, true, "Test (5A): one concurrent clerk reliable...") +} + +// Test linearizability with groups joining/leaving and many concurrent clerks put/get's +func TestManyConcurrentClerkReliable5A(t *testing.T) { + const NCLNT = 10 + concurrentClerk(t, NCLNT, true, "Test (5A): many concurrent clerks reliable...") +} + +// Test linearizability with groups joining/leaving and 1 concurrent clerks put/get's +func TestOneConcurrentClerkUnreliable5A(t *testing.T) { + concurrentClerk(t, 1, false, "Test (5A): one concurrent clerk unreliable ...") +} + +// Test linearizability with groups joining/leaving and many concurrent clerks put/get's +func TestManyConcurrentClerkUnreliable5A(t *testing.T) { + const 
NCLNT = 10 + concurrentClerk(t, NCLNT, false, "Test (5A): many concurrent clerks unreliable...") +} + +// Test if join/leave complete even if shardgrp is down for a while, but +// don't complete while the shardgrp is down. +func TestJoinLeave5B(t *testing.T) { + const NSEC = 2 + + ts := MakeTest(t, "Test (5B): Join/leave while a shardgrp is down...", true) + defer ts.Cleanup() + + gid1 := ts.setupKVService() + ck := ts.MakeClerk() + ka, va := ts.SpreadPuts(ck, NKEYS) + + sck := ts.ShardCtrler() + cfg, _ := sck.Query() + + ts.Group(gid1).Shutdown() + + gid2 := ts.newGid() + ch := make(chan rpc.Err) + go func() { + err := ts.joinGroups(sck, []tester.Tgid{gid2}) + ch <- err + }() + + select { + case err := <-ch: + ts.Fatalf("Join finished %v", err) + case <-time.After(1 * NSEC): + // Give Join some time to try to join + } + + // Now join should be able to finish + ts.Group(gid1).StartServers() + + select { + case err := <-ch: + if err != rpc.OK { + ts.Fatalf("Join returns err %v", err) + } + case <-time.After(time.Second * NSEC): + ts.Fatalf("Join didn't complete") + } + + cfg1, _ := sck.Query() + if cfg.Num+1 != cfg1.Num { + ts.t.Fatalf("wrong num %d expected %d ", cfg1.Num, cfg.Num+1) + } + + ts.Group(gid2).Shutdown() + + ch = make(chan rpc.Err) + go func() { + err := ts.leave(sck, shardcfg.Gid1) + ch <- err + }() + + select { + case err := <-ch: + ts.Fatalf("Leave finished %v", err) + case <-time.After(NSEC * time.Second): + // Give give some time to try to join + } + + // Now leave should be able to finish + ts.Group(gid2).StartServers() + + select { + case err := <-ch: + if err != rpc.OK { + ts.Fatalf("Leave returns err %v", err) + } + case <-time.After(time.Second * NSEC): + ts.Fatalf("Leave didn't complete") + } + for i := 0; i < len(ka); i++ { + ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) + } +} + +// test recovery of partitioned controlers +func TestRecoverCtrler5B(t *testing.T) { + const ( + NPARTITION = 5 + ) + + ts := MakeTest(t, "Test (5B): recover 
controler ...", true) + defer ts.Cleanup() + + gid := ts.setupKVService() + ck := ts.MakeClerk() + ka, va := ts.SpreadPuts(ck, NKEYS) + + for i := 0; i < NPARTITION; i++ { + ts.killCtrler(ck, gid, ka, va) + } +} + +// Test concurrent ctrlers fighting for leadership reliable +func TestAcquireLockConcurrentReliable5C(t *testing.T) { + ts := MakeTestLeases(t, "Test (5C): Concurent ctrlers acquiring leadership ...", true) + defer ts.Cleanup() + ts.setupKVService() + ck := ts.MakeClerk() + ka, va := ts.SpreadPuts(ck, NKEYS) + ts.electCtrler(ck, ka, va) +} + +// Test concurrent ctrlers fighting for leadership unreliable +func TestAcquireLockConcurrentUnreliable5C(t *testing.T) { + ts := MakeTestLeases(t, "Test (5C): Concurent ctrlers acquiring leadership ...", false) + defer ts.Cleanup() + ts.setupKVService() + ck := ts.MakeClerk() + ka, va := ts.SpreadPuts(ck, NKEYS) + ts.electCtrler(ck, ka, va) +} + +// Test that ReleaseLock allows a new leader to start quickly +func TestLeaseBasicRelease5C(t *testing.T) { + ts := MakeTestLeases(t, "Test (5C): release lease ...", true) + defer ts.Cleanup() + ts.setupKVService() + + sck0, clnt0 := ts.makeShardCtrlerClnt() + go func() { + if err := sck0.InitController(); err != rpc.OK { + t.Fatalf("failed to init controller %v", err) + } + time.Sleep(200 * time.Millisecond) + sck0.ExitController() + }() + + time.Sleep(10 * time.Millisecond) + + // start new controller + sck1, clnt1 := ts.makeShardCtrlerClnt() + ch := make(chan struct{}) + go func() { + if err := sck1.InitController(); err != rpc.OK { + t.Fatalf("failed to init controller %v", err) + } + time.Sleep(200 * time.Millisecond) + sck1.ExitController() + ch <- struct{}{} + }() + + select { + case <-time.After(1 * time.Second): + ts.Fatalf("Release didn't give up leadership") + case <-ch: + } + + ts.Config.DeleteClient(clnt0) + ts.Config.DeleteClient(clnt1) +} + +// Test lease expiring +func TestLeaseBasicExpire5C(t *testing.T) { + ts := MakeTestLeases(t, "Test (5C): lease 
expiring ...", true) + defer ts.Cleanup() + ts.setupKVService() + + sck0, clnt0 := ts.makeShardCtrlerClnt() + go func() { + if err := sck0.InitController(); err != rpc.OK { + t.Fatalf("failed to init controller %v", err) + } + for { + time.Sleep(10 * time.Millisecond) + } + }() + + time.Sleep(100 * time.Millisecond) + + // partition sck0 forever + clnt0.DisconnectAll() + + // start new controller + sck1, clnt1 := ts.makeShardCtrlerClnt() + ch := make(chan struct{}) + go func() { + if err := sck1.InitController(); err != rpc.OK { + t.Fatalf("failed to init controller %v", err) + } + time.Sleep(100 * time.Millisecond) + sck1.ExitController() + ch <- struct{}{} + }() + + select { + case <-time.After((param.LEASETIMESEC + 1) * time.Second): + ts.Fatalf("Lease didn't expire") + case <-ch: + } + + ts.Config.DeleteClient(clnt0) + ts.Config.DeleteClient(clnt1) +} + +// Test lease is being extended +func TestLeaseBasicRefresh5C(t *testing.T) { + const LEADERSEC = 3 + + ts := MakeTestLeases(t, "Test (5C): lease refresh ...", true) + defer ts.Cleanup() + ts.setupKVService() + + sck0, clnt0 := ts.makeShardCtrlerClnt() + go func() { + if err := sck0.InitController(); err != rpc.OK { + t.Fatalf("failed to init controller %v", err) + } + time.Sleep(LEADERSEC * param.LEASETIMESEC * time.Second) + sck0.ExitController() + }() + + // give sck0 time to become leader + time.Sleep(100 * time.Millisecond) + + // start new controller + sck1, clnt1 := ts.makeShardCtrlerClnt() + ch := make(chan struct{}) + go func() { + if err := sck1.InitController(); err != rpc.OK { + t.Fatalf("failed to init controller %v", err) + } + time.Sleep(100 * time.Millisecond) + sck1.ExitController() + ch <- struct{}{} + }() + + select { + case <-time.After((LEADERSEC + param.LEASETIMESEC + 1) * time.Second): + case <-ch: + ts.Fatalf("Lease not refreshed") + } + + ts.Config.DeleteClient(clnt0) + ts.Config.DeleteClient(clnt1) +} + +// Test if old leader is fenced off when reconnecting while it is in +// the 
middle of a Join. +func TestPartitionControlerJoin5C(t *testing.T) { + const ( + NSLEEP = 2 + RAND = 1000 + ) + + ts := MakeTestLeases(t, "Test (5C): partition controller in join...", true) + defer ts.Cleanup() + ts.setupKVService() + ck := ts.MakeClerk() + ka, va := ts.SpreadPuts(ck, NKEYS) + + sck, clnt := ts.makeShardCtrlerClnt() + if err := sck.InitController(); err != rpc.OK { + ts.Fatalf("failed to init controller %v", err) + } + + ch := make(chan rpc.Err) + ngid := tester.Tgid(0) + go func() { + ngid = ts.newGid() + ts.Config.MakeGroupStart(ngid, NSRV, ts.StartServerShardGrp) + ts.Group(ngid).Shutdown() + ch <- ts.join(sck, ngid, ts.Group(ngid).SrvNames()) + }() + + // sleep for a while to get the chance for the controler to get stuck + // in join or leave, because gid is down + time.Sleep(1 * time.Second) + + // partition sck + clnt.DisconnectAll() + + // wait until sck's lease expired before restarting shardgrp `ngid` + time.Sleep((param.LEASETIMESEC + 1) * time.Second) + + ts.Group(ngid).StartServers() + + // start new controler to supersede partitioned one, + // it will also be stuck + sck0 := ts.makeShardCtrler() + if err := sck0.InitController(); err != rpc.OK { + t.Fatalf("failed to init controller %v", err) + } + + sck0.ExitController() + + //log.Printf("reconnect") + + // reconnect old controller, which shouldn't be able + // to do anything + clnt.ConnectAll() + + err := <-ch + if err == rpc.OK { + t.Fatalf("Old leader succeeded %v", err) + } + + time.Sleep(1 * time.Second) + + for i := 0; i < len(ka); i++ { + ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) + } +} + +// Make a leader controller loses its leadership during join/leave and +// test if the next controller recovers correctly. 
func TestPartitionRecovery5C(t *testing.T) {
	const (
		// NPARTITION = 10
		NPARTITION = 5
	)

	ts := MakeTestLeases(t, "Test (5C): controllers with leased leadership ...", true)
	defer ts.Cleanup()
	gid := ts.setupKVService()
	ck := ts.MakeClerk()
	ka, va := ts.SpreadPuts(ck, NKEYS)

	// repeatedly kill/partition the controller and check recovery
	for i := 0; i < NPARTITION; i++ {
		ts.killCtrler(ck, gid, ka, va)
	}
}
diff --git a/src/shardkv1/test.go b/src/shardkv1/test.go
new file mode 100644
index 0000000..f37ee5c
--- /dev/null
+++ b/src/shardkv1/test.go
@@ -0,0 +1,417 @@
package shardkv

import (
	"fmt"
	//"log"
	"math/rand"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"6.5840/kvraft1/rsm"
	"6.5840/kvsrv1/rpc"
	"6.5840/kvtest1"
	"6.5840/labrpc"
	"6.5840/shardkv1/kvsrv1"
	"6.5840/shardkv1/shardcfg"
	"6.5840/shardkv1/shardctrler"
	"6.5840/shardkv1/shardctrler/param"
	"6.5840/shardkv1/shardgrp"
	"6.5840/tester1"
)

// Test is the lab-5 test harness; it wraps kvtest.Test and adds
// shard-controller and shard-group bookkeeping.
type Test struct {
	t *testing.T
	*kvtest.Test

	sck *shardctrler.ShardCtrler // controller made by setupKVService
	part string
	leases bool // run controllers with leased leadership (5C)

	maxraftstate int
	mu sync.Mutex // protects ngid
	ngid tester.Tgid // next fresh group id handed out by newGid
}

const (
	Controler = tester.Tgid(0) // controller uses group 0 for a kvraft group
	NSRV = 3 // servers per group
	INTERGRPDELAY = 200 // time in ms between group changes
)

// Setup kvserver for the shard controller and make the controller
func MakeTestMaxRaft(t *testing.T, part string, reliable, leases bool, maxraftstate int) *Test {
	ts := &Test{
		ngid: shardcfg.Gid1 + 1, // Gid1 is in use
		t: t,
		leases: leases,
		maxraftstate: maxraftstate,
	}
	cfg := tester.MakeConfig(t, 1, reliable, kvsrv.StartKVServer)
	ts.Test = kvtest.MakeTest(t, cfg, false, ts)
	ts.Begin(part)
	return ts
}

// MakeTest makes a harness without leases and without a raft-state limit.
func MakeTest(t *testing.T, part string, reliable bool) *Test {
	return MakeTestMaxRaft(t, part, reliable, false, -1)
}

// MakeTestLeases makes a harness with leased controller leadership (5C).
func MakeTestLeases(t *testing.T, part string, reliable bool) *Test {
	return MakeTestMaxRaft(t, part, reliable, true, -1)
}

// MakeClerk returns a shardkv clerk wrapped for the tester.
func (ts *Test) MakeClerk() kvtest.IKVClerk {
	clnt := ts.Config.MakeClient()
	ck := MakeClerk(clnt, ts.makeShardCtrler())
	return &kvtest.TestClerk{ck, clnt}
}

// DeleteClerk releases the network client behind a clerk.
func (ts *Test) DeleteClerk(ck kvtest.IKVClerk) {
	tck := ck.(*kvtest.TestClerk)
	ts.DeleteClient(tck.Clnt)
}

// ShardCtrler returns the controller created by setupKVService.
func (ts *Test) ShardCtrler() *shardctrler.ShardCtrler {
	return ts.sck
}

// makeShardCtrler makes a controller, discarding its network client.
func (ts *Test) makeShardCtrler() *shardctrler.ShardCtrler {
	ck, _ := ts.makeShardCtrlerClnt()
	return ck
}

// makeShardCtrlerClnt makes a controller plus the client it uses, so
// tests can partition the controller via the client.
func (ts *Test) makeShardCtrlerClnt() (*shardctrler.ShardCtrler, *tester.Clnt) {
	clnt := ts.Config.MakeClient()
	return shardctrler.MakeShardCtrler(clnt, ts.leases), clnt
}

// makeKVClerk returns a clerk for the kvsrv that stores configurations.
func (ts *Test) makeKVClerk() *kvsrv.Clerk {
	srv := tester.ServerName(tester.GRP0, 0)
	clnt := ts.Config.MakeClient()
	return kvsrv.MakeClerk(clnt, srv).(*kvsrv.Clerk)
}

// newGid hands out a fresh, never-used group id (thread-safe).
func (ts *Test) newGid() tester.Tgid {
	ts.mu.Lock()
	defer ts.mu.Unlock()

	gid := ts.ngid
	ts.ngid += 1
	return gid
}

// groups returns n fresh group ids.
func (ts *Test) groups(n int) []tester.Tgid {
	grps := make([]tester.Tgid, n)
	for i := 0; i < n; i++ {
		grps[i] = ts.newGid()
	}
	return grps
}

// Set up KVService with one group Gid1. Gid1 should initialize itself to
// own all shards.
func (ts *Test) setupKVService() tester.Tgid {
	ts.sck = ts.makeShardCtrler()
	scfg := shardcfg.MakeShardConfig()
	ts.Config.MakeGroupStart(shardcfg.Gid1, NSRV, ts.StartServerShardGrp)
	scfg.JoinBalance(map[tester.Tgid][]string{shardcfg.Gid1: ts.Group(shardcfg.Gid1).SrvNames()})
	ts.sck.InitConfig(scfg)
	return shardcfg.Gid1
}

// StartServerShardGrp is the group factory passed to MakeGroupStart.
func (ts *Test) StartServerShardGrp(servers []*labrpc.ClientEnd, gid tester.Tgid, me int, persister *tester.Persister) []tester.IService {
	return shardgrp.StartServerShardGrp(servers, gid, me, persister, ts.maxraftstate)
}

// Add group gid
func (ts *Test) join(sck *shardctrler.ShardCtrler, gid tester.Tgid, srvs []string) rpc.Err {
	cfg, _ := sck.Query()
	newcfg := cfg.Copy()
	ok := newcfg.JoinBalance(map[tester.Tgid][]string{gid: srvs})
	if !ok {
		return rpc.ErrVersion
	}
	err := sck.ChangeConfigTo(newcfg)
	return err
}

// joinGroups starts and joins each gid in turn, pausing between groups.
func (ts *Test) joinGroups(sck *shardctrler.ShardCtrler, gids []tester.Tgid) rpc.Err {
	for i, gid := range gids {
		ts.Config.MakeGroupStart(gid, NSRV, ts.StartServerShardGrp)
		if err := ts.join(sck, gid, ts.Group(gid).SrvNames()); err != rpc.OK {
			return err
		}
		if i < len(gids)-1 {
			time.Sleep(INTERGRPDELAY * time.Millisecond)
		}
	}
	return rpc.OK
}

// Group gid leaves.
func (ts *Test) leave(sck *shardctrler.ShardCtrler, gid tester.Tgid) rpc.Err {
	cfg, _ := sck.Query()
	newcfg := cfg.Copy()
	ok := newcfg.LeaveBalance([]tester.Tgid{gid})
	if !ok {
		return rpc.ErrVersion
	}
	return sck.ChangeConfigTo(newcfg)
}

// leaveGroups removes and shuts down each gid in turn, pausing between groups.
func (ts *Test) leaveGroups(sck *shardctrler.ShardCtrler, gids []tester.Tgid) rpc.Err {
	for i, gid := range gids {
		if err := ts.leave(sck, gid); err != rpc.OK {
			return err
		}
		ts.Config.ExitGroup(gid)
		if i < len(gids)-1 {
			time.Sleep(INTERGRPDELAY * time.Millisecond)
		}
	}
	return rpc.OK
}

// disconnectRaftLeader disconnects gid's current raft leader from the
// network and returns its index and server name.
func (ts *Test) disconnectRaftLeader(gid tester.Tgid) (int, string) {
	_, l := rsm.Leader(ts.Config, gid)
	g := ts.Group(gid)
	ln := g.SrvName(l)
	g.DisconnectAll(l)
	return l, ln
}

// reconnectOldLeader reconnects a leader disconnected above.
func (ts *Test) reconnectOldLeader(gid tester.Tgid, l int) {
	g := ts.Group(gid)
	g.ConnectOne(l)
}

// disconnectClntFromLeader cuts clnt off from gid's leader while keeping
// it connected to the remaining servers.
func (ts *Test) disconnectClntFromLeader(clnt *tester.Clnt, gid tester.Tgid) int {
	l, ln := ts.disconnectRaftLeader(gid)
	p := ts.Group(gid).AllowServersExcept(l)
	srvs := ts.Group(gid).SrvNamesTo(p)
	clnt.Disconnect(ln)
	clnt.ConnectTo(srvs)
	return l
}

// checkLogs enforces the maxraftstate contract for the given groups.
func (ts *Test) checkLogs(gids []tester.Tgid) {
	for _, gid := range gids {
		n := ts.Group(gid).LogSize()
		s := ts.Group(gid).SnapshotSize()
		if ts.maxraftstate >= 0 && n > 8*ts.maxraftstate {
			ts.t.Fatalf("persister.RaftStateSize() %v, but maxraftstate %v",
				n, ts.maxraftstate)
		}
		if ts.maxraftstate < 0 && s > 0 {
			ts.t.Fatalf("maxraftstate is -1, but snapshot is non-empty!")
		}

	}
}

// make sure that the data really is sharded by
// shutting down one shard and checking that some
// Get()s don't succeed.
func (ts *Test) checkShutdownSharding(down, up tester.Tgid, ka []string, va []string) {
	const NSEC = 2

	ts.Group(down).Shutdown()

	ts.checkLogs([]tester.Tgid{down, up}) // forbid snapshots

	n := len(ka)
	ch := make(chan string)
	done := int32(0)
	for xi := 0; xi < n; xi++ {
		ck1 := ts.MakeClerk()
		go func(i int) {
			v, _, _ := ck1.Get(ka[i])
			// after the deadline nobody reads ch; bail out instead of blocking
			if atomic.LoadInt32(&done) == 1 {
				return
			}
			if v != va[i] {
				ch <- fmt.Sprintf("Get(%v): expected:\n%v\nreceived:\n%v", ka[i], va[i], v)
			} else {
				ch <- ""
			}
		}(xi)
	}

	// wait a bit, only about half the Gets should succeed.
	ndone := 0
	for atomic.LoadInt32(&done) != 1 {
		select {
		case err := <-ch:
			if err != "" {
				ts.Fatalf(err)
			}
			ndone += 1
		case <-time.After(time.Second * NSEC):
			atomic.StoreInt32(&done, 1)
			// NOTE(review): this break exits only the select, not the
			// loop; the loop terminates via the done test above.
			break
		}
	}

	//log.Printf("%d completions out of %d; down %d", ndone, n, down)
	if ndone >= n {
		ts.Fatalf("expected less than %d completions with one shard dead\n", n)
	}

	// bring the crashed shard/group back to life.
	ts.Group(down).StartServers()
}

// Run one controller and then partition it after some time. Run
// another controller that must finish the first controller's unfinished
// shard moves. To ensure the first controller is in a join/leave the test
// shuts down shardgrp `gid`. After the second controller is done,
// heal the partition to test if Freeze, InstallShard, and Delete
// are fenced.
func (ts *Test) killCtrler(ck kvtest.IKVClerk, gid tester.Tgid, ka, va []string) {
	const (
		NSLEEP = 2

		RAND = 1000

		JOIN = 1
		LEAVE = 2
	)

	sck, clnt := ts.makeShardCtrlerClnt()
	if err := sck.InitController(); err != rpc.OK {
		ts.Fatalf("failed to init controller %v", err)
	}

	cfg, _ := ts.ShardCtrler().Query()
	num := cfg.Num

	// NOTE(review): state and ngid are written by the goroutine below and
	// read by this goroutine without synchronization — a data race under
	// -race; confirm whether the harness is ever run with the detector.
	state := 0
	ngid := tester.Tgid(0)
	go func() {
		for {
			ngid = ts.newGid()
			state = JOIN
			err := ts.joinGroups(sck, []tester.Tgid{ngid})
			if err == rpc.OK {
				state = LEAVE
				err = ts.leaveGroups(sck, []tester.Tgid{ngid})
			} else {
				//log.Printf("deposed err %v", err)
				return
			}
		}
	}()

	// let the join/leave loop run for a random amount of time
	r := rand.Int() % RAND
	d := time.Duration(r) * time.Millisecond
	time.Sleep(d)

	//log.Printf("shutdown gid %d after %dms", gid, r)
	ts.Group(gid).Shutdown()

	// sleep for a while to get the chance for the controller to get stuck
	// in join or leave, because gid is down
	time.Sleep(NSLEEP * time.Second)

	//log.Printf("disconnect sck %v ngid %d num %d state %d", d, ngid, num, state)

	// partition controller
	clnt.DisconnectAll()

	if ts.leases {
		// wait until sck's lease expired before restarting shardgrp `gid`
		time.Sleep((param.LEASETIMESEC + 1) * time.Second)
	}

	ts.Group(gid).StartServers()

	// start new controller to pick up where sck left off
	sck0, clnt0 := ts.makeShardCtrlerClnt()
	if err := sck0.InitController(); err != rpc.OK {
		ts.Fatalf("failed to init controller %v", err)
	}
	cfg, _ = sck0.Query()
	s := "join"
	if state == LEAVE {
		s = "leave"
	}
	//log.Printf("%v cfg %v recovered %s", s, cfg, s)

	// the new controller must have advanced past the old one's config
	if cfg.Num <= num {
		ts.Fatalf("didn't recover; expected %d > %d", num, cfg.Num)
	}

	present := cfg.IsMember(ngid)
	if (state == JOIN && !present) || (state == LEAVE && present) {
		ts.Fatalf("didn't recover %d correctly after %v", ngid, s)
	}

	if state == JOIN && present {
		// cleanup if disconnected after join but before leave
		ts.leaveGroups(sck0, []tester.Tgid{ngid})
	}

	for i := 0; i < len(ka); i++ {
		ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
	}

	sck0.ExitController()

	if ts.leases {
		// reconnect old controller, which shouldn't be able
		// to do anything
		clnt.ConnectAll()

		time.Sleep(1 * time.Second)

		for i := 0; i < len(ka); i++ {
			ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
		}
	}
	ts.Config.DeleteClient(clnt)
	ts.Config.DeleteClient(clnt0)
}

// electCtrler runs N goroutines that repeatedly acquire controller
// leadership and do a join+leave, then checks the data survived.
func (ts *Test) electCtrler(ck kvtest.IKVClerk, ka, va []string) {
	const (
		NSEC = 5
		N = 4
	)

	ch := make(chan struct{})
	f := func(ch chan struct{}, i int) {
		for true {
			select {
			case <-ch:
				return
			default:
				ngid := ts.newGid()
				sck := ts.makeShardCtrler()
				if err := sck.InitController(); err != rpc.OK {
					ts.Fatalf("failed to init controller %v", err)
				}
				//log.Printf("%d(%p): join/leave %v", i, sck, ngid)
				if err := ts.joinGroups(sck, []tester.Tgid{ngid}); err == rpc.OK {
					ts.leaveGroups(sck, []tester.Tgid{ngid})
				}
				sck.ExitController()
			}
		}
	}
	for i := 0; i < N; i++ {
		go f(ch, i)
	}

	// let f()'s run for a while
	time.Sleep(NSEC * time.Second)

	for i := 0; i < N; i++ {
		ch <- struct{}{}
	}
	for i := 0; i < len(ka); i++ {
		ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
	}

}