diff --git a/src/shardkv1/client.go b/src/shardkv1/client.go
deleted file mode 100644
index 4b7863b..0000000
--- a/src/shardkv1/client.go
+++ /dev/null
@@ -1,51 +0,0 @@
-package shardkv
-
-//
-// client code to talk to a sharded key/value service.
-//
-// the client uses the shardctrler's clerk to query for the current
-// configuration and find the assignment of shards (keys) to groups,
-// and then talks to the group that holds the key's shard.
-//
-
-import (
-
-	"6.5840/kvsrv1/rpc"
-	"6.5840/kvtest1"
-	"6.5840/shardkv1/shardctrler"
-	"6.5840/tester1"
-)
-
-type Clerk struct {
-	clnt *tester.Clnt
-	qck  *shardctrler.QueryClerk
-	// You will have to modify this struct.
-}
-
-// The tester calls MakeClerk and passes in a clerk for the
-// shardctrler with only the Query method.
-func MakeClerk(clnt *tester.Clnt, qck *shardctrler.QueryClerk) kvtest.IKVClerk {
-	ck := &Clerk{
-		clnt: clnt,
-		qck:  qck,
-	}
-	// You'll have to add code here.
-	return ck
-}
-
-
-// Get a key from a shardgrp. You can use shardcfg.Key2Shard(key) to
-// find the shard responsible for the key and ck.qck.Query() to read
-// the current configuration and look up the servers in the group
-// responsible for key. You can make a clerk for that group by
-// calling shardgrp.MakeClerk(ck.clnt, servers).
-func (ck *Clerk) Get(key string) (string, rpc.Tversion, rpc.Err) {
-	// You will have to modify this function.
-	return "", 0, ""
-}
-
-// Put a key to a shard group.
-func (ck *Clerk) Put(key string, value string, version rpc.Tversion) rpc.Err {
-	// You will have to modify this function.
-	return ""
-}
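
The comment on Get above describes the routing path: hash the key to a shard, read the current configuration, and talk to the group that owns that shard. Below is a minimal sketch of that path only (not the required implementation): it assumes QueryClerk.Query() and shardgrp.MakeClerk are implemented as the comments describe, it passes the configuration number for both Tnum arguments of the skeleton's shardgrp Get signature, and it omits the retry needed when the configuration is stale or the group rejects the request. It would also need "6.5840/shardkv1/shardcfg" and "6.5840/shardkv1/shardgrp" added to the imports.

// getOnce is a hypothetical helper; error handling and retries omitted.
func (ck *Clerk) getOnce(key string) (string, rpc.Tversion, rpc.Err) {
	cfg, _ := ck.qck.Query()                 // current shard configuration
	sh := shardcfg.Key2Shard(key)            // which shard holds this key
	gid := cfg.Shards[sh]                    // which group owns that shard
	srvs := cfg.Groups[gid]                  // that group's servers
	gck := shardgrp.MakeClerk(ck.clnt, srvs) // clerk for the owning group
	return gck.Get(cfg.Num, key, cfg.Num)    // forward, tagged with the config number
}
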
diff --git a/src/shardkv1/shardcfg/shardcfg.go b/src/shardkv1/shardcfg/shardcfg.go
deleted file mode 100644
index 3be3775..0000000
--- a/src/shardkv1/shardcfg/shardcfg.go
+++ /dev/null
@@ -1,275 +0,0 @@
-package shardcfg
-
-import (
-	"encoding/json"
-	"hash/fnv"
-	"log"
-	"runtime/debug"
-	"slices"
-	"testing"
-
-	"6.5840/tester1"
-)
-
-type Tshid int
-type Tnum int
-
-const (
-	NShards  = 12 // The number of shards.
-	NumFirst = Tnum(1)
-)
-
-const (
-	Gid1 = tester.Tgid(1)
-)
-
-// which shard is a key in?
-// please use this function,
-// and please do not change it.
-func Key2Shard(key string) Tshid {
-	h := fnv.New32a()
-	h.Write([]byte(key))
-	shard := Tshid(Tshid(h.Sum32()) % NShards)
-	return shard
-}
-
-// A configuration -- an assignment of shards to groups.
-// Please don't change this.
-type ShardConfig struct {
-	Num    Tnum                     // config number
-	Shards [NShards]tester.Tgid     // shard -> gid
-	Groups map[tester.Tgid][]string // gid -> servers[]
-}
-
-func MakeShardConfig() *ShardConfig {
-	c := &ShardConfig{
-		Groups: make(map[tester.Tgid][]string),
-	}
-	return c
-}
-
-func (cfg *ShardConfig) String() string {
-	b, err := json.Marshal(cfg)
-	if err != nil {
-		log.Fatalf("Marshal err %v", err)
-	}
-	return string(b)
-}
-
-func FromString(s string) *ShardConfig {
-	scfg := &ShardConfig{}
-	if err := json.Unmarshal([]byte(s), scfg); err != nil {
-		log.Fatalf("Unmarshal err %v", err)
-	}
-	return scfg
-}
-
-func (cfg *ShardConfig) Copy() *ShardConfig {
-	c := MakeShardConfig()
-	c.Num = cfg.Num
-	c.Shards = cfg.Shards
-	for k, srvs := range cfg.Groups {
-		s := make([]string, len(srvs))
-		copy(s, srvs)
-		c.Groups[k] = s
-	}
-	return c
-}
-
-// mostgroup, mostn, leastgroup, leastn
-func analyze(c *ShardConfig) (tester.Tgid, int, tester.Tgid, int) {
-	counts := map[tester.Tgid]int{}
-	for _, g := range c.Shards {
-		counts[g] += 1
-	}
-
-	mn := -1
-	var mg tester.Tgid = -1
-	ln := 257
-	var lg tester.Tgid = -1
-	// Enforce deterministic ordering; map iteration
-	// is randomized in Go.
-	groups := make([]tester.Tgid, len(c.Groups))
-	i := 0
-	for k := range c.Groups {
-		groups[i] = k
-		i++
-	}
-	slices.Sort(groups)
-	for _, g := range groups {
-		if counts[g] < ln {
-			ln = counts[g]
-			lg = g
-		}
-		if counts[g] > mn {
-			mn = counts[g]
-			mg = g
-		}
-	}
-
-	return mg, mn, lg, ln
-}
-
-// return GID of group with least number of
-// assigned shards.
-func least(c *ShardConfig) tester.Tgid {
-	_, _, lg, _ := analyze(c)
-	return lg
-}
-
-// balance assignment of shards to groups.
-// modifies c.
-func (c *ShardConfig) Rebalance() {
-	// if no groups, un-assign all shards
-	if len(c.Groups) < 1 {
-		for s, _ := range c.Shards {
-			c.Shards[s] = 0
-		}
-		return
-	}
-
-	// assign all unassigned shards
-	for s, g := range c.Shards {
-		_, ok := c.Groups[g]
-		if ok == false {
-			lg := least(c)
-			c.Shards[s] = lg
-		}
-	}
-
-	// move shards from most to least heavily loaded
-	for {
-		mg, mn, lg, ln := analyze(c)
-		if mn < ln+2 {
-			break
-		}
-		// move 1 shard from mg to lg
-		for s, g := range c.Shards {
-			if g == mg {
-				c.Shards[s] = lg
-				break
-			}
-		}
-	}
-}
-
-func (cfg *ShardConfig) Join(servers map[tester.Tgid][]string) {
-	changed := false
-	for gid, servers := range servers {
-		_, ok := cfg.Groups[gid]
-		if ok {
-			log.Fatalf("re-Join %v", gid)
-		}
-		for xgid, xservers := range cfg.Groups {
-			for _, s1 := range xservers {
-				for _, s2 := range servers {
-					if s1 == s2 {
-						log.Fatalf("Join(%v) puts server %v in groups %v and %v", gid, s1, xgid, gid)
-					}
-				}
-			}
-		}
-		// new GID
-		// modify cfg to reflect the Join()
-		cfg.Groups[gid] = servers
-		changed = true
-	}
-	if changed == false {
-		log.Fatalf("Join but no change")
-	}
-	cfg.Num += 1
-}
-
-func (cfg *ShardConfig) Leave(gids []tester.Tgid) {
-	changed := false
-	for _, gid := range gids {
-		_, ok := cfg.Groups[gid]
-		if ok == false {
-			// already no GID!
-			debug.PrintStack()
-			log.Fatalf("Leave(%v) but not in config", gid)
-		} else {
-			// modify cfg to reflect the Leave()
-			delete(cfg.Groups, gid)
-			changed = true
-		}
-	}
-	if changed == false {
-		debug.PrintStack()
-		log.Fatalf("Leave but no change")
-	}
-	cfg.Num += 1
-}
-
-func (cfg *ShardConfig) JoinBalance(servers map[tester.Tgid][]string) {
-	cfg.Join(servers)
-	cfg.Rebalance()
-}
-
-func (cfg *ShardConfig) LeaveBalance(gids []tester.Tgid) {
-	cfg.Leave(gids)
-	cfg.Rebalance()
-}
-
-func (cfg *ShardConfig) GidServers(sh Tshid) (tester.Tgid, []string, bool) {
-	gid := cfg.Shards[sh]
-	srvs, ok := cfg.Groups[gid]
-	return gid, srvs, ok
-}
-
-func (cfg *ShardConfig) IsMember(gid tester.Tgid) bool {
-	for _, g := range cfg.Shards {
-		if g == gid {
-			return true
-		}
-	}
-	return false
-}
-
-func (cfg *ShardConfig) CheckConfig(t *testing.T, groups []tester.Tgid) {
-	if len(cfg.Groups) != len(groups) {
-		fatalf(t, "wanted %v groups, got %v", len(groups), len(cfg.Groups))
-	}
-
-	// are the groups as expected?
-	for _, g := range groups {
-		_, ok := cfg.Groups[g]
-		if ok != true {
-			fatalf(t, "missing group %v", g)
-		}
-	}
-
-	// any un-allocated shards?
-	if len(groups) > 0 {
-		for s, g := range cfg.Shards {
-			_, ok := cfg.Groups[g]
-			if ok == false {
-				fatalf(t, "shard %v -> invalid group %v", s, g)
-			}
-		}
-	}
-
-	// more or less balanced sharding?
-	counts := map[tester.Tgid]int{}
-	for _, g := range cfg.Shards {
-		counts[g] += 1
-	}
-	min := 257
-	max := 0
-	for g, _ := range cfg.Groups {
-		if counts[g] > max {
-			max = counts[g]
-		}
-		if counts[g] < min {
-			min = counts[g]
-		}
-	}
-	if max > min+1 {
-		fatalf(t, "max %v too much larger than min %v", max, min)
-	}
-}
-
-func fatalf(t *testing.T, format string, args ...any) {
-	debug.PrintStack()
-	t.Fatalf(format, args...)
-}
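
A short illustrative example of how the helpers above fit together (group IDs, server names, and the key are made up); it only uses functions defined in this file and could live in package shardcfg:

func exampleConfigUsage() {
	cfg := MakeShardConfig()
	// Add two groups and rebalance; the 12 shards should end up split 6/6.
	cfg.JoinBalance(map[tester.Tgid][]string{
		1: {"s1a", "s1b", "s1c"},
		2: {"s2a", "s2b", "s2c"},
	})
	sh := Key2Shard("some-key")         // deterministic shard for a key
	gid, srvs, ok := cfg.GidServers(sh) // group currently serving that shard
	_ = FromString(cfg.String())        // configs round-trip through JSON
	_, _, _ = gid, srvs, ok
}
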
diff --git a/src/shardkv1/shardcfg/shardcfg_test.go b/src/shardkv1/shardcfg/shardcfg_test.go
deleted file mode 100644
index 4d45204..0000000
--- a/src/shardkv1/shardcfg/shardcfg_test.go
+++ /dev/null
@@ -1,62 +0,0 @@
-package shardcfg
-
-import (
-	"testing"
-
-	"6.5840/tester1"
-)
-
-func check_same_config(t *testing.T, c1 ShardConfig, c2 ShardConfig) {
-	if c1.Num != c2.Num {
-		t.Fatalf("Num wrong")
-	}
-	if c1.Shards != c2.Shards {
-		t.Fatalf("Shards wrong")
-	}
-	if len(c1.Groups) != len(c2.Groups) {
-		t.Fatalf("number of Groups is wrong")
-	}
-	for gid, sa := range c1.Groups {
-		sa1, ok := c2.Groups[gid]
-		if ok == false || len(sa1) != len(sa) {
-			t.Fatalf("len(Groups) wrong")
-		}
-		if ok && len(sa1) == len(sa) {
-			for j := 0; j < len(sa); j++ {
-				if sa[j] != sa1[j] {
-					t.Fatalf("Groups wrong")
-				}
-			}
-		}
-	}
-}
-
-func TestBasic(t *testing.T) {
-	const (
-		Gid1 = 1
-		Gid2 = 2
-	)
-	cfg := MakeShardConfig()
-	cfg.CheckConfig(t, []tester.Tgid{})
-
-	cfg.JoinBalance(map[tester.Tgid][]string{Gid1: []string{"x", "y", "z"}})
-	cfg.CheckConfig(t, []tester.Tgid{Gid1})
-
-	cfg.JoinBalance(map[tester.Tgid][]string{Gid2: []string{"a", "b", "c"}})
-	cfg.CheckConfig(t, []tester.Tgid{Gid1, Gid2})
-
-	sa1 := cfg.Groups[Gid1]
-	if len(sa1) != 3 || sa1[0] != "x" || sa1[1] != "y" || sa1[2] != "z" {
-		t.Fatalf("wrong servers for gid %v: %v\n", Gid1, sa1)
-	}
-	sa2 := cfg.Groups[Gid2]
-	if len(sa2) != 3 || sa2[0] != "a" || sa2[1] != "b" || sa2[2] != "c" {
-		t.Fatalf("wrong servers for gid %v: %v\n", Gid2, sa2)
-	}
-
-	cfg.LeaveBalance([]tester.Tgid{Gid1})
-	cfg.CheckConfig(t, []tester.Tgid{Gid2})
-
-	cfg.LeaveBalance([]tester.Tgid{Gid2})
-	cfg.CheckConfig(t, []tester.Tgid{})
-}
diff --git a/src/shardkv1/shardctrler/client.go b/src/shardkv1/shardctrler/client.go
deleted file mode 100644
index 55ecaaa..0000000
--- a/src/shardkv1/shardctrler/client.go
+++ /dev/null
@@ -1,49 +0,0 @@
-package shardctrler
-
-import (
-	// "log"
-	"sync/atomic"
-
-	"6.5840/kvsrv1/rpc"
-	"6.5840/tester1"
-)
-
-type Clerk struct {
-	clnt    *tester.Clnt
-	servers []string
-	deposed *int32
-	// You will have to modify this struct.
-}
-
-// The shard controller can use MakeClerk to make a clerk for the kvraft
-// group with the servers `servers`.
-func MakeClerk(clnt *tester.Clnt, servers []string, deposed *int32) *Clerk {
-	ck := &Clerk{clnt: clnt, servers: servers, deposed: deposed}
-	// You may add code here.
-	return ck
-}
-
-func (ck *Clerk) isDeposed() bool {
-	z := atomic.LoadInt32(ck.deposed)
-	return z == 1
-}
-
-// You can reuse your kvraft Get
-func (ck *Clerk) Get(key string) (string, rpc.Tversion, rpc.Err) {
-	args := rpc.GetArgs{}
-	args.Key = key
-
-	// You'll have to add code here.
-	return "", 0, ""
-}
-
-// You can reuse your kvraft Put
-func (ck *Clerk) Put(key string, value string, version rpc.Tversion) rpc.Err {
-	args := rpc.PutArgs{}
-	args.Key = key
-	args.Value = value
-	args.Version = version
-
-	// You'll have to add code here.
-	return ""
-}
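
Since the comments say you can reuse your kvraft clerk here, this is a rough sketch of the usual retry loop, adapted to give up once the controller has been deposed. It assumes tester.Clnt.Call and rpc.ErrWrongLeader behave as in the earlier kvraft lab, that the kvraft RPC handlers are registered under "KVServer.Get", and that rpc.GetReply has Value/Version/Err fields; it also needs a "time" import. Treat every one of those as an assumption to check against your own Lab 4 code.

// getSketch is a hypothetical helper, not the required implementation.
func (ck *Clerk) getSketch(key string) (string, rpc.Tversion, rpc.Err) {
	args := rpc.GetArgs{Key: key}
	for i := 0; ; i = (i + 1) % len(ck.servers) {
		if ck.isDeposed() {
			return "", 0, ErrDeposed // stop retrying once the tester deposes this ctrler
		}
		var reply rpc.GetReply
		ok := ck.clnt.Call(ck.servers[i], "KVServer.Get", &args, &reply)
		if ok && reply.Err != rpc.ErrWrongLeader {
			return reply.Value, reply.Version, reply.Err
		}
		// leader unknown or unreachable; try the next server after a pause
		time.Sleep(100 * time.Millisecond)
	}
}
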
diff --git a/src/shardkv1/shardctrler/lock/lock.go b/src/shardkv1/shardctrler/lock/lock.go
deleted file mode 100644
index 0b76170..0000000
--- a/src/shardkv1/shardctrler/lock/lock.go
+++ /dev/null
@@ -1,58 +0,0 @@
-package lock
-
-import (
-	"log"
-	"time"
-
-	"6.5840/kvsrv1/rpc"
-	"6.5840/kvtest1"
-)
-
-type Lock struct {
-	kvtest.IKVClerk
-	l   string
-	id  string
-	ver rpc.Tversion
-}
-
-func MakeLock(ck kvtest.IKVClerk, l string) *Lock {
-	lk := &Lock{IKVClerk: ck}
-	// You may add code here
-	return lk
-}
-
-func (lk *Lock) AcquireLeadership() {
-	for {
-		if val, ver, err := lk.Get(lk.l); err == rpc.OK {
-			if val == "" { // put only when lock is free
-				if err := lk.Put(lk.l, lk.id, ver); err == rpc.OK {
-					lk.ver = ver + 1
-					return
-				} else if err == rpc.ErrMaybe { // check if put succeeded?
-					if val, ver, err := lk.Get(lk.l); err == rpc.OK {
-						if val == lk.id {
-							lk.ver = ver
-							return
-						}
-					}
-				}
-			}
-			time.Sleep(1 * time.Millisecond)
-		}
-	}
-}
-
-// Used for two testing purposes: 1) to let a ctrler that is the leader
-// give up its leadership; 2) to take back leadership from a
-// partitioned/deposed ctrler using a new ctrler.
-func (lk *Lock) ReleaseLeadership() rpc.Err {
-	_, ver, err := lk.Get(lk.l)
-	if err != rpc.OK {
-		log.Printf("ResetLock: %v err %v", lk.l, err)
-	}
-	if err := lk.Put(lk.l, "", ver); err == rpc.OK || err == rpc.ErrMaybe {
-		return rpc.OK
-	} else {
-		return err
-	}
-}
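
An illustrative use of the lock above for controller leadership. Note that AcquireLeadership stores lk.id into the lock key, but the skeleton's MakeLock never sets lk.id, so this sketch assumes MakeLock is completed to give each caller a unique id; the key name "leader-lease" is made up.

func electExample(ck kvtest.IKVClerk) {
	lk := MakeLock(ck, "leader-lease")
	lk.AcquireLeadership() // spins until this clerk's id is stored under the lock key
	// ... perform configuration changes while holding leadership ...
	lk.ReleaseLeadership() // clears the lock key so another ctrler can take over
}
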
diff --git a/src/shardkv1/shardctrler/shardctrler.go b/src/shardkv1/shardctrler/shardctrler.go
deleted file mode 100644
index 29332fd..0000000
--- a/src/shardkv1/shardctrler/shardctrler.go
+++ /dev/null
@@ -1,115 +0,0 @@
-package shardctrler
-
-//
-// Shardctrler implemented as a clerk.
-//
-
-import (
-
-	"sync/atomic"
-
-	"6.5840/kvraft1"
-	"6.5840/kvsrv1/rpc"
-	"6.5840/kvtest1"
-	"6.5840/shardkv1/shardcfg"
-	"6.5840/tester1"
-)
-
-const (
-	ErrDeposed = "ErrDeposed"
-)
-
-
-// The query clerk must support only Query(); it is intended for use
-// by shardkv clerks to read the current configuration (see
-// ../client.go).
-type QueryClerk struct {
-	kvtest.IKVClerk
-	// Your data here.
-}
-
-// Make a query clerk for the controller's kvraft group to invoke just
-// Query()
-func MakeQueryClerk(clnt *tester.Clnt, servers []string) *QueryClerk {
-	qck := &QueryClerk{
-		IKVClerk: kvraft.MakeClerk(clnt, servers),
-	}
-	// Your code here.
-	return qck
-}
-
-// Return the current configuration. You can use Get() to retrieve
-// the string representing the configuration and shardcfg.FromString
-// to unmarshal the string into a ShardConfig.
-func (qck *QueryClerk) Query() (*shardcfg.ShardConfig, rpc.Tversion) {
-	// Your code here.
-	return nil, 0
-}
-
-// ShardCtrlerClerk for the shard controller. It implements the
-// methods for Init(), Join(), Leave(), etc.
-type ShardCtrlerClerk struct {
-	clnt    *tester.Clnt
-	deposed int32 // set by Stepdown()
-
-	// Your data here.
-}
-
-// Make a ShardCtrlerClerk for the shard controller, which stores its
-// state in a kvraft group. You can call (and implement) the
-// MakeClerk method in client.go to make a kvraft clerk for the kvraft
-// group with the servers `servers`.
-func MakeShardCtrlerClerk(clnt *tester.Clnt, servers []string) *ShardCtrlerClerk {
-	sck := &ShardCtrlerClerk{clnt: clnt}
-	// Your code here.
-	return sck
-}
-
-
-// Called once by the tester to supply the first configuration. You
-// can marshal ShardConfig into a string using shardcfg.String(), and
-// then Put it in the kvraft group for the controller at version 0.
-// You can pick the key to name the configuration.
-func (sck *ShardCtrlerClerk) Init(cfg *shardcfg.ShardConfig) rpc.Err {
-	// Your code here
-	return rpc.OK
-}
-
-// Add group gid. Use shardcfg.JoinBalance() to compute the new
-// configuration; the supplied `srvs` are the servers for the new
-// group. You can find the servers for existing groups in the
-// configuration (which you can retrieve using Query()) and you can
-// make a clerk for a group by calling shardgrp.MakeClerk(sck.clnt,
-// servers), and then invoke its Freeze/InstallShard methods.
-func (sck *ShardCtrlerClerk) Join(gid tester.Tgid, srvs []string) rpc.Err {
-	// Your code here
-	return rpc.ErrNoKey
-}
-
-// Group gid leaves. You can use shardcfg.LeaveBalance() to compute
-// the new configuration.
-func (sck *ShardCtrlerClerk) Leave(gid tester.Tgid) rpc.Err {
-	// Your code here
-	return rpc.ErrNoKey
-}
-
-// the tester calls Stepdown() to force a ctrler to step down while it
-// is perhaps in the middle of a join/move. for your convenience, we
-// also supply an isDeposed() method to test sck.deposed in
-// long-running loops
-func (sck *ShardCtrlerClerk) Stepdown() {
-	atomic.StoreInt32(&sck.deposed, 1)
-}
-
-func (sck *ShardCtrlerClerk) isDeposed() bool {
-	z := atomic.LoadInt32(&sck.deposed)
-	return z == 1
-}
-
-
-// Return the current configuration
-func (sck *ShardCtrlerClerk) Query() (*shardcfg.ShardConfig, rpc.Tversion, rpc.Err) {
-	// Your code here.
-	return nil, 0, ""
-}
-
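
One possible shape for Init/Query, following the comments above: keep the marshalled configuration under a single key in the controller's kvraft group. This is a sketch, not the required design; the key name "config" and the field sck.ck (a kvraft clerk made via MakeClerk in client.go) are assumptions you would add yourself.

const cfgKey = "config" // illustrative key name

func (sck *ShardCtrlerClerk) initSketch(cfg *shardcfg.ShardConfig) rpc.Err {
	// Version 0 creates the key; a repeated Init would fail with a version error.
	return sck.ck.Put(cfgKey, cfg.String(), 0)
}

func (sck *ShardCtrlerClerk) querySketch() (*shardcfg.ShardConfig, rpc.Tversion, rpc.Err) {
	s, ver, err := sck.ck.Get(cfgKey)
	if err != rpc.OK {
		return nil, 0, err
	}
	return shardcfg.FromString(s), ver, rpc.OK
}
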
diff --git a/src/shardkv1/shardgrp/client.go b/src/shardkv1/shardgrp/client.go
deleted file mode 100644
index edebbdb..0000000
--- a/src/shardkv1/shardgrp/client.go
+++ /dev/null
@@ -1,38 +0,0 @@
-package shardgrp
-
-import (
-
-
-	"6.5840/kvsrv1/rpc"
-	"6.5840/shardkv1/shardcfg"
-	"6.5840/tester1"
-)
-
-type Clerk struct {
-	clnt    *tester.Clnt
-	servers []string
-	leader  int // last successful leader (index into servers[])
-}
-
-func MakeClerk(clnt *tester.Clnt, servers []string) *Clerk {
-	ck := &Clerk{clnt: clnt, servers: servers}
-	return ck
-}
-
-func (ck *Clerk) Get(cid shardcfg.Tnum, key string, n shardcfg.Tnum) (string, rpc.Tversion, rpc.Err) {
-	// Your code here
-	return "", 0, ""
-}
-
-func (ck *Clerk) Put(key string, value string, version rpc.Tversion, n shardcfg.Tnum) (bool, rpc.Err) {
-	// Your code here
-	return false, ""
-}
-
-func (ck *Clerk) Freeze(s shardcfg.Tshid, num shardcfg.Tnum) ([]byte, rpc.Err) {
-	return nil, ""
-}
-
-func (ck *Clerk) InstallShard(s shardcfg.Tshid, state []byte, num shardcfg.Tnum) rpc.Err {
-	return ""
-}
diff --git a/src/shardkv1/shardgrp/server.go b/src/shardkv1/shardgrp/server.go
deleted file mode 100644
index ead33c5..0000000
--- a/src/shardkv1/shardgrp/server.go
+++ /dev/null
@@ -1,94 +0,0 @@
-package shardgrp
-
-import (
-	"sync/atomic"
-
-
-	"6.5840/kvraft1/rsm"
-	"6.5840/kvsrv1/rpc"
-	"6.5840/labgob"
-	"6.5840/labrpc"
-	"6.5840/shardkv1/shardgrp/shardrpc"
-	"6.5840/tester1"
-)
-
-
-type KVServer struct {
-	gid  tester.Tgid
-	me   int
-	dead int32 // set by Kill()
-	rsm  *rsm.RSM
-
-}
-
-
-func (kv *KVServer) DoOp(req any) any {
-	// Your code here
-	return nil
-}
-
-
-func (kv *KVServer) Snapshot() []byte {
-	// Your code here
-	return nil
-}
-
-func (kv *KVServer) Restore(data []byte) {
-	// Your code here
-}
-
-func (kv *KVServer) Get(args *shardrpc.GetArgs, reply *rpc.GetReply) {
-	// Your code here
-}
-
-func (kv *KVServer) Put(args *shardrpc.PutArgs, reply *rpc.PutReply) {
-	// Your code here
-}
-
-// Freeze the specified shard (i.e., reject future Get/Puts for this
-// shard) and return the key/values stored in that shard.
-func (kv *KVServer) Freeze(args *shardrpc.FreezeArgs, reply *shardrpc.FreezeReply) {
-	// Your code here
-}
-
-// Install the supplied state for the specified shard.
-func (kv *KVServer) InstallShard(args *shardrpc.InstallShardArgs, reply *shardrpc.InstallShardReply) {
-	// Your code here
-}
-
-// the tester calls Kill() when a KVServer instance won't
-// be needed again. for your convenience, we supply
-// code to set kv.dead (without needing a lock),
-// and a killed() method to test kv.dead in
-// long-running loops. you can also add your own
-// code to Kill(). you're not required to do anything
-// about this, but it may be convenient (for example)
-// to suppress debug output from a Kill()ed instance.
-func (kv *KVServer) Kill() {
-	atomic.StoreInt32(&kv.dead, 1)
-	// Your code here, if desired.
-}
-
-func (kv *KVServer) killed() bool {
-	z := atomic.LoadInt32(&kv.dead)
-	return z == 1
-}
-
-// StartKVServer() and MakeRSM() must return quickly, so they should
-// start goroutines for any long-running work.
-func StartKVServer(servers []*labrpc.ClientEnd, gid tester.Tgid, me int, persister *tester.Persister, maxraftstate int) []tester.IService {
-	// call labgob.Register on structures you want
-	// Go's RPC library to marshal/unmarshal.
-	labgob.Register(shardrpc.PutArgs{})
-	labgob.Register(shardrpc.GetArgs{})
-	labgob.Register(shardrpc.FreezeArgs{})
-	labgob.Register(shardrpc.InstallShardArgs{})
-	labgob.Register(shardrpc.DeleteShardArgs{})
-	labgob.Register(rsm.Op{})
-
-	kv := &KVServer{gid: gid, me: me}
-	kv.rsm = rsm.MakeRSM(servers, me, persister, maxraftstate, kv)
-
-	// Your code here
-	return []tester.IService{kv, kv.rsm.Raft()}
-}
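
For Freeze's reply (and for InstallShard on the receiving side) the shard's key/value state has to travel as a []byte. A common way to do that in these labs is labgob over a bytes.Buffer, the same pattern used for Raft snapshots. The sketch below assumes a hypothetical per-shard map of type map[string]string; your server will likely also need versions and per-client duplicate-detection state in whatever struct you encode. Needs "bytes", "log", and "6.5840/labgob" imports.

func encodeShardSketch(m map[string]string) []byte {
	w := new(bytes.Buffer)
	e := labgob.NewEncoder(w)
	if err := e.Encode(m); err != nil {
		log.Fatalf("encodeShardSketch: %v", err)
	}
	return w.Bytes()
}

func decodeShardSketch(data []byte) map[string]string {
	d := labgob.NewDecoder(bytes.NewBuffer(data))
	var m map[string]string
	if err := d.Decode(&m); err != nil {
		log.Fatalf("decodeShardSketch: %v", err)
	}
	return m
}
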
diff --git a/src/shardkv1/shardgrp/shardrpc/shardrpc.go b/src/shardkv1/shardgrp/shardrpc/shardrpc.go
deleted file mode 100644
index d8424ea..0000000
--- a/src/shardkv1/shardgrp/shardrpc/shardrpc.go
+++ /dev/null
@@ -1,50 +0,0 @@
-package shardrpc
-
-import (
-	"6.5840/kvsrv1/rpc"
-	"6.5840/shardkv1/shardcfg"
-)
-
-// Same as Put in kvsrv1/rpc, but with a configuration number.
-type PutArgs struct {
-	Key     string
-	Value   string
-	Version rpc.Tversion
-	Num     shardcfg.Tnum
-}
-
-// Same as Get in kvsrv1/rpc, but with a configuration number.
-type GetArgs struct {
-	Key string
-	Num shardcfg.Tnum
-}
-
-type FreezeArgs struct {
-	Shard shardcfg.Tshid
-	Num   shardcfg.Tnum
-}
-
-type FreezeReply struct {
-	State []byte
-	Num   shardcfg.Tnum
-	Err   rpc.Err
-}
-
-type InstallShardArgs struct {
-	Shard shardcfg.Tshid
-	State []byte
-	Num   shardcfg.Tnum
-}
-
-type InstallShardReply struct {
-	Err rpc.Err
-}
-
-type DeleteShardArgs struct {
-	Shard shardcfg.Tshid
-	Num   shardcfg.Tnum
-}
-
-type DeleteShardReply struct {
-	Err rpc.Err
-}
diff --git a/src/shardkv1/shardkv_test.go b/src/shardkv1/shardkv_test.go
deleted file mode 100644
index 24ae2c0..0000000
--- a/src/shardkv1/shardkv_test.go
+++ /dev/null
@@ -1,304 +0,0 @@
-package shardkv
-
-import (
-	"log"
-	"testing"
-	"time"
-
-	"6.5840/kvsrv1/rpc"
-	"6.5840/kvtest1"
-	"6.5840/shardkv1/shardcfg"
-	"6.5840/tester1"
-	// "6.5840/shardkv1/shardctrler"
-)
-
-const (
-	NGRP = 8
-)
-
-// Set up a k/v service with 1 shardgrp (group 0) for the controller
-// to store its state and 1 shardgrp (group 1) to store all shards.
-// Tests the controller's Init() and Query(), and shardkv's Get/Put
-// without reconfiguration.
-func TestStaticOneShardGroup5A(t *testing.T) {
-	ts := MakeTest(t, "Test (5A): one shard group ...", true, false)
-	defer ts.Cleanup()
-
-	// The tester's setupKVService() sets up a kvraft group for the
-	// controller to store configurations and calls the controller's
-	// Init() method to create the first configuration.
-	ts.setupKVService()
-	sck := ts.ShardCtrler() // get the controller clerk from tester
-
-	// Read the initial configuration and check it
-	cfg, v, err := sck.Query()
-	if err != rpc.OK {
-		ts.t.Fatalf("Query failed %v", err)
-	}
-	if v != 1 || cfg.Num != 1 || cfg.Shards[0] != shardcfg.Gid1 {
-		ts.t.Fatalf("Static wrong %v %v", cfg, v)
-	}
-	cfg.CheckConfig(t, []tester.Tgid{shardcfg.Gid1})
-
-	ck := ts.MakeClerk()                          // make a shardkv clerk
-	ka, va := ts.SpreadPuts(ck, shardcfg.NShards) // do some puts
-	n := len(ka)
-	for i := 0; i < n; i++ {
-		ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) // check the puts
-	}
-}
-
-// test shardctrler's join, which adds a new group Gid2 and must move
-// shards to the new group; the old group should reject Get/Puts on
-// shards that have moved.
-func TestJoinBasic5A(t *testing.T) {
-	ts := MakeTest(t, "Test (5A): a group joins...", true, false)
-	defer ts.Cleanup()
-
-	gid1 := ts.setupKVService()
-	ck := ts.MakeClerk()
-	ka, va := ts.SpreadPuts(ck, shardcfg.NShards)
-
-	sck := ts.ShardCtrler()
-	cfg, _, err := sck.Query()
-	if err != rpc.OK {
-		ts.t.Fatalf("Query: err %v", err)
-	}
-
-	gid2 := ts.newGid()
-	err = ts.joinGroups(sck, []tester.Tgid{gid2})
-	if err != rpc.OK {
-		ts.t.Fatalf("joinGroups: err %v", err)
-	}
-
-	cfg1, _, err := sck.Query()
-	if err != rpc.OK {
-		ts.t.Fatalf("Query 1: err %v", err)
-	}
-
-	if cfg.Num+1 != cfg1.Num {
-		ts.t.Fatalf("wrong num %d expected %d ", cfg1.Num, cfg.Num+1)
-	}
-
-	if !cfg1.IsMember(gid2) {
-		ts.t.Fatalf("%d isn't a member of %v", gid2, cfg1)
-	}
-
-	// check shards at shardcfg.Gid2
-	ts.checkShutdownSharding(gid1, gid2, ka, va)
-
-	for i := 0; i < len(ka); i++ {
-		ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
-	}
-
-	// check shards at shardcfg.Gid1
-	ts.checkShutdownSharding(gid2, gid1, ka, va)
-
-	for i := 0; i < len(ka); i++ {
-		ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
-	}
-}
-
-// test shardctrler's leave
-func TestJoinLeaveBasic5A(t *testing.T) {
-	ts := MakeTest(t, "Test (5A): basic groups join/leave ...", true, false)
-	defer ts.Cleanup()
-
-	gid1 := ts.setupKVService()
-	ck := ts.MakeClerk()
-	ka, va := ts.SpreadPuts(ck, shardcfg.NShards)
-
-	sck := ts.ShardCtrler()
-	gid2 := ts.newGid()
-	err := ts.joinGroups(sck, []tester.Tgid{gid2})
-	if err != rpc.OK {
-		ts.t.Fatalf("joinGroups: err %v", err)
-	}
-
-	// check shards at shardcfg.Gid2
-	ts.checkShutdownSharding(gid1, gid2, ka, va)
-
-	for i := 0; i < len(ka); i++ {
-		ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
-	}
-
-	err = sck.Leave(shardcfg.Gid1)
-	if err != rpc.OK {
-		ts.t.Fatalf("Leave: err %v", err)
-	}
-	cfg, _, err := sck.Query()
-	if err != rpc.OK {
-		ts.t.Fatalf("Query err %v", err)
-	}
-	if cfg.IsMember(shardcfg.Gid1) {
-		ts.t.Fatalf("%d is a member of %v", shardcfg.Gid1, cfg)
-	}
-
-	ts.Group(shardcfg.Gid1).Shutdown()
-
-	for i := 0; i < len(ka); i++ {
-		ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
-	}
-
-	// bring the crashed shard/group back to life.
-	ts.Group(shardcfg.Gid1).StartServers()
-
-	// Rejoin
-	sck.Join(shardcfg.Gid1, ts.Group(shardcfg.Gid1).SrvNames())
-
-	for i := 0; i < len(ka); i++ {
-		ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
-	}
-
-	// check shards at shardcfg.Gid2
-	ts.checkShutdownSharding(gid2, gid1, ka, va)
-}
-
-// test many groups joining and leaving, reliable or unreliable
-func joinLeave5A(t *testing.T, reliable bool, part string) {
-	ts := MakeTest(t, "Test (5A): many groups join/leave ...", reliable, false)
-	defer ts.Cleanup()
-
-	ts.setupKVService()
-	ck := ts.MakeClerk()
-	ka, va := ts.SpreadPuts(ck, shardcfg.NShards)
-
-	sck := ts.ShardCtrler()
-	grps := ts.groups(NGRP)
-
-	ts.joinGroups(sck, grps)
-
-	ts.checkShutdownSharding(grps[0], grps[1], ka, va)
-
-	for i := 0; i < len(ka); i++ {
-		ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
-	}
-
-	ts.leaveGroups(sck, grps)
-
-	for i := 0; i < len(ka); i++ {
-		ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
-	}
-}
-
-func TestManyJoinLeaveReliable5A(t *testing.T) {
-	joinLeave5A(t, true, "Test (5A): many groups join/leave reliable...")
-}
-
-func TestManyJoinLeaveUnreliable5A(t *testing.T) {
-	joinLeave5A(t, false, "Test (5A): many groups join/leave unreliable...")
-}
-
-// Test we can recover from complete shutdown using snapshots
-func TestSnapshot5A(t *testing.T) {
-	const NGRP = 3
-
-	ts := MakeTest(t, "Test (5A): snapshots ...", true, false)
-	defer ts.Cleanup()
-
-	ts.setupKVService()
-	ck := ts.MakeClerk()
-	ka, va := ts.SpreadPuts(ck, shardcfg.NShards)
-
-	sck := ts.ShardCtrler()
-	grps := ts.groups(2)
-	ts.joinGroups(sck, grps)
-
-	// check shards at shardcfg.Gid2
-	ts.checkShutdownSharding(grps[0], grps[1], ka, va)
-
-	for i := 0; i < len(ka); i++ {
-		ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
-	}
-
-	for i := tester.Tgid(0); i < NGRP; i++ {
-		ts.Group(shardcfg.Gid1).Shutdown()
-	}
-	for i := tester.Tgid(0); i < NGRP; i++ {
-		ts.Group(shardcfg.Gid1).StartServers()
-	}
-
-	for i := 0; i < len(ka); i++ {
-		ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
-	}
-}
-
-// Test linearizability with groups joining/leaving and `nclnt`
-// concurrent clerks doing put/get's in an `unreliable` net.
-func concurrentClerk(t *testing.T, nclnt int, reliable bool, part string) {
-	const (
-		NSEC = 20
-	)
-
-	ts := MakeTest(t, part, reliable, true)
-	defer ts.Cleanup()
-
-	ts.setupKVService()
-
-	ka := kvtest.MakeKeys(shardcfg.NShards)
-	ch := make(chan []kvtest.ClntRes)
-
-	start := time.Now()
-
-	go func(ch chan []kvtest.ClntRes) {
-		rs := ts.SpawnClientsAndWait(nclnt, NSEC*time.Second, func(me int, ck kvtest.IKVClerk, done chan struct{}) kvtest.ClntRes {
-			return ts.OneClientPut(me, ck, ka, done)
-		})
-		ch <- rs
-	}(ch)
-
-	sck := ts.ShardCtrler()
-	grps := ts.groups(NGRP)
-	ts.joinGroups(sck, grps)
-
-	ts.leaveGroups(sck, grps)
-
-	log.Printf("time joining/leaving %v", time.Since(start))
-
-	rsa := <-ch
-
-	log.Printf("rsa %v", rsa)
-
-	ts.CheckPorcupine()
-}
-
-// Test linearizability with groups joining/leaving and 1 concurrent clerk doing put/get's
-func TestOneConcurrentClerkReliable5A(t *testing.T) {
-	concurrentClerk(t, 1, true, "Test (5A): one concurrent clerk reliable...")
-}
-
-// Test linearizability with groups joining/leaving and many concurrent clerks doing put/get's
-func TestManyConcurrentClerkReliable5A(t *testing.T) {
-	const NCLNT = 10
-	concurrentClerk(t, NCLNT, true, "Test (5A): many concurrent clerks reliable...")
-}
-
-// Test linearizability with groups joining/leaving and 1 concurrent clerk doing put/get's
-func TestOneConcurrentClerkUnreliable5A(t *testing.T) {
-	concurrentClerk(t, 1, false, "Test (5A): one concurrent clerk unreliable ...")
-}
-
-// Test linearizability with groups joining/leaving and many concurrent clerks doing put/get's
-func TestManyConcurrentClerkUnreliable5A(t *testing.T) {
-	const NCLNT = 10
-	concurrentClerk(t, NCLNT, false, "Test (5A): many concurrent clerks unreliable...")
-}
-
-// test recovery of partitioned controllers
-func TestRecoverCtrler5B(t *testing.T) {
-	const (
-		NPARITITON = 10
-	)
-
-	ts := MakeTest(t, "Test (5B): recover controller ...", true, false)
-	defer ts.Cleanup()
-
-	ts.setupKVService()
-	ck := ts.MakeClerk()
-	ka, va := ts.SpreadPuts(ck, shardcfg.NShards)
-
-	for i := 0; i < NPARITITON; i++ {
-		ts.partitionCtrler(ck, ka, va)
-	}
-}
-
diff --git a/src/shardkv1/test.go b/src/shardkv1/test.go
deleted file mode 100644
index 33b12f5..0000000
--- a/src/shardkv1/test.go
+++ /dev/null
@@ -1,313 +0,0 @@
-package shardkv
-
-import (
-	"fmt"
-	"log"
-	"math/rand"
-	"sync"
-	"testing"
-	"time"
-
-	"6.5840/kvraft1"
-	"6.5840/kvsrv1/rpc"
-	"6.5840/kvtest1"
-	"6.5840/labrpc"
-	"6.5840/shardkv1/shardcfg"
-	"6.5840/shardkv1/shardctrler"
-	"6.5840/shardkv1/shardgrp"
-	"6.5840/tester1"
-)
-
-type Test struct {
-	t *testing.T
-	*kvtest.Test
-
-	sck  *shardctrler.ShardCtrlerClerk
-	part string
-
-	maxraftstate int
-	mu           sync.Mutex
-	ngid         tester.Tgid
-}
-
-const (
-	Controler     = tester.Tgid(0) // the controller uses group 0 for a kvraft group
-	NSRV          = 3              // servers per group
-	INTERGRPDELAY = 200            // time in ms between group changes
-)
-
-// Set up a kvraft group (group 0) for the shard controller and make
-// the controller clerk.
-func MakeTest(t *testing.T, part string, reliable, randomkeys bool) *Test {
-	ts := &Test{
-		ngid:         shardcfg.Gid1 + 1, // Gid1 is in use
-		t:            t,
-		maxraftstate: -1,
-	}
-	cfg := tester.MakeConfig(t, NSRV, reliable, ts.StartKVServerControler)
-	ts.Test = kvtest.MakeTest(t, cfg, randomkeys, ts)
-	ts.sck = ts.makeShardCtrlerClerk()
-	ts.Begin(part)
-	return ts
-}
-
-func (ts *Test) StartKVServerControler(servers []*labrpc.ClientEnd, gid tester.Tgid, me int, persister *tester.Persister) []tester.IService {
-	return kvraft.StartKVServer(servers, gid, me, persister, ts.maxraftstate)
-}
-
-func (ts *Test) MakeClerk() kvtest.IKVClerk {
-	clnt := ts.Config.MakeClient()
-	ck := MakeClerk(clnt, ts.makeQueryClerk())
-	return &kvtest.TestClerk{ck, clnt}
-}
-
-func (ts *Test) DeleteClerk(ck kvtest.IKVClerk) {
-	tck := ck.(*kvtest.TestClerk)
-	ts.DeleteClient(tck.Clnt)
-}
-
-func (ts *Test) ShardCtrler() *shardctrler.ShardCtrlerClerk {
-	return ts.sck
-}
-
-func (ts *Test) makeShardCtrlerClerk() *shardctrler.ShardCtrlerClerk {
-	ck, _ := ts.makeShardCtrlerClerkClnt()
-	return ck
-}
-
-func (ts *Test) makeShardCtrlerClerkClnt() (*shardctrler.ShardCtrlerClerk, *tester.Clnt) {
-	srvs := ts.Group(Controler).SrvNames()
-	clnt := ts.Config.MakeClient()
-	return shardctrler.MakeShardCtrlerClerk(clnt, srvs), clnt
-}
-
-func (ts *Test) makeQueryClerk() *shardctrler.QueryClerk {
-	srvs := ts.Group(Controler).SrvNames()
-	clnt := ts.Config.MakeClient()
-	return shardctrler.MakeQueryClerk(clnt, srvs)
-}
-
-func (ts *Test) newGid() tester.Tgid {
-	ts.mu.Lock()
-	defer ts.mu.Unlock()
-
-	gid := ts.ngid
-	ts.ngid += 1
-	return gid
-}
-
-func (ts *Test) groups(n int) []tester.Tgid {
-	grps := make([]tester.Tgid, n)
-	for i := 0; i < n; i++ {
-		grps[i] = ts.newGid()
-	}
-	return grps
-}
-
-// Set up the KV service with one group, Gid1. Gid1 should initialize
-// itself to own all shards.
-func (ts *Test) setupKVService() tester.Tgid {
-	scfg := shardcfg.MakeShardConfig()
-	ts.Config.MakeGroupStart(shardcfg.Gid1, NSRV, ts.StartKVServerShard)
-	scfg.JoinBalance(map[tester.Tgid][]string{shardcfg.Gid1: ts.Group(shardcfg.Gid1).SrvNames()})
-	if err := ts.sck.Init(scfg); err != rpc.OK {
-		ts.t.Fatalf("Init err %v", err)
-	}
-	//ts.sck.AcquireLeadership()
-	return shardcfg.Gid1
-}
-
-func (ts *Test) StartKVServerShard(servers []*labrpc.ClientEnd, gid tester.Tgid, me int, persister *tester.Persister) []tester.IService {
-	return shardgrp.StartKVServer(servers, gid, me, persister, ts.maxraftstate)
-}
-
-func (ts *Test) joinGroups(sck *shardctrler.ShardCtrlerClerk, gids []tester.Tgid) rpc.Err {
-	for i, gid := range gids {
-		ts.Config.MakeGroupStart(gid, NSRV, ts.StartKVServerShard)
-		if err := sck.Join(gid, ts.Group(gid).SrvNames()); err != rpc.OK {
-			return err
-		}
-		if i < len(gids)-1 {
-			time.Sleep(INTERGRPDELAY * time.Millisecond)
-		}
-	}
-	return rpc.OK
-}
-
-func (ts *Test) leaveGroups(sck *shardctrler.ShardCtrlerClerk, gids []tester.Tgid) rpc.Err {
-	for i, gid := range gids {
-		if err := sck.Leave(gid); err != rpc.OK {
-			return err
-		}
-		ts.Config.ExitGroup(gid)
-		if i < len(gids)-1 {
-			time.Sleep(INTERGRPDELAY * time.Millisecond)
-		}
-	}
-	return rpc.OK
-}
-
-func (ts *Test) checkLogs(gids []tester.Tgid) {
-	for _, gid := range gids {
-		n := ts.Group(gid).LogSize()
-		s := ts.Group(gid).SnapshotSize()
-		if ts.maxraftstate >= 0 && n > 8*ts.maxraftstate {
-			ts.t.Fatalf("persister.RaftStateSize() %v, but maxraftstate %v",
-				n, ts.maxraftstate)
-		}
-		if ts.maxraftstate < 0 && s > 0 {
-			ts.t.Fatalf("maxraftstate is -1, but snapshot is non-empty!")
-		}
-
-	}
-}
-
-// make sure that the data really is sharded by
-// shutting down one shard and checking that some
-// Get()s don't succeed.
-func (ts *Test) checkShutdownSharding(down, up tester.Tgid, ka []string, va []string) {
-	const NSEC = 2
-
-	ts.Group(down).Shutdown()
-
-	ts.checkLogs([]tester.Tgid{down, up}) // forbid snapshots
-
-	n := len(ka)
-	ch := make(chan string)
-	for xi := 0; xi < n; xi++ {
-		ck1 := ts.MakeClerk()
-		go func(i int) {
-			v, _, _ := ck1.Get(ka[i])
-			if v != va[i] {
-				ch <- fmt.Sprintf("Get(%v): expected:\n%v\nreceived:\n%v", ka[i], va[i], v)
-			} else {
-				ch <- ""
-			}
-		}(xi)
-	}
-
-	// wait a bit, only about half the Gets should succeed.
-	ndone := 0
-	done := false
-	for done == false {
-		select {
-		case err := <-ch:
-			if err != "" {
-				ts.Fatalf(err)
-			}
-			ndone += 1
-		case <-time.After(time.Second * NSEC):
-			done = true
-			break
-		}
-	}
-
-	// log.Printf("%d completions out of %d with %d groups", ndone, n, ngrp)
-	if ndone >= n {
-		ts.Fatalf("expected less than %d completions with one shard dead\n", n)
-	}
-
-	// bring the crashed shard/group back to life.
-	ts.Group(down).StartServers()
-}
-
-// Run one controller and then partition it forever after some time.
-// Run another controller that must finish the first ctrler's unfinished
-// shard moves, if there are any.
-func (ts *Test) partitionCtrler(ck kvtest.IKVClerk, ka, va []string) {
-	const (
-		MSEC = 20
-		RAND = 2000 // maybe measure?
-	)
-
-	ch := make(chan tester.Tgid)
-
-	sck, clnt := ts.makeShardCtrlerClerkClnt()
-	cfg, _, err := ts.ShardCtrler().Query()
-	num := cfg.Num
-
-	go func() {
-		for true {
-			ngid := ts.newGid()
-			//log.Printf("join %d", ngid)
-			//s := time.Now()
-			ch <- ngid
-			err := ts.joinGroups(sck, []tester.Tgid{ngid})
-			if err == rpc.OK {
-				err = ts.leaveGroups(sck, []tester.Tgid{ngid})
-			}
-			//log.Printf("join err %v time %v", err, time.Since(s))
-			if err == shardctrler.ErrDeposed {
-				log.Printf("deposed")
-				return
-			}
-			if err != rpc.OK {
-				ts.t.Fatalf("join/leave err %v", err)
-			}
-			time.Sleep(INTERGRPDELAY * time.Millisecond)
-		}
-	}()
-
-	lastgid := <-ch
-
-	d := time.Duration(rand.Int()%RAND) * time.Millisecond
-	time.Sleep(MSEC*time.Millisecond + d)
-
-	log.Printf("disconnect sck %v", d)
-
-	// partition sck forever
-	clnt.DisconnectAll()
-
-	// force sck to step down
-	sck.Stepdown()
-
-	// wait until sck has no more requests in the network
-	time.Sleep(labrpc.MAXDELAY)
-
-	cfg, _, err = ts.ShardCtrler().Query()
-	if err != rpc.OK {
-		ts.Fatalf("Query err %v", err)
-	}
-
-	recovery := false
-	present := cfg.IsMember(lastgid)
-	join := num == cfg.Num
-	leave := num+1 == cfg.Num
-	if !present && join {
-		recovery = true
-	}
-	if present && leave {
-		recovery = true
-	}
-
-	// start a new controller to pick up where sck left off
-	sck0, clnt0 := ts.makeShardCtrlerClerkClnt()
-	if err != rpc.OK {
-		ts.Fatalf("Query err %v", err)
-	}
-
-	cfg, _, err = sck0.Query()
-	if recovery {
-		s := "join"
-		if leave {
-			s = "leave"
-		}
-		//log.Printf("%v in progress", s)
-		present = cfg.IsMember(lastgid)
-		if (join && !present) || (leave && present) {
-			ts.Fatalf("didn't recover %d correctly after %v", lastgid, s)
-		}
-	}
-
-	if present {
-		// cleanup if disconnected after join but before leave
-		ts.leaveGroups(sck0, []tester.Tgid{lastgid})
-	}
-
-	for i := 0; i < len(ka); i++ {
-		ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
-	}
-
-	ts.Config.DeleteClient(clnt)
-	ts.Config.DeleteClient(clnt0)
-}
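
Finally, a sketch of the shard hand-off a controller performs during Join/Leave, pieced together from the skeleton's comments (compute the next configuration, Freeze each moving shard at its old group, InstallShard at its new group). The helper name moveShards is made up; error handling, retries, stale-config-number checks, DeleteShard, and publishing the new configuration back to the controller's kvraft group (so a replacement controller can finish an interrupted move) are all omitted.

func moveShards(clnt *tester.Clnt, cur, next *shardcfg.ShardConfig) {
	for sh := shardcfg.Tshid(0); sh < shardcfg.NShards; sh++ {
		fromGid, fromSrvs, _ := cur.GidServers(sh)
		toGid, toSrvs, _ := next.GidServers(sh)
		if fromGid == toGid {
			continue // shard stays where it is
		}
		src := shardgrp.MakeClerk(clnt, fromSrvs)
		dst := shardgrp.MakeClerk(clnt, toSrvs)
		state, _ := src.Freeze(sh, next.Num)  // old group stops serving the shard
		dst.InstallShard(sh, state, next.Num) // new group starts serving it
	}
}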