diff --git a/.check-build b/.check-build index cbf3adf..036b722 100755 --- a/.check-build +++ b/.check-build @@ -118,8 +118,8 @@ check_lab4() { } check_lab5a() { - check_cmd cd src/shardctrler1 - check_cmd go test -c + # check_cmd cd src/shardctrler1 + # check_cmd go test -c } check_lab5b() { diff --git a/src/shardkv1/client.go b/src/shardkv1/client.go deleted file mode 100644 index 02ba37c..0000000 --- a/src/shardkv1/client.go +++ /dev/null @@ -1,51 +0,0 @@ -package shardkv - -// -// client code to talk to a sharded key/value service. -// -// the client uses the shardctrler to query for the current -// configuration and find the assignment of shards (keys) to groups, -// and then talks to the group that holds the key's shard. -// - -import ( - - "6.5840/kvsrv1/rpc" - "6.5840/kvtest1" - "6.5840/shardkv1/shardctrler" - "6.5840/tester1" -) - -type Clerk struct { - clnt *tester.Clnt - sck *shardctrler.ShardCtrler - // You will have to modify this struct. -} - -// The tester calls MakeClerk and passes in a shardctrler so that -// client can call it's Query method -func MakeClerk(clnt *tester.Clnt, sck *shardctrler.ShardCtrler) kvtest.IKVClerk { - ck := &Clerk{ - clnt: clnt, - sck: sck, - } - // You'll have to add code here. - return ck -} - - -// Get a key from a shardgrp. You can use shardcfg.Key2Shard(key) to -// find the shard responsible for the key and ck.sck.Query() to read -// the current configuration and lookup the servers in the group -// responsible for key. You can make a clerk for that group by -// calling shardgrp.MakeClerk(ck.clnt, servers). -func (ck *Clerk) Get(key string) (string, rpc.Tversion, rpc.Err) { - // You will have to modify this function. - return "", 0, "" -} - -// Put a key to a shard group. -func (ck *Clerk) Put(key string, value string, version rpc.Tversion) rpc.Err { - // You will have to modify this function. - return "" -} diff --git a/src/shardkv1/kvsrv1/client.go b/src/shardkv1/kvsrv1/client.go deleted file mode 100644 index 73f1d29..0000000 --- a/src/shardkv1/kvsrv1/client.go +++ /dev/null @@ -1,56 +0,0 @@ -package kvsrv - -import ( - "6.5840/kvsrv1/rpc" - "6.5840/kvtest1" - "6.5840/tester1" -) - - -type Clerk struct { - clnt *tester.Clnt - server string -} - -func MakeClerk(clnt *tester.Clnt, server string) kvtest.IKVClerk { - ck := &Clerk{clnt: clnt, server: server} - // You may add code here. - return ck -} - -// Get fetches the current value and version for a key. It returns -// ErrNoKey if the key does not exist. It keeps trying forever in the -// face of all other errors. -// -// You can send an RPC with code like this: -// ok := ck.clnt.Call(ck.server, "KVServer.Get", &args, &reply) -// -// The types of args and reply (including whether they are pointers) -// must match the declared types of the RPC handler function's -// arguments. Additionally, reply must be passed as a pointer. -func (ck *Clerk) Get(key string) (string, rpc.Tversion, rpc.Err) { - // You will have to modify this function. - return "", 0, rpc.ErrNoKey -} - -// Put updates key with value only if the version in the -// request matches the version of the key at the server. If the -// versions numbers don't match, the server should return -// ErrVersion. If Put receives an ErrVersion on its first RPC, Put -// should return ErrVersion, since the Put was definitely not -// performed at the server. 
If the server returns ErrVersion on a -// resend RPC, then Put must return ErrMaybe to the application, since -// its earlier RPC might have been processed by the server successfully -// but the response was lost, and the the Clerk doesn't know if -// the Put was performed or not. -// -// You can send an RPC with code like this: -// ok := ck.clnt.Call(ck.server, "KVServer.Put", &args, &reply) -// -// The types of args and reply (including whether they are pointers) -// must match the declared types of the RPC handler function's -// arguments. Additionally, reply must be passed as a pointer. -func (ck *Clerk) Put(key, value string, version rpc.Tversion) rpc.Err { - // You will have to modify this function. - return rpc.ErrNoKey -} diff --git a/src/shardkv1/kvsrv1/kvsrv_test.go b/src/shardkv1/kvsrv1/kvsrv_test.go deleted file mode 100644 index 68c1c79..0000000 --- a/src/shardkv1/kvsrv1/kvsrv_test.go +++ /dev/null @@ -1,162 +0,0 @@ -package kvsrv - -import ( - // "log" - "runtime" - "testing" - "time" - - "6.5840/kvsrv1/rpc" - "6.5840/kvtest1" -) - -// Test Put with a single client and a reliable network -func TestReliablePut(t *testing.T) { - const Val = "6.5840" - const Ver = 0 - - ts := MakeTestKV(t, true) - defer ts.Cleanup() - - ts.Begin("One client and reliable Put") - - ck := ts.MakeClerk() - if err := ck.Put("k", Val, Ver); err != rpc.OK { - t.Fatalf("Put err %v", err) - } - - if val, ver, err := ck.Get("k"); err != rpc.OK { - t.Fatalf("Get err %v; expected OK", err) - } else if val != Val { - t.Fatalf("Get value err %v; expected %v", val, Val) - } else if ver != Ver+1 { - t.Fatalf("Get wrong version %v; expected %v", ver, Ver+1) - } - - if err := ck.Put("k", Val, 0); err != rpc.ErrVersion { - t.Fatalf("expected Put to fail with ErrVersion; got err=%v", err) - } - - if err := ck.Put("y", Val, rpc.Tversion(1)); err != rpc.ErrNoKey { - t.Fatalf("expected Put to fail with ErrNoKey; got err=%v", err) - } - - if _, _, err := ck.Get("y"); err != rpc.ErrNoKey { - t.Fatalf("expected Get to fail with ErrNoKey; got err=%v", err) - } -} - -// Many clients putting on same key. 
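The first-try/resend distinction in the Clerk.Put comment above is the subtle part of this clerk: ErrVersion on the very first RPC means the Put certainly did not happen, while ErrVersion on a resend has to be reported as ErrMaybe. As a minimal sketch of a retry loop that follows that rule (not the handout's solution; the PutReply.Err field name is assumed from kvsrv1/rpc):

func (ck *Clerk) putSketch(key, value string, version rpc.Tversion) rpc.Err {
	args := rpc.PutArgs{Key: key, Value: value, Version: version}
	first := true
	for {
		var reply rpc.PutReply
		if ok := ck.clnt.Call(ck.server, "KVServer.Put", &args, &reply); !ok {
			first = false // the request or its reply was lost; a later ErrVersion is ambiguous
			continue
		}
		if reply.Err == rpc.ErrVersion && !first {
			return rpc.ErrMaybe // an earlier attempt may already have applied this Put
		}
		return reply.Err
	}
}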
-func TestPutConcurrentReliable(t *testing.T) { - const ( - PORCUPINETIME = 10 * time.Second - NCLNT = 10 - NSEC = 1 - ) - - ts := MakeTestKV(t, true) - defer ts.Cleanup() - - ts.Begin("Test: many clients racing to put values to the same key") - - rs := ts.SpawnClientsAndWait(NCLNT, NSEC*time.Second, func(me int, ck kvtest.IKVClerk, done chan struct{}) kvtest.ClntRes { - return ts.OneClientPut(me, ck, []string{"k"}, done) - }) - ck := ts.MakeClerk() - ts.CheckPutConcurrent(ck, "k", rs, &kvtest.ClntRes{}) - ts.CheckPorcupineT(PORCUPINETIME) -} - -// Check if memory used on server is reasonable -func TestMemPutManyClientsReliable(t *testing.T) { - const ( - NCLIENT = 100_000 - MEM = 1000 - ) - - ts := MakeTestKV(t, true) - defer ts.Cleanup() - - v := kvtest.RandValue(MEM) - - cks := make([]kvtest.IKVClerk, NCLIENT) - for i, _ := range cks { - cks[i] = ts.MakeClerk() - } - - // force allocation of ends for server in each client - for i := 0; i < NCLIENT; i++ { - if err := cks[i].Put("k", "", 1); err != rpc.ErrNoKey { - t.Fatalf("Put failed %v", err) - } - } - - ts.Begin("Test: memory use many put clients") - - // allow threads started by labrpc to start - time.Sleep(1 * time.Second) - - runtime.GC() - runtime.GC() - - var st runtime.MemStats - runtime.ReadMemStats(&st) - m0 := st.HeapAlloc - - for i := 0; i < NCLIENT; i++ { - if err := cks[i].Put("k", v, rpc.Tversion(i)); err != rpc.OK { - t.Fatalf("Put failed %v", err) - } - } - - runtime.GC() - time.Sleep(1 * time.Second) - runtime.GC() - - runtime.ReadMemStats(&st) - m1 := st.HeapAlloc - f := (float64(m1) - float64(m0)) / NCLIENT - if m1 > m0+(NCLIENT*200) { - t.Fatalf("error: server using too much memory %d %d (%.2f per client)\n", m0, m1, f) - } -} - -// Test with one client and unreliable network. If Clerk.Put returns -// ErrMaybe, the Put must have happened, since the test uses only one -// client. -func TestUnreliableNet(t *testing.T) { - const NTRY = 100 - - ts := MakeTestKV(t, false) - defer ts.Cleanup() - - ts.Begin("One client") - - ck := ts.MakeClerk() - - retried := false - for try := 0; try < NTRY; try++ { - for i := 0; true; i++ { - if err := ts.PutJson(ck, "k", i, rpc.Tversion(try), 0); err != rpc.ErrMaybe { - if i > 0 && err != rpc.ErrVersion { - t.Fatalf("Put shouldn't have happen more than once %v", err) - } - break - } - // Try put again; it should fail with ErrVersion - retried = true - } - v := 0 - if ver := ts.GetJson(ck, "k", 0, &v); ver != rpc.Tversion(try+1) { - t.Fatalf("Wrong version %d expect %d", ver, try+1) - } - if v != 0 { - t.Fatalf("Wrong value %d expect %d", v, 0) - } - } - if !retried { - t.Fatalf("Clerk.Put never returned ErrMaybe") - } - - ts.CheckPorcupine() -} diff --git a/src/shardkv1/kvsrv1/lock/lock.go b/src/shardkv1/kvsrv1/lock/lock.go deleted file mode 100644 index f788304..0000000 --- a/src/shardkv1/kvsrv1/lock/lock.go +++ /dev/null @@ -1,31 +0,0 @@ -package lock - -import ( - - "6.5840/kvsrv1/rpc" - "6.5840/shardkv1/kvsrv1" - "6.5840/shardkv1/shardctrler/param" -) - - -type Lock struct { - ck *kvsrv.Clerk - -} - -// Use l as the key to store the "lock state" (you would have to decide -// precisely what the lock state is). -func MakeLock(ck kvtest.IKVClerk, l string) *Lock { - lk := &Lock{ck: ck.(*kvsrv.Clerk)} - // You may add code here - return lk -} - - -func (lk *Lock) Acquire() { - // You may add code here. -} - -func (lk *Lock) Release() { - // You may add code here. 
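The lock in this package is a test-and-set loop over the versioned Put above; what the "lock state" is remains a design choice. A minimal sketch, assuming the state under the lock's key is either "" (free) or the holder's random identity (e.g. kvtest.RandValue(8)), that ErrMaybe is resolved by re-reading the key, and imports of time, 6.5840/kvsrv1/rpc, and 6.5840/kvtest1; not the handout's solution:

func acquireSketch(ck kvtest.IKVClerk, key, id string) {
	for {
		val, ver, err := ck.Get(key)
		if err == rpc.ErrNoKey {
			val, ver = "", 0 // key not created yet; a Put at version 0 will create it
		}
		if val == "" {
			switch ck.Put(key, id, ver) {
			case rpc.OK:
				return // lock acquired
			case rpc.ErrMaybe:
				if v, _, _ := ck.Get(key); v == id {
					return // our Put did land after all
				}
			}
		}
		time.Sleep(10 * time.Millisecond) // held by someone else (or we lost a race); retry
	}
}

func releaseSketch(ck kvtest.IKVClerk, key, id string) {
	for {
		val, ver, err := ck.Get(key)
		if err != rpc.OK || val != id {
			return // not held by us, or the release already took effect
		}
		if ck.Put(key, "", ver) == rpc.OK {
			return
		}
		// ErrMaybe: re-read and retry until the release is visible.
	}
}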
-} diff --git a/src/shardkv1/kvsrv1/lock/lock_test.go b/src/shardkv1/kvsrv1/lock/lock_test.go deleted file mode 100644 index 92c9e67..0000000 --- a/src/shardkv1/kvsrv1/lock/lock_test.go +++ /dev/null @@ -1,89 +0,0 @@ -package lock - -import ( - "fmt" - // "log" - "strconv" - "testing" - "time" - - "6.5840/kvsrv1" - "6.5840/kvsrv1/rpc" - "6.5840/kvtest1" -) - -const ( - NACQUIRE = 10 - NCLNT = 10 - NSEC = 2 -) - -func oneClient(t *testing.T, me int, ck kvtest.IKVClerk, done chan struct{}) kvtest.ClntRes { - lk := MakeLock(ck, "l") - ck.Put("l0", "", 0) - for i := 1; true; i++ { - select { - case <-done: - return kvtest.ClntRes{i, 0} - default: - lk.Acquire() - - // log.Printf("%d: acquired lock", me) - - b := strconv.Itoa(me) - val, ver, err := ck.Get("l0") - if err == rpc.OK { - if val != "" { - t.Fatalf("%d: two clients acquired lock %v", me, val) - } - } else { - t.Fatalf("%d: get failed %v", me, err) - } - - err = ck.Put("l0", string(b), ver) - if !(err == rpc.OK || err == rpc.ErrMaybe) { - t.Fatalf("%d: put failed %v", me, err) - } - - time.Sleep(10 * time.Millisecond) - - err = ck.Put("l0", "", ver+1) - if !(err == rpc.OK || err == rpc.ErrMaybe) { - t.Fatalf("%d: put failed %v", me, err) - } - - // log.Printf("%d: release lock", me) - - lk.Release() - } - } - return kvtest.ClntRes{} -} - -// Run test clients -func runClients(t *testing.T, nclnt int, reliable bool) { - ts := kvsrv.MakeTestKV(t, reliable) - defer ts.Cleanup() - - ts.Begin(fmt.Sprintf("Test: %d lock clients", nclnt)) - - ts.SpawnClientsAndWait(nclnt, NSEC*time.Second, func(me int, myck kvtest.IKVClerk, done chan struct{}) kvtest.ClntRes { - return oneClient(t, me, myck, done) - }) -} - -func TestOneClientReliable(t *testing.T) { - runClients(t, 1, true) -} - -func TestManyClientsReliable(t *testing.T) { - runClients(t, NCLNT, true) -} - -func TestOneClientUnreliable(t *testing.T) { - runClients(t, 1, false) -} - -func TestManyClientsUnreliable(t *testing.T) { - runClients(t, NCLNT, false) -} diff --git a/src/shardkv1/kvsrv1/server.go b/src/shardkv1/kvsrv1/server.go deleted file mode 100644 index 83c256a..0000000 --- a/src/shardkv1/kvsrv1/server.go +++ /dev/null @@ -1,48 +0,0 @@ -package kvsrv - -import ( - "sync" - - - "6.5840/kvsrv1/rpc" - "6.5840/labrpc" - "6.5840/tester1" -) - - -type KVServer struct { - mu sync.Mutex - - // Your definitions here. -} - -func MakeKVServer() *KVServer { - kv := &KVServer{} - // Your code here. - return kv -} - -// Get returns the value and version for args.Key, if args.Key -// exists. Otherwise, Get returns ErrNoKey. -func (kv *KVServer) Get(args *rpc.GetArgs, reply *rpc.GetReply) { - // Your code here. -} - -// Update the value for a key if args.Version matches the version of -// the key on the server. If versions don't match, return ErrVersion. -// If the key doesn't exist, Put installs the value if the -// args.Version is 0, and returns ErrNoKey otherwise. -func (kv *KVServer) Put(args *rpc.PutArgs, reply *rpc.PutReply) { - // Your code here. 
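The comment above pins down the three cases of Put exactly. As a minimal sketch of that rule (not the handout's solution), assuming hypothetical kv.vals map[string]string and kv.vers map[string]rpc.Tversion fields guarded by kv.mu, and an Err field on the reply, the handler body might look like:

	kv.mu.Lock()
	defer kv.mu.Unlock()
	ver, ok := kv.vers[args.Key]
	switch {
	case ok && ver == args.Version: // versions match: update value, bump version
		kv.vals[args.Key] = args.Value
		kv.vers[args.Key] = ver + 1
		reply.Err = rpc.OK
	case ok: // key exists but the caller's version is stale
		reply.Err = rpc.ErrVersion
	case args.Version == 0: // key absent: a version-0 Put creates it
		kv.vals[args.Key] = args.Value
		kv.vers[args.Key] = 1
		reply.Err = rpc.OK
	default: // key absent and version != 0
		reply.Err = rpc.ErrNoKey
	}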
-} - -// You can ignore Kill() for this lab -func (kv *KVServer) Kill() { -} - - -// You can ignore all arguments; they are for replicated KVservers -func StartKVServer(ends []*labrpc.ClientEnd, gid tester.Tgid, srv int, persister *tester.Persister) []tester.IService { - kv := MakeKVServer() - return []tester.IService{kv} -} diff --git a/src/shardkv1/kvsrv1/test.go b/src/shardkv1/kvsrv1/test.go deleted file mode 100644 index bd8269b..0000000 --- a/src/shardkv1/kvsrv1/test.go +++ /dev/null @@ -1,36 +0,0 @@ -package kvsrv - -import ( - // "log" - "testing" - - "6.5840/kvtest1" - "6.5840/tester1" -) - -type TestKV struct { - *kvtest.Test - t *testing.T - reliable bool -} - -func MakeTestKV(t *testing.T, reliable bool) *TestKV { - cfg := tester.MakeConfig(t, 1, reliable, StartKVServer) - ts := &TestKV{ - t: t, - reliable: reliable, - } - ts.Test = kvtest.MakeTest(t, cfg, false, ts) - return ts -} - -func (ts *TestKV) MakeClerk() kvtest.IKVClerk { - clnt := ts.Config.MakeClient() - ck := MakeClerk(clnt, tester.ServerName(tester.GRP0, 0)) - return &kvtest.TestClerk{ck, clnt} -} - -func (ts *TestKV) DeleteClerk(ck kvtest.IKVClerk) { - tck := ck.(*kvtest.TestClerk) - ts.DeleteClient(tck.Clnt) -} diff --git a/src/shardkv1/shardcfg/shardcfg.go b/src/shardkv1/shardcfg/shardcfg.go deleted file mode 100644 index b7f97fa..0000000 --- a/src/shardkv1/shardcfg/shardcfg.go +++ /dev/null @@ -1,284 +0,0 @@ -package shardcfg - -import ( - "encoding/json" - "hash/fnv" - "log" - "runtime/debug" - "slices" - "testing" - - "6.5840/tester1" -) - -type Tshid int -type Tnum int - -const ( - NShards = 12 // The number of shards. - NumFirst = Tnum(1) -) - -const ( - Gid1 = tester.Tgid(1) -) - -// which shard is a key in? -// please use this function, -// and please do not change it. -func Key2Shard(key string) Tshid { - h := fnv.New32a() - h.Write([]byte(key)) - shard := Tshid(Tshid(h.Sum32()) % NShards) - return shard -} - -// A configuration -- an assignment of shards to groups. -// Please don't change this. 
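Key2Shard plus the configuration type below is all the shardkv clerk deleted at the top of this diff needs in order to route a request. A minimal sketch of that Get path, written from the shardkv package's point of view with the Clerk fields from shardkv1/client.go, and ignoring retries for the case where a group rejects the request because the shard has moved; not the handout's solution:

func (ck *Clerk) getSketch(key string) (string, rpc.Tversion, rpc.Err) {
	cfg, _ := ck.sck.Query()                 // current assignment of shards to groups
	sh := shardcfg.Key2Shard(key)            // shard that owns this key
	_, srvs, _ := cfg.GidServers(sh)         // servers of the owning shardgrp
	grp := shardgrp.MakeClerk(ck.clnt, srvs) // clerk for that shardgrp
	return grp.Get(key, cfg.Num)             // include the config number so the group can fence
}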
-type ShardConfig struct { - Num Tnum // config number - Shards [NShards]tester.Tgid // shard -> gid - Groups map[tester.Tgid][]string // gid -> servers[] -} - -func MakeShardConfig() *ShardConfig { - c := &ShardConfig{ - Groups: make(map[tester.Tgid][]string), - } - return c -} - -func (cfg *ShardConfig) String() string { - b, err := json.Marshal(cfg) - if err != nil { - log.Fatalf("Unmarshall err %v", err) - } - return string(b) -} - -func FromString(s string) *ShardConfig { - scfg := &ShardConfig{} - if err := json.Unmarshal([]byte(s), scfg); err != nil { - log.Fatalf("Unmarshall err %v", err) - } - return scfg -} - -func (cfg *ShardConfig) Copy() *ShardConfig { - c := MakeShardConfig() - c.Num = cfg.Num - c.Shards = cfg.Shards - for k, srvs := range cfg.Groups { - s := make([]string, len(srvs)) - copy(s, srvs) - c.Groups[k] = s - } - return c -} - -// mostgroup, mostn, leastgroup, leastn -func analyze(c *ShardConfig) (tester.Tgid, int, tester.Tgid, int) { - counts := map[tester.Tgid]int{} - for _, g := range c.Shards { - counts[g] += 1 - } - - mn := -1 - var mg tester.Tgid = -1 - ln := 257 - var lg tester.Tgid = -1 - // Enforce deterministic ordering, map iteration - // is randomized in go - groups := make([]tester.Tgid, len(c.Groups)) - i := 0 - for k := range c.Groups { - groups[i] = k - i++ - } - slices.Sort(groups) - for _, g := range groups { - if counts[g] < ln { - ln = counts[g] - lg = g - } - if counts[g] > mn { - mn = counts[g] - mg = g - } - } - - return mg, mn, lg, ln -} - -// return GID of group with least number of -// assigned shards. -func least(c *ShardConfig) tester.Tgid { - _, _, lg, _ := analyze(c) - return lg -} - -// balance assignment of shards to groups. -// modifies c. -func (c *ShardConfig) Rebalance() { - // if no groups, un-assign all shards - if len(c.Groups) < 1 { - for s, _ := range c.Shards { - c.Shards[s] = 0 - } - return - } - - // assign all unassigned shards - for s, g := range c.Shards { - _, ok := c.Groups[g] - if ok == false { - lg := least(c) - c.Shards[s] = lg - } - } - - // move shards from most to least heavily loaded - for { - mg, mn, lg, ln := analyze(c) - if mn < ln+2 { - break - } - // move 1 shard from mg to lg - for s, g := range c.Shards { - if g == mg { - c.Shards[s] = lg - break - } - } - } -} - -func (cfg *ShardConfig) Join(servers map[tester.Tgid][]string) bool { - changed := false - for gid, servers := range servers { - _, ok := cfg.Groups[gid] - if ok { - log.Printf("re-Join %v", gid) - return false - } - for xgid, xservers := range cfg.Groups { - for _, s1 := range xservers { - for _, s2 := range servers { - if s1 == s2 { - log.Fatalf("Join(%v) puts server %v in groups %v and %v", gid, s1, xgid, gid) - } - } - } - } - // new GID - // modify cfg to reflect the Join() - cfg.Groups[gid] = servers - changed = true - } - if changed == false { - log.Fatalf("Join but no change") - } - cfg.Num += 1 - return true -} - -func (cfg *ShardConfig) Leave(gids []tester.Tgid) bool { - changed := false - for _, gid := range gids { - _, ok := cfg.Groups[gid] - if ok == false { - // already no GID! 
- log.Printf("Leave(%v) but not in config", gid) - return false - } else { - // modify op.Config to reflect the Leave() - delete(cfg.Groups, gid) - changed = true - } - } - if changed == false { - debug.PrintStack() - log.Fatalf("Leave but no change") - } - cfg.Num += 1 - return true -} - -func (cfg *ShardConfig) JoinBalance(servers map[tester.Tgid][]string) bool { - if !cfg.Join(servers) { - return false - } - cfg.Rebalance() - return true -} - -func (cfg *ShardConfig) LeaveBalance(gids []tester.Tgid) bool { - if !cfg.Leave(gids) { - return false - } - cfg.Rebalance() - return true -} - -func (cfg *ShardConfig) GidServers(sh Tshid) (tester.Tgid, []string, bool) { - gid := cfg.Shards[sh] - srvs, ok := cfg.Groups[gid] - return gid, srvs, ok -} - -func (cfg *ShardConfig) IsMember(gid tester.Tgid) bool { - for _, g := range cfg.Shards { - if g == gid { - return true - } - } - return false -} - -func (cfg *ShardConfig) CheckConfig(t *testing.T, groups []tester.Tgid) { - if len(cfg.Groups) != len(groups) { - fatalf(t, "wanted %v groups, got %v", len(groups), len(cfg.Groups)) - } - - // are the groups as expected? - for _, g := range groups { - _, ok := cfg.Groups[g] - if ok != true { - fatalf(t, "missing group %v", g) - } - } - - // any un-allocated shards? - if len(groups) > 0 { - for s, g := range cfg.Shards { - _, ok := cfg.Groups[g] - if ok == false { - fatalf(t, "shard %v -> invalid group %v", s, g) - } - } - } - - // more or less balanced sharding? - counts := map[tester.Tgid]int{} - for _, g := range cfg.Shards { - counts[g] += 1 - } - min := 257 - max := 0 - for g, _ := range cfg.Groups { - if counts[g] > max { - max = counts[g] - } - if counts[g] < min { - min = counts[g] - } - } - if max > min+1 { - fatalf(t, "max %v too much larger than min %v", max, min) - } -} - -func fatalf(t *testing.T, format string, args ...any) { - debug.PrintStack() - t.Fatalf(format, args...) 
-} diff --git a/src/shardkv1/shardcfg/shardcfg_test.go b/src/shardkv1/shardcfg/shardcfg_test.go deleted file mode 100644 index 4d45204..0000000 --- a/src/shardkv1/shardcfg/shardcfg_test.go +++ /dev/null @@ -1,62 +0,0 @@ -package shardcfg - -import ( - "testing" - - "6.5840/tester1" -) - -func check_same_config(t *testing.T, c1 ShardConfig, c2 ShardConfig) { - if c1.Num != c2.Num { - t.Fatalf("Num wrong") - } - if c1.Shards != c2.Shards { - t.Fatalf("Shards wrong") - } - if len(c1.Groups) != len(c2.Groups) { - t.Fatalf("number of Groups is wrong") - } - for gid, sa := range c1.Groups { - sa1, ok := c2.Groups[gid] - if ok == false || len(sa1) != len(sa) { - t.Fatalf("len(Groups) wrong") - } - if ok && len(sa1) == len(sa) { - for j := 0; j < len(sa); j++ { - if sa[j] != sa1[j] { - t.Fatalf("Groups wrong") - } - } - } - } -} - -func TestBasic(t *testing.T) { - const ( - Gid1 = 1 - Gid2 = 2 - ) - cfg := MakeShardConfig() - cfg.CheckConfig(t, []tester.Tgid{}) - - cfg.JoinBalance(map[tester.Tgid][]string{Gid1: []string{"x", "y", "z"}}) - cfg.CheckConfig(t, []tester.Tgid{Gid1}) - - cfg.JoinBalance(map[tester.Tgid][]string{Gid2: []string{"a", "b", "c"}}) - cfg.CheckConfig(t, []tester.Tgid{Gid1, Gid2}) - - sa1 := cfg.Groups[Gid1] - if len(sa1) != 3 || sa1[0] != "x" || sa1[1] != "y" || sa1[2] != "z" { - t.Fatalf("wrong servers for gid %v: %v\n", Gid1, sa1) - } - sa2 := cfg.Groups[Gid2] - if len(sa2) != 3 || sa2[0] != "a" || sa2[1] != "b" || sa2[2] != "c" { - t.Fatalf("wrong servers for gid %v: %v\n", Gid2, sa2) - } - - cfg.LeaveBalance([]tester.Tgid{Gid1}) - cfg.CheckConfig(t, []tester.Tgid{Gid2}) - - cfg.LeaveBalance([]tester.Tgid{Gid2}) - cfg.CheckConfig(t, []tester.Tgid{}) -} diff --git a/src/shardkv1/shardctrler/param/param.go b/src/shardkv1/shardctrler/param/param.go deleted file mode 100644 index 8df0627..0000000 --- a/src/shardkv1/shardctrler/param/param.go +++ /dev/null @@ -1,7 +0,0 @@ -package param - -const ( - // Length of time that a lease is valid. If a client doesn't - // refresh the lease within this time, the lease will expire. - LEASETIMESEC = 3 -) diff --git a/src/shardkv1/shardctrler/shardctrler.go b/src/shardkv1/shardctrler/shardctrler.go deleted file mode 100644 index 516bdd9..0000000 --- a/src/shardkv1/shardctrler/shardctrler.go +++ /dev/null @@ -1,87 +0,0 @@ -package shardctrler - -// -// Shardctrler with InitConfig, Query, and ChangeConfigTo methods -// - -import ( - - "sync/atomic" - - "6.5840/kvsrv1/rpc" - "6.5840/kvtest1" - "6.5840/shardkv1/kvsrv1" - "6.5840/shardkv1/shardcfg" - "6.5840/tester1" -) - - -// ShardCtrler for the controller and kv clerk. -type ShardCtrler struct { - clnt *tester.Clnt - kvtest.IKVClerk - - killed int32 // set by Kill() - leases bool - - // Your data here. -} - -// Make a ShardCltler, which stores its state in a kvsrv. -func MakeShardCtrler(clnt *tester.Clnt, leases bool) *ShardCtrler { - sck := &ShardCtrler{clnt: clnt, leases: leases} - srv := tester.ServerName(tester.GRP0, 0) - sck.IKVClerk = kvsrv.MakeClerk(clnt, srv) - // Your code here. - return sck -} - -// The tester calls InitController() before starting a new -// controller. In this method you can implement recovery (part B) and -// use a lock to become leader (part C). InitController may fail when -// another controller supersedes (e.g., when this controller is -// partitioned during recovery). -func (sck *ShardCtrler) InitController() rpc.Err { - return rpc.ErrNoKey -} - -// The tester calls ExitController to exit a controller. In part B and -// C, release lock. 
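For parts B and C, InitController and ExitController bracket a controller's tenure. One plausible shape is to guard that tenure with the kvsrv-backed lock from shardkv1/kvsrv1/lock earlier in this diff; the sck.lk field and the lock key name are assumptions of this sketch, the lease expiry and refresh that part C needs are omitted, and this is not the handout's design:

func (sck *ShardCtrler) initControllerSketch() rpc.Err {
	sck.lk = lock.MakeLock(sck.IKVClerk, "ctrler-lock")
	sck.lk.Acquire() // block until this controller is the only active one
	// Part B: before returning, read any partially-applied configuration
	// change left behind by a previous controller and finish it.
	return rpc.OK
}

func (sck *ShardCtrler) exitControllerSketch() {
	sck.lk.Release() // let the next controller take over promptly
}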
-func (sck *ShardCtrler) ExitController() { -} - -// Called once by the tester to supply the first configuration. You -// can marshal ShardConfig into a string using shardcfg.String(), and -// then Put it in the kvsrv for the controller at version 0. You can -// pick the key to name the configuration. -func (sck *ShardCtrler) InitConfig(cfg *shardcfg.ShardConfig) { - // Your code here -} - -// Called by the tester to ask the controller to change the -// configuration from the current one to new. It may return an error -// if this controller is disconnected for a while and another -// controller takes over in the mean time, as in part C. -func (sck *ShardCtrler) ChangeConfigTo(new *shardcfg.ShardConfig) rpc.Err { - return rpc.OK -} - -// Tester "kills" shardctrler by calling Kill(). For your -// convenience, we also supply isKilled() method to test killed in -// loops. -func (sck *ShardCtrler) Kill() { - atomic.StoreInt32(&sck.killed, 1) -} - -func (sck *ShardCtrler) isKilled() bool { - z := atomic.LoadInt32(&sck.killed) - return z == 1 -} - - -// Return the current configuration -func (sck *ShardCtrler) Query() (*shardcfg.ShardConfig, rpc.Tversion) { - // Your code here. - return nil, 0 -} - diff --git a/src/shardkv1/shardgrp/client.go b/src/shardkv1/shardgrp/client.go deleted file mode 100644 index e99325e..0000000 --- a/src/shardkv1/shardgrp/client.go +++ /dev/null @@ -1,42 +0,0 @@ -package shardgrp - -import ( - - - "6.5840/kvsrv1/rpc" - "6.5840/shardkv1/shardcfg" - "6.5840/tester1" -) - -type Clerk struct { - clnt *tester.Clnt - servers []string - leader int // last successful leader (index into servers[]) -} - -func MakeClerk(clnt *tester.Clnt, servers []string) *Clerk { - ck := &Clerk{clnt: clnt, servers: servers} - return ck -} - -func (ck *Clerk) Get(key string, n shardcfg.Tnum) (string, rpc.Tversion, rpc.Err) { - // Your code here - return "", 0, "" -} - -func (ck *Clerk) Put(key string, value string, version rpc.Tversion, n shardcfg.Tnum) (bool, rpc.Err) { - // Your code here - return false, "" -} - -func (ck *Clerk) Freeze(s shardcfg.Tshid, num shardcfg.Tnum) ([]byte, rpc.Err) { - return nil, "" -} - -func (ck *Clerk) InstallShard(s shardcfg.Tshid, state []byte, num shardcfg.Tnum) rpc.Err { - return "" -} - -func (ck *Clerk) Delete(s shardcfg.Tshid, num shardcfg.Tnum) rpc.Err { - return "" -} diff --git a/src/shardkv1/shardgrp/server.go b/src/shardkv1/shardgrp/server.go deleted file mode 100644 index c6a8e38..0000000 --- a/src/shardkv1/shardgrp/server.go +++ /dev/null @@ -1,102 +0,0 @@ -package shardgrp - -import ( - "sync/atomic" - - - "6.5840/kvraft1/rsm" - "6.5840/kvsrv1/rpc" - "6.5840/labgob" - "6.5840/labrpc" - "6.5840/shardkv1/shardgrp/shardrpc" - "6.5840/tester1" -) - - -type KVServer struct { - gid tester.Tgid - me int - dead int32 // set by Kill() - rsm *rsm.RSM - frozen bool // for testing purposes - -} - - -func (kv *KVServer) DoOp(req any) any { - // Your code here - return nil -} - - -func (kv *KVServer) Snapshot() []byte { - // Your code here - return nil -} - -func (kv *KVServer) Restore(data []byte) { - // Your code here -} - -func (kv *KVServer) Get(args *shardrpc.GetArgs, reply *rpc.GetReply) { - // Your code here -} - -func (kv *KVServer) Put(args *shardrpc.PutArgs, reply *rpc.PutReply) { - // Your code here -} - -// Freeze the specified shard (i.e., reject future Get/Puts for this -// shard) and return the key/values stored in that shard. 
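Freeze, InstallShard, and Delete below are the three legs of a shard hand-off, and ChangeConfigTo is what drives them. A minimal sketch of that loop, taking the current and the new configuration, assuming every moving shard is owned by a live group in both, and omitting retries, error recovery, and fencing details; not the handout's solution:

func (sck *ShardCtrler) changeConfigToSketch(old, new *shardcfg.ShardConfig) rpc.Err {
	for sh := shardcfg.Tshid(0); sh < shardcfg.NShards; sh++ {
		fromGid, toGid := old.Shards[sh], new.Shards[sh]
		if fromGid == toGid {
			continue // this shard does not move
		}
		from := shardgrp.MakeClerk(sck.clnt, old.Groups[fromGid])
		to := shardgrp.MakeClerk(sck.clnt, new.Groups[toGid])

		state, err := from.Freeze(sh, new.Num) // old group stops serving and snapshots the shard
		if err != rpc.OK {
			return err
		}
		if err := to.InstallShard(sh, state, new.Num); err != rpc.OK {
			return err
		}
		if err := from.Delete(sh, new.Num); err != rpc.OK {
			return err
		}
	}
	// Finally, store `new` (e.g. via new.String()) in the controller's kvsrv
	// so that clerks observe it through Query().
	return rpc.OK
}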
-func (kv *KVServer) Freeze(args *shardrpc.FreezeArgs, reply *shardrpc.FreezeReply) { - // Your code here -} - -// Install the supplied state for the specified shard. -func (kv *KVServer) InstallShard(args *shardrpc.InstallShardArgs, reply *shardrpc.InstallShardReply) { - // Your code here -} - -// Delete the specified shard. -func (kv *KVServer) Delete(args *shardrpc.DeleteShardArgs, reply *shardrpc.DeleteShardReply) { - // Your code here -} - -// the tester calls Kill() when a KVServer instance won't -// be needed again. for your convenience, we supply -// code to set rf.dead (without needing a lock), -// and a killed() method to test rf.dead in -// long-running loops. you can also add your own -// code to Kill(). you're not required to do anything -// about this, but it may be convenient (for example) -// to suppress debug output from a Kill()ed instance. -func (kv *KVServer) Kill() { - atomic.StoreInt32(&kv.dead, 1) - // Your code here, if desired. -} - -func (kv *KVServer) killed() bool { - z := atomic.LoadInt32(&kv.dead) - return z == 1 -} - -// StartShardServerGrp starts a server for shardgrp `gid`. -// -// StartShardServerGrp() and MakeRSM() must return quickly, so they should -// start goroutines for any long-running work. -func StartServerShardGrp(servers []*labrpc.ClientEnd, gid tester.Tgid, me int, persister *tester.Persister, maxraftstate int) []tester.IService { - // call labgob.Register on structures you want - // Go's RPC library to marshall/unmarshall. - labgob.Register(shardrpc.PutArgs{}) - labgob.Register(shardrpc.GetArgs{}) - labgob.Register(shardrpc.FreezeArgs{}) - labgob.Register(shardrpc.InstallShardArgs{}) - labgob.Register(shardrpc.DeleteShardArgs{}) - labgob.Register(rsm.Op{}) - - kv := &KVServer{gid: gid, me: me} - kv.rsm = rsm.MakeRSM(servers, me, persister, maxraftstate, kv) - - // Your code here - return []tester.IService{kv, kv.rsm.Raft()} -} diff --git a/src/shardkv1/shardgrp/shardrpc/shardrpc.go b/src/shardkv1/shardgrp/shardrpc/shardrpc.go deleted file mode 100644 index 03f83a1..0000000 --- a/src/shardkv1/shardgrp/shardrpc/shardrpc.go +++ /dev/null @@ -1,50 +0,0 @@ -package shardrpc - -import ( - "6.5840/kvsrv1/rpc" - "6.5840/shardkv1/shardcfg" -) - -// Same as Put in kvsrv1/rpc, but with a configuration number -type PutArgs struct { - Key string - Value string - Version rpc.Tversion - Num shardcfg.Tnum -} - -// Same as Get in kvsrv1/rpc, but with a configuration number. -type GetArgs struct { - Key string - Num shardcfg.Tnum -} - -type FreezeArgs struct { - Shard shardcfg.Tshid - Num shardcfg.Tnum -} - -type FreezeReply struct { - State []byte - Num shardcfg.Tnum - Err rpc.Err -} - -type InstallShardArgs struct { - Shard shardcfg.Tshid - State []byte - Num shardcfg.Tnum -} - -type InstallShardReply struct { - Err rpc.Err -} - -type DeleteShardArgs struct { - Shard shardcfg.Tshid - Num shardcfg.Tnum -} - -type DeleteShardReply struct { - Err rpc.Err -} diff --git a/src/shardkv1/shardkv_test.go b/src/shardkv1/shardkv_test.go deleted file mode 100644 index 74eac41..0000000 --- a/src/shardkv1/shardkv_test.go +++ /dev/null @@ -1,818 +0,0 @@ -package shardkv - -import ( - "log" - "testing" - "time" - - "6.5840/kvsrv1/rpc" - "6.5840/kvtest1" - "6.5840/shardkv1/shardcfg" - "6.5840/shardkv1/shardctrler" - "6.5840/shardkv1/shardctrler/param" - "6.5840/tester1" -) - -const ( - NGRP = 8 - NKEYS = 5 * shardcfg.NShards -) - -// Test shard controller's Init and Query with a key/value server from -// kvsrv1 lab. 
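TestInitQuery5A below exercises exactly the storage scheme the InitConfig comment suggests: marshal the configuration with shardcfg.String(), Put it into the controller's kvsrv at version 0, and have Query read it back with shardcfg.FromString. A minimal sketch of that pair, with the key name "config" being this sketch's arbitrary choice; not the handout's solution:

func initConfigSketch(ck kvtest.IKVClerk, cfg *shardcfg.ShardConfig) {
	ck.Put("config", cfg.String(), 0) // a version-0 Put creates the key
}

func querySketch(ck kvtest.IKVClerk) (*shardcfg.ShardConfig, rpc.Tversion) {
	s, ver, err := ck.Get("config")
	if err != rpc.OK {
		return nil, 0
	}
	return shardcfg.FromString(s), ver
}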
-func TestInitQuery5A(t *testing.T) { - - // MakeTest starts a key/value server using `kvsrv.StartKVServer`, - // which is defined in shardkv1/kvsrv1. - ts := MakeTest(t, "Test (5A): Init and Query ...", true) - defer ts.Cleanup() - - // Make a shard controller - sck := shardctrler.MakeShardCtrler(ts.Config.MakeClient(), ts.leases) - - // Make an empty shard configuration - scfg := shardcfg.MakeShardConfig() - - // Compute a new shard configuration as if `shardcfg.Gid1` joins the cluster, - // assigning all shards to `shardcfg.Gid1`. - scfg.JoinBalance(map[tester.Tgid][]string{shardcfg.Gid1: []string{"xxx"}}) - - // Invoke the controller to initialize to store the first configuration - sck.InitConfig(scfg) - - // Read the initial configuration and check it - cfg, v := sck.Query() - if v != 1 || cfg.Num != 1 || cfg.Shards[0] != shardcfg.Gid1 { - ts.t.Fatalf("Static wrong %v %v", cfg, v) - } - cfg.CheckConfig(t, []tester.Tgid{shardcfg.Gid1}) -} - -// Test shardkv clerk's Get/Put with 1 shardgrp (without reconfiguration) -func TestStaticOneShardGroup5A(t *testing.T) { - ts := MakeTest(t, "Test (5A): one shard group ...", true) - defer ts.Cleanup() - - // The tester's setupKVService() sets up a kvsrv for the - // controller to store configurations and calls the controller's - // Init() method to create the first configuration with 1 - // shardgrp. - ts.setupKVService() - - ck := ts.MakeClerk() // make a shardkv clerk - ka, va := ts.SpreadPuts(ck, NKEYS) // do some puts - n := len(ka) - for i := 0; i < n; i++ { - ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) // check the puts - } - - // disconnect raft leader of shardgrp and check that keys are - // still avaialable - ts.disconnectClntFromLeader(ck.(*kvtest.TestClerk).Clnt, shardcfg.Gid1) - - for i := 0; i < n; i++ { - ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) // check the puts - } -} - -// test shardctrler's join, which adds a new group Gid2 and must move -// shards to the new group and the old group should reject Get/Puts on -// shards that moved. 
-func TestJoinBasic5A(t *testing.T) { - ts := MakeTest(t, "Test (5A): a group joins...", true) - defer ts.Cleanup() - - gid1 := ts.setupKVService() - ck := ts.MakeClerk() - ka, va := ts.SpreadPuts(ck, NKEYS) - - sck := ts.ShardCtrler() - cfg, _ := sck.Query() - - gid2 := ts.newGid() - err := ts.joinGroups(sck, []tester.Tgid{gid2}) - if err != rpc.OK { - ts.t.Fatalf("joinGroups: err %v", err) - } - - cfg1, _ := sck.Query() - if cfg.Num+1 != cfg1.Num { - ts.t.Fatalf("wrong num %d expected %d ", cfg1.Num, cfg.Num+1) - } - - if !cfg1.IsMember(gid2) { - ts.t.Fatalf("%d isn't a member of %v", gid2, cfg1) - } - - // check shards at shardcfg.Gid2 - ts.checkShutdownSharding(gid1, gid2, ka, va) - - for i := 0; i < len(ka); i++ { - ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) - } - - // check shards at shardcfg.Gid1 - ts.checkShutdownSharding(gid2, gid1, ka, va) - - for i := 0; i < len(ka); i++ { - ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) - } -} - -// test shardgrps delete moved shards -func TestDeleteBasic5A(t *testing.T) { - const ( - MAXRAFTSTATE = 1000 - VALUESIZE = 10000 - ) - - ts := MakeTestMaxRaft(t, "Test (5A): delete ...", true, false, VALUESIZE) - defer ts.Cleanup() - - gid1 := ts.setupKVService() - ck := ts.MakeClerk() - - ka, va := ts.SpreadPutsSize(ck, NKEYS, MAXRAFTSTATE) - - sz := ts.Group(gid1).SnapshotSize() - - sck := ts.ShardCtrler() - gid2 := ts.newGid() - err := ts.joinGroups(sck, []tester.Tgid{gid2}) - if err != rpc.OK { - ts.t.Fatalf("joinGroups: err %v", err) - } - - // push more Get's through so that all peers snapshot - for j := 0; j < 5; j++ { - for i := 0; i < len(ka); i++ { - ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) - } - } - sz1 := ts.Group(gid1).SnapshotSize() - sz2 := ts.Group(gid2).SnapshotSize() - if sz1+sz2 > sz+10000 { - ts.t.Fatalf("gid1 %d + gid2 %d = %d use too much space %d", sz1, sz2, sz1+sz2, sz) - } -} - -// test shardctrler's leave -func TestJoinLeaveBasic5A(t *testing.T) { - ts := MakeTest(t, "Test (5A): basic groups join/leave ...", true) - defer ts.Cleanup() - - gid1 := ts.setupKVService() - ck := ts.MakeClerk() - ka, va := ts.SpreadPuts(ck, NKEYS) - - sck := ts.ShardCtrler() - gid2 := ts.newGid() - err := ts.joinGroups(sck, []tester.Tgid{gid2}) - if err != rpc.OK { - ts.t.Fatalf("joinGroups: err %v", err) - } - - // check shards at shardcfg.Gid2 - ts.checkShutdownSharding(gid1, gid2, ka, va) - - for i := 0; i < len(ka); i++ { - ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) - } - - err = ts.leave(sck, shardcfg.Gid1) - if err != rpc.OK { - ts.t.Fatalf("Leave: err %v", err) - } - cfg, _ := sck.Query() - if cfg.IsMember(shardcfg.Gid1) { - ts.t.Fatalf("%d is a member of %v", shardcfg.Gid1, cfg) - } - - ts.Group(shardcfg.Gid1).Shutdown() - - for i := 0; i < len(ka); i++ { - ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) - } - - // bring the crashed shard/group back to life. 
- ts.Group(shardcfg.Gid1).StartServers() - - // Rejoin - ts.join(sck, shardcfg.Gid1, ts.Group(shardcfg.Gid1).SrvNames()) - - for i := 0; i < len(ka); i++ { - ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) - } - - // check shards at shardcfg.Gid2 - ts.checkShutdownSharding(gid2, gid1, ka, va) -} - -// test many groups joining and leaving, reliable or unreliable -func joinLeave5A(t *testing.T, reliable bool, part string) { - ts := MakeTest(t, "Test (5A): many groups join/leave ...", reliable) - defer ts.Cleanup() - - ts.setupKVService() - ck := ts.MakeClerk() - ka, va := ts.SpreadPuts(ck, NKEYS) - - sck := ts.ShardCtrler() - grps := ts.groups(NGRP) - - ts.joinGroups(sck, grps) - - ts.checkShutdownSharding(grps[0], grps[1], ka, va) - - for i := 0; i < len(ka); i++ { - ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) - } - - ts.leaveGroups(sck, grps) - - for i := 0; i < len(ka); i++ { - ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) - } -} - -func TestManyJoinLeaveReliable5A(t *testing.T) { - joinLeave5A(t, true, "Test (5A): many groups join/leave reliable...") -} - -func TestManyJoinLeaveUnreliable5A(t *testing.T) { - joinLeave5A(t, false, "Test (5A): many groups join/leave unreliable...") -} - -// Test recovery from complete shutdown -func TestShutdown5A(t *testing.T) { - const NJOIN = 2 - const NGRP = 2 + NJOIN - - ts := MakeTest(t, "Test (5A): shutdown ...", true) - defer ts.Cleanup() - - ts.setupKVService() - - ck := ts.MakeClerk() - ka, va := ts.SpreadPuts(ck, NKEYS) - - sck := ts.ShardCtrler() - grps := ts.groups(NJOIN) - ts.joinGroups(sck, grps) - - ts.checkShutdownSharding(grps[0], grps[1], ka, va) - - for i := 0; i < len(ka); i++ { - ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) - } - - for i := shardcfg.Gid1; i < NGRP; i++ { - ts.Group(i).Shutdown() - } - - for i := shardcfg.Gid1; i < NGRP; i++ { - ts.Group(i).StartServers() - } - - for i := 0; i < len(ka); i++ { - ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) - } -} - -// Test that Gets for keys at groups that are alive -// return -func TestProgressShutdown(t *testing.T) { - const ( - NJOIN = 4 - NSEC = 2 - ) - - ts := MakeTest(t, "Test (5A): progress ...", true) - defer ts.Cleanup() - - ts.setupKVService() - - ck := ts.MakeClerk() - ka, va := ts.SpreadPuts(ck, NKEYS) - - sck := ts.ShardCtrler() - grps := ts.groups(NJOIN) - ts.joinGroups(sck, grps) - - end := 2 - for _, g := range grps[0:2] { - //log.Printf("shutdown %d", g) - ts.Group(g).Shutdown() - } - - alive := make(map[tester.Tgid]bool) - for _, g := range grps[end:] { - alive[g] = true - } - - cfg, _ := sck.Query() - - ch := make(chan rpc.Err) - go func() { - for i := 0; i < len(ka); i++ { - s := shardcfg.Key2Shard(ka[i]) - g := cfg.Shards[s] - if _, ok := alive[g]; ok { - //log.Printf("key lookup %v(%d) gid %d", ka[i], s, g) - ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) - } - } - ch <- rpc.OK - }() - - select { - case <-ch: - case <-time.After(NSEC * time.Second): - ts.Fatalf("Gets didn't finish") - } -} - -// Test that Gets from a non-moving shard return quickly -func TestProgressJoin(t *testing.T) { - const ( - NJOIN = 4 - NSEC = 4 - NCNT = 100 - ) - - ts := MakeTest(t, "Test (5A): progress ...", true) - defer ts.Cleanup() - - ts.setupKVService() - - ck := ts.MakeClerk() - ka, va := ts.SpreadPuts(ck, NKEYS) - - sck := ts.ShardCtrler() - grps := ts.groups(NJOIN) - ts.joinGroups(sck, grps) - - cfg, _ := sck.Query() - newcfg := cfg.Copy() - newgid := tester.Tgid(NJOIN + 3) - if ok := newcfg.JoinBalance(map[tester.Tgid][]string{newgid: []string{"xxx"}}); !ok { - 
t.Fatalf("JoinBalance failed") - } - newcfg1 := newcfg.Copy() - if ok := newcfg1.LeaveBalance([]tester.Tgid{newgid}); !ok { - t.Fatalf("JoinBalance failed") - } - - // compute which shards don't move and which groups are involved - // in moving shards - stable := make(map[shardcfg.Tshid]bool) - participating := make(map[tester.Tgid]bool) - for i, g := range newcfg1.Shards { - if newcfg.Shards[i] == g { - stable[shardcfg.Tshid(i)] = true - } else { - participating[g] = true - } - } - - //log.Printf("groups participating %v stable %v", participating, stable) - //log.Printf("\ncfg %v\n %v\n %v", cfg.Shards, newcfg.Shards, newcfg1.Shards) - - ch0 := make(chan rpc.Err) - go func() { - for true { - select { - case <-ch0: - return - default: - //log.Printf("join/leave %v", newgid) - if err := ts.joinGroups(sck, []tester.Tgid{newgid}); err != rpc.OK { - t.Fatalf("joined err %v", err) - } - if err := ts.leaveGroups(sck, []tester.Tgid{newgid}); err != rpc.OK { - t.Fatalf("leave err %v", err) - } - } - } - }() - - ch1 := make(chan int) - go func() { - // get the keys that are on groups that are involved in the - // join but not in the shards that are moving - t := time.Now().Add(NSEC * time.Second) - nget := 0 - for time.Now().Before(t) { - for i := 0; i < len(ka); i++ { - s := shardcfg.Key2Shard(ka[i]) - if _, ok := stable[s]; ok { - g := newcfg1.Shards[s] - if _, ok := participating[g]; ok { - // log.Printf("key lookup %v(%d) gid %d", ka[i], s, g) - ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) - nget++ - } - } - } - } - ch1 <- nget - }() - - select { - case cnt := <-ch1: - log.Printf("cnt %d", cnt) - if cnt < NCNT { - ts.Fatalf("Two few gets finished %d; expected more than %d", cnt, NCNT) - } - - case <-time.After(2 * NSEC * time.Second): - ts.Fatalf("Gets didn't finish") - } - ch0 <- rpc.OK -} - -// Test linearizability with groups joining/leaving and `nclnt` -// concurrent clerks put/get's in `unreliable` net. 
-func concurrentClerk(t *testing.T, nclnt int, reliable bool, part string) { - const ( - NSEC = 20 - ) - - ts := MakeTest(t, part, reliable) - defer ts.Cleanup() - - ts.setupKVService() - - ka := kvtest.MakeKeys(NKEYS) - ch := make(chan []kvtest.ClntRes) - - go func(ch chan []kvtest.ClntRes) { - rs := ts.SpawnClientsAndWait(nclnt, NSEC*time.Second, func(me int, ck kvtest.IKVClerk, done chan struct{}) kvtest.ClntRes { - return ts.OneClientPut(me, ck, ka, done) - }) - ch <- rs - }(ch) - - sck := ts.ShardCtrler() - grps := ts.groups(NGRP) - if err := ts.joinGroups(sck, grps); err != rpc.OK { - t.Fatalf("joinGroups err %v", err) - } - - if err := ts.leaveGroups(sck, grps); err != rpc.OK { - t.Fatalf("leaveGroups err %v", err) - } - - <-ch - - ts.CheckPorcupine() -} - -// Test linearizability with groups joining/leaving and 1 concurrent clerks put/get's -func TestOneConcurrentClerkReliable5A(t *testing.T) { - concurrentClerk(t, 1, true, "Test (5A): one concurrent clerk reliable...") -} - -// Test linearizability with groups joining/leaving and many concurrent clerks put/get's -func TestManyConcurrentClerkReliable5A(t *testing.T) { - const NCLNT = 10 - concurrentClerk(t, NCLNT, true, "Test (5A): many concurrent clerks reliable...") -} - -// Test linearizability with groups joining/leaving and 1 concurrent clerks put/get's -func TestOneConcurrentClerkUnreliable5A(t *testing.T) { - concurrentClerk(t, 1, false, "Test (5A): one concurrent clerk unreliable ...") -} - -// Test linearizability with groups joining/leaving and many concurrent clerks put/get's -func TestManyConcurrentClerkUnreliable5A(t *testing.T) { - const NCLNT = 10 - concurrentClerk(t, NCLNT, false, "Test (5A): many concurrent clerks unreliable...") -} - -// Test if join/leave complete even if shardgrp is down for a while, but -// don't complete while the shardgrp is down. 
-func TestJoinLeave5B(t *testing.T) { - const NSEC = 2 - - ts := MakeTest(t, "Test (5B): Join/leave while a shardgrp is down...", true) - defer ts.Cleanup() - - gid1 := ts.setupKVService() - ck := ts.MakeClerk() - ka, va := ts.SpreadPuts(ck, NKEYS) - - sck := ts.ShardCtrler() - cfg, _ := sck.Query() - - ts.Group(gid1).Shutdown() - - gid2 := ts.newGid() - ch := make(chan rpc.Err) - go func() { - err := ts.joinGroups(sck, []tester.Tgid{gid2}) - ch <- err - }() - - select { - case err := <-ch: - ts.Fatalf("Join finished %v", err) - case <-time.After(1 * NSEC): - // Give Join some time to try to join - } - - // Now join should be able to finish - ts.Group(gid1).StartServers() - - select { - case err := <-ch: - if err != rpc.OK { - ts.Fatalf("Join returns err %v", err) - } - case <-time.After(time.Second * NSEC): - ts.Fatalf("Join didn't complete") - } - - cfg1, _ := sck.Query() - if cfg.Num+1 != cfg1.Num { - ts.t.Fatalf("wrong num %d expected %d ", cfg1.Num, cfg.Num+1) - } - - ts.Group(gid2).Shutdown() - - ch = make(chan rpc.Err) - go func() { - err := ts.leave(sck, shardcfg.Gid1) - ch <- err - }() - - select { - case err := <-ch: - ts.Fatalf("Leave finished %v", err) - case <-time.After(NSEC * time.Second): - // Give give some time to try to join - } - - // Now leave should be able to finish - ts.Group(gid2).StartServers() - - select { - case err := <-ch: - if err != rpc.OK { - ts.Fatalf("Leave returns err %v", err) - } - case <-time.After(time.Second * NSEC): - ts.Fatalf("Leave didn't complete") - } - for i := 0; i < len(ka); i++ { - ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) - } -} - -// test recovery of partitioned controlers -func TestRecoverCtrler5B(t *testing.T) { - const ( - NPARTITION = 5 - ) - - ts := MakeTest(t, "Test (5B): recover controler ...", true) - defer ts.Cleanup() - - gid := ts.setupKVService() - ck := ts.MakeClerk() - ka, va := ts.SpreadPuts(ck, NKEYS) - - for i := 0; i < NPARTITION; i++ { - ts.killCtrler(ck, gid, ka, va) - } -} - -// Test concurrent ctrlers fighting for leadership reliable -func TestAcquireLockConcurrentReliable5C(t *testing.T) { - ts := MakeTestLeases(t, "Test (5C): Concurent ctrlers acquiring leadership ...", true) - defer ts.Cleanup() - ts.setupKVService() - ck := ts.MakeClerk() - ka, va := ts.SpreadPuts(ck, NKEYS) - ts.electCtrler(ck, ka, va) -} - -// Test concurrent ctrlers fighting for leadership unreliable -func TestAcquireLockConcurrentUnreliable5C(t *testing.T) { - ts := MakeTestLeases(t, "Test (5C): Concurent ctrlers acquiring leadership ...", false) - defer ts.Cleanup() - ts.setupKVService() - ck := ts.MakeClerk() - ka, va := ts.SpreadPuts(ck, NKEYS) - ts.electCtrler(ck, ka, va) -} - -// Test that ReleaseLock allows a new leader to start quickly -func TestLeaseBasicRelease5C(t *testing.T) { - ts := MakeTestLeases(t, "Test (5C): release lease ...", true) - defer ts.Cleanup() - ts.setupKVService() - - sck0, clnt0 := ts.makeShardCtrlerClnt() - go func() { - if err := sck0.InitController(); err != rpc.OK { - t.Fatalf("failed to init controller %v", err) - } - time.Sleep(200 * time.Millisecond) - sck0.ExitController() - }() - - time.Sleep(10 * time.Millisecond) - - // start new controller - sck1, clnt1 := ts.makeShardCtrlerClnt() - ch := make(chan struct{}) - go func() { - if err := sck1.InitController(); err != rpc.OK { - t.Fatalf("failed to init controller %v", err) - } - time.Sleep(200 * time.Millisecond) - sck1.ExitController() - ch <- struct{}{} - }() - - select { - case <-time.After(1 * time.Second): - ts.Fatalf("Release didn't give up 
leadership") - case <-ch: - } - - ts.Config.DeleteClient(clnt0) - ts.Config.DeleteClient(clnt1) -} - -// Test lease expiring -func TestLeaseBasicExpire5C(t *testing.T) { - ts := MakeTestLeases(t, "Test (5C): lease expiring ...", true) - defer ts.Cleanup() - ts.setupKVService() - - sck0, clnt0 := ts.makeShardCtrlerClnt() - go func() { - if err := sck0.InitController(); err != rpc.OK { - t.Fatalf("failed to init controller %v", err) - } - for { - time.Sleep(10 * time.Millisecond) - } - }() - - time.Sleep(100 * time.Millisecond) - - // partition sck0 forever - clnt0.DisconnectAll() - - // start new controller - sck1, clnt1 := ts.makeShardCtrlerClnt() - ch := make(chan struct{}) - go func() { - if err := sck1.InitController(); err != rpc.OK { - t.Fatalf("failed to init controller %v", err) - } - time.Sleep(100 * time.Millisecond) - sck1.ExitController() - ch <- struct{}{} - }() - - select { - case <-time.After((param.LEASETIMESEC + 1) * time.Second): - ts.Fatalf("Lease didn't expire") - case <-ch: - } - - ts.Config.DeleteClient(clnt0) - ts.Config.DeleteClient(clnt1) -} - -// Test lease is being extended -func TestLeaseBasicRefresh5C(t *testing.T) { - const LEADERSEC = 3 - - ts := MakeTestLeases(t, "Test (5C): lease refresh ...", true) - defer ts.Cleanup() - ts.setupKVService() - - sck0, clnt0 := ts.makeShardCtrlerClnt() - go func() { - if err := sck0.InitController(); err != rpc.OK { - t.Fatalf("failed to init controller %v", err) - } - time.Sleep(LEADERSEC * param.LEASETIMESEC * time.Second) - sck0.ExitController() - }() - - // give sck0 time to become leader - time.Sleep(100 * time.Millisecond) - - // start new controller - sck1, clnt1 := ts.makeShardCtrlerClnt() - ch := make(chan struct{}) - go func() { - if err := sck1.InitController(); err != rpc.OK { - t.Fatalf("failed to init controller %v", err) - } - time.Sleep(100 * time.Millisecond) - sck1.ExitController() - ch <- struct{}{} - }() - - select { - case <-time.After((LEADERSEC + param.LEASETIMESEC + 1) * time.Second): - case <-ch: - ts.Fatalf("Lease not refreshed") - } - - ts.Config.DeleteClient(clnt0) - ts.Config.DeleteClient(clnt1) -} - -// Test if old leader is fenced off when reconnecting while it is in -// the middle of a Join. 
-func TestPartitionControlerJoin5C(t *testing.T) { - const ( - NSLEEP = 2 - RAND = 1000 - ) - - ts := MakeTestLeases(t, "Test (5C): partition controller in join...", true) - defer ts.Cleanup() - ts.setupKVService() - ck := ts.MakeClerk() - ka, va := ts.SpreadPuts(ck, NKEYS) - - sck, clnt := ts.makeShardCtrlerClnt() - if err := sck.InitController(); err != rpc.OK { - ts.Fatalf("failed to init controller %v", err) - } - - ch := make(chan rpc.Err) - ngid := tester.Tgid(0) - go func() { - ngid = ts.newGid() - ts.Config.MakeGroupStart(ngid, NSRV, ts.StartServerShardGrp) - ts.Group(ngid).Shutdown() - ch <- ts.join(sck, ngid, ts.Group(ngid).SrvNames()) - }() - - // sleep for a while to get the chance for the controler to get stuck - // in join or leave, because gid is down - time.Sleep(1 * time.Second) - - // partition sck - clnt.DisconnectAll() - - // wait until sck's lease expired before restarting shardgrp `ngid` - time.Sleep((param.LEASETIMESEC + 1) * time.Second) - - ts.Group(ngid).StartServers() - - // start new controler to supersede partitioned one, - // it will also be stuck - sck0 := ts.makeShardCtrler() - if err := sck0.InitController(); err != rpc.OK { - t.Fatalf("failed to init controller %v", err) - } - - sck0.ExitController() - - //log.Printf("reconnect") - - // reconnect old controller, which shouldn't be able - // to do anything - clnt.ConnectAll() - - err := <-ch - if err == rpc.OK { - t.Fatalf("Old leader succeeded %v", err) - } - - time.Sleep(1 * time.Second) - - for i := 0; i < len(ka); i++ { - ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) - } -} - -// Make a leader controller loses its leadership during join/leave and -// test if the next controller recovers correctly. -func TestPartitionRecovery5C(t *testing.T) { - const ( - // NPARTITION = 10 - NPARTITION = 5 - ) - - ts := MakeTestLeases(t, "Test (5C): controllers with leased leadership ...", true) - defer ts.Cleanup() - gid := ts.setupKVService() - ck := ts.MakeClerk() - ka, va := ts.SpreadPuts(ck, NKEYS) - - for i := 0; i < NPARTITION; i++ { - ts.killCtrler(ck, gid, ka, va) - } -} diff --git a/src/shardkv1/test.go b/src/shardkv1/test.go deleted file mode 100644 index f37ee5c..0000000 --- a/src/shardkv1/test.go +++ /dev/null @@ -1,417 +0,0 @@ -package shardkv - -import ( - "fmt" - //"log" - "math/rand" - "sync" - "sync/atomic" - "testing" - "time" - - "6.5840/kvraft1/rsm" - "6.5840/kvsrv1/rpc" - "6.5840/kvtest1" - "6.5840/labrpc" - "6.5840/shardkv1/kvsrv1" - "6.5840/shardkv1/shardcfg" - "6.5840/shardkv1/shardctrler" - "6.5840/shardkv1/shardctrler/param" - "6.5840/shardkv1/shardgrp" - "6.5840/tester1" -) - -type Test struct { - t *testing.T - *kvtest.Test - - sck *shardctrler.ShardCtrler - part string - leases bool - - maxraftstate int - mu sync.Mutex - ngid tester.Tgid -} - -const ( - Controler = tester.Tgid(0) // controler uses group 0 for a kvraft group - NSRV = 3 // servers per group - INTERGRPDELAY = 200 // time in ms between group changes -) - -// Setup kvserver for the shard controller and make the controller -func MakeTestMaxRaft(t *testing.T, part string, reliable, leases bool, maxraftstate int) *Test { - ts := &Test{ - ngid: shardcfg.Gid1 + 1, // Gid1 is in use - t: t, - leases: leases, - maxraftstate: maxraftstate, - } - cfg := tester.MakeConfig(t, 1, reliable, kvsrv.StartKVServer) - ts.Test = kvtest.MakeTest(t, cfg, false, ts) - ts.Begin(part) - return ts -} - -func MakeTest(t *testing.T, part string, reliable bool) *Test { - return MakeTestMaxRaft(t, part, reliable, false, -1) -} - -func MakeTestLeases(t 
*testing.T, part string, reliable bool) *Test { - return MakeTestMaxRaft(t, part, reliable, true, -1) -} - -func (ts *Test) MakeClerk() kvtest.IKVClerk { - clnt := ts.Config.MakeClient() - ck := MakeClerk(clnt, ts.makeShardCtrler()) - return &kvtest.TestClerk{ck, clnt} -} - -func (ts *Test) DeleteClerk(ck kvtest.IKVClerk) { - tck := ck.(*kvtest.TestClerk) - ts.DeleteClient(tck.Clnt) -} - -func (ts *Test) ShardCtrler() *shardctrler.ShardCtrler { - return ts.sck -} - -func (ts *Test) makeShardCtrler() *shardctrler.ShardCtrler { - ck, _ := ts.makeShardCtrlerClnt() - return ck -} - -func (ts *Test) makeShardCtrlerClnt() (*shardctrler.ShardCtrler, *tester.Clnt) { - clnt := ts.Config.MakeClient() - return shardctrler.MakeShardCtrler(clnt, ts.leases), clnt -} - -func (ts *Test) makeKVClerk() *kvsrv.Clerk { - srv := tester.ServerName(tester.GRP0, 0) - clnt := ts.Config.MakeClient() - return kvsrv.MakeClerk(clnt, srv).(*kvsrv.Clerk) -} - -func (ts *Test) newGid() tester.Tgid { - ts.mu.Lock() - defer ts.mu.Unlock() - - gid := ts.ngid - ts.ngid += 1 - return gid -} - -func (ts *Test) groups(n int) []tester.Tgid { - grps := make([]tester.Tgid, n) - for i := 0; i < n; i++ { - grps[i] = ts.newGid() - } - return grps -} - -// Set up KVServervice with one group Gid1. Gid1 should initialize itself to -// own all shards. -func (ts *Test) setupKVService() tester.Tgid { - ts.sck = ts.makeShardCtrler() - scfg := shardcfg.MakeShardConfig() - ts.Config.MakeGroupStart(shardcfg.Gid1, NSRV, ts.StartServerShardGrp) - scfg.JoinBalance(map[tester.Tgid][]string{shardcfg.Gid1: ts.Group(shardcfg.Gid1).SrvNames()}) - ts.sck.InitConfig(scfg) - return shardcfg.Gid1 -} - -func (ts *Test) StartServerShardGrp(servers []*labrpc.ClientEnd, gid tester.Tgid, me int, persister *tester.Persister) []tester.IService { - return shardgrp.StartServerShardGrp(servers, gid, me, persister, ts.maxraftstate) -} - -// Add group gid -func (ts *Test) join(sck *shardctrler.ShardCtrler, gid tester.Tgid, srvs []string) rpc.Err { - cfg, _ := sck.Query() - newcfg := cfg.Copy() - ok := newcfg.JoinBalance(map[tester.Tgid][]string{gid: srvs}) - if !ok { - return rpc.ErrVersion - } - err := sck.ChangeConfigTo(newcfg) - return err -} - -func (ts *Test) joinGroups(sck *shardctrler.ShardCtrler, gids []tester.Tgid) rpc.Err { - for i, gid := range gids { - ts.Config.MakeGroupStart(gid, NSRV, ts.StartServerShardGrp) - if err := ts.join(sck, gid, ts.Group(gid).SrvNames()); err != rpc.OK { - return err - } - if i < len(gids)-1 { - time.Sleep(INTERGRPDELAY * time.Millisecond) - } - } - return rpc.OK -} - -// Group gid leaves. 
-func (ts *Test) leave(sck *shardctrler.ShardCtrler, gid tester.Tgid) rpc.Err { - cfg, _ := sck.Query() - newcfg := cfg.Copy() - ok := newcfg.LeaveBalance([]tester.Tgid{gid}) - if !ok { - return rpc.ErrVersion - } - return sck.ChangeConfigTo(newcfg) -} - -func (ts *Test) leaveGroups(sck *shardctrler.ShardCtrler, gids []tester.Tgid) rpc.Err { - for i, gid := range gids { - if err := ts.leave(sck, gid); err != rpc.OK { - return err - } - ts.Config.ExitGroup(gid) - if i < len(gids)-1 { - time.Sleep(INTERGRPDELAY * time.Millisecond) - } - } - return rpc.OK -} - -func (ts *Test) disconnectRaftLeader(gid tester.Tgid) (int, string) { - _, l := rsm.Leader(ts.Config, gid) - g := ts.Group(gid) - ln := g.SrvName(l) - g.DisconnectAll(l) - return l, ln -} - -func (ts *Test) reconnectOldLeader(gid tester.Tgid, l int) { - g := ts.Group(gid) - g.ConnectOne(l) -} - -func (ts *Test) disconnectClntFromLeader(clnt *tester.Clnt, gid tester.Tgid) int { - l, ln := ts.disconnectRaftLeader(gid) - p := ts.Group(gid).AllowServersExcept(l) - srvs := ts.Group(gid).SrvNamesTo(p) - clnt.Disconnect(ln) - clnt.ConnectTo(srvs) - return l -} - -func (ts *Test) checkLogs(gids []tester.Tgid) { - for _, gid := range gids { - n := ts.Group(gid).LogSize() - s := ts.Group(gid).SnapshotSize() - if ts.maxraftstate >= 0 && n > 8*ts.maxraftstate { - ts.t.Fatalf("persister.RaftStateSize() %v, but maxraftstate %v", - n, ts.maxraftstate) - } - if ts.maxraftstate < 0 && s > 0 { - ts.t.Fatalf("maxraftstate is -1, but snapshot is non-empty!") - } - - } -} - -// make sure that the data really is sharded by -// shutting down one shard and checking that some -// Get()s don't succeed. -func (ts *Test) checkShutdownSharding(down, up tester.Tgid, ka []string, va []string) { - const NSEC = 2 - - ts.Group(down).Shutdown() - - ts.checkLogs([]tester.Tgid{down, up}) // forbid snapshots - - n := len(ka) - ch := make(chan string) - done := int32(0) - for xi := 0; xi < n; xi++ { - ck1 := ts.MakeClerk() - go func(i int) { - v, _, _ := ck1.Get(ka[i]) - if atomic.LoadInt32(&done) == 1 { - return - } - if v != va[i] { - ch <- fmt.Sprintf("Get(%v): expected:\n%v\nreceived:\n%v", ka[i], va[i], v) - } else { - ch <- "" - } - }(xi) - } - - // wait a bit, only about half the Gets should succeed. - ndone := 0 - for atomic.LoadInt32(&done) != 1 { - select { - case err := <-ch: - if err != "" { - ts.Fatalf(err) - } - ndone += 1 - case <-time.After(time.Second * NSEC): - atomic.StoreInt32(&done, 1) - break - } - } - - //log.Printf("%d completions out of %d; down %d", ndone, n, down) - if ndone >= n { - ts.Fatalf("expected less than %d completions with one shard dead\n", n) - } - - // bring the crashed shard/group back to life. - ts.Group(down).StartServers() -} - -// Run one controler and then partition it after some time. Run -// another cntrler that must finish the first ctrler's unfinished -// shard moves. To ensure first ctrler is in a join/leave the test -// shuts down shardgrp `gid`. After the second controller is done, -// heal the partition to test if Freeze,InstallShard, and Delete are -// are fenced. 
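"Fenced" here means that a shardgrp refuses shard RPCs stamped with a configuration number older than the newest one it has seen, so a deposed controller that reconnects cannot undo its successor's moves. A minimal sketch of that check, assuming a hypothetical latestNum field on the shardgrp KVServer; in a replicated group the check would have to go through DoOp so that all replicas agree, and the caller decides which rpc.Err to put in the reply. Not the handout's solution:

func (kv *KVServer) rejectStaleSketch(num shardcfg.Tnum) bool {
	if num < kv.latestNum {
		return true // stale controller: refuse the Freeze/InstallShard/Delete (or Get/Put)
	}
	kv.latestNum = num // remember the newest configuration number seen
	return false
}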
-func (ts *Test) killCtrler(ck kvtest.IKVClerk, gid tester.Tgid, ka, va []string) { - const ( - NSLEEP = 2 - - RAND = 1000 - - JOIN = 1 - LEAVE = 2 - ) - - sck, clnt := ts.makeShardCtrlerClnt() - if err := sck.InitController(); err != rpc.OK { - ts.Fatalf("failed to init controller %v", err) - } - - cfg, _ := ts.ShardCtrler().Query() - num := cfg.Num - - state := 0 - ngid := tester.Tgid(0) - go func() { - for { - ngid = ts.newGid() - state = JOIN - err := ts.joinGroups(sck, []tester.Tgid{ngid}) - if err == rpc.OK { - state = LEAVE - err = ts.leaveGroups(sck, []tester.Tgid{ngid}) - } else { - //log.Printf("deposed err %v", err) - return - } - } - }() - - r := rand.Int() % RAND - d := time.Duration(r) * time.Millisecond - time.Sleep(d) - - //log.Printf("shutdown gid %d after %dms", gid, r) - ts.Group(gid).Shutdown() - - // sleep for a while to get the chance for the controler to get stuck - // in join or leave, because gid is down - time.Sleep(NSLEEP * time.Second) - - //log.Printf("disconnect sck %v ngid %d num %d state %d", d, ngid, num, state) - - // partition controller - clnt.DisconnectAll() - - if ts.leases { - // wait until sck's lease expired before restarting shardgrp `gid` - time.Sleep((param.LEASETIMESEC + 1) * time.Second) - } - - ts.Group(gid).StartServers() - - // start new controler to pick up where sck left off - sck0, clnt0 := ts.makeShardCtrlerClnt() - if err := sck0.InitController(); err != rpc.OK { - ts.Fatalf("failed to init controller %v", err) - } - cfg, _ = sck0.Query() - s := "join" - if state == LEAVE { - s = "leave" - } - //log.Printf("%v cfg %v recovered %s", s, cfg, s) - - if cfg.Num <= num { - ts.Fatalf("didn't recover; expected %d > %d", num, cfg.Num) - } - - present := cfg.IsMember(ngid) - if (state == JOIN && !present) || (state == LEAVE && present) { - ts.Fatalf("didn't recover %d correctly after %v", ngid, s) - } - - if state == JOIN && present { - // cleanup if disconnected after join but before leave - ts.leaveGroups(sck0, []tester.Tgid{ngid}) - } - - for i := 0; i < len(ka); i++ { - ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) - } - - sck0.ExitController() - - if ts.leases { - // reconnect old controller, which shouldn't be able - // to do anything - clnt.ConnectAll() - - time.Sleep(1 * time.Second) - - for i := 0; i < len(ka); i++ { - ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) - } - } - ts.Config.DeleteClient(clnt) - ts.Config.DeleteClient(clnt0) -} - -func (ts *Test) electCtrler(ck kvtest.IKVClerk, ka, va []string) { - const ( - NSEC = 5 - N = 4 - ) - - ch := make(chan struct{}) - f := func(ch chan struct{}, i int) { - for true { - select { - case <-ch: - return - default: - ngid := ts.newGid() - sck := ts.makeShardCtrler() - if err := sck.InitController(); err != rpc.OK { - ts.Fatalf("failed to init controller %v", err) - } - //log.Printf("%d(%p): join/leave %v", i, sck, ngid) - if err := ts.joinGroups(sck, []tester.Tgid{ngid}); err == rpc.OK { - ts.leaveGroups(sck, []tester.Tgid{ngid}) - } - sck.ExitController() - } - } - } - for i := 0; i < N; i++ { - go f(ch, i) - } - - // let f()'s run for a while - time.Sleep(NSEC * time.Second) - - for i := 0; i < N; i++ { - ch <- struct{}{} - } - for i := 0; i < len(ka); i++ { - ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) - } - -}