From ea04b4c78f79d5e180368a70d19e0e567d08ee65 Mon Sep 17 00:00:00 2001
From: Yun-Sheng Chang
Date: Sat, 15 Mar 2025 22:11:08 -0400
Subject: [PATCH] update

---
 src/kvraft1/rsm/test.go                 |  2 +
 src/raft1/raft_test.go                  |  7 ++--
 src/shardkv1/shardctrler/shardctrler.go | 21 ++++++-----
 src/shardkv1/shardkv_test.go            | 39 +++++++++-----------
 src/shardkv1/test.go                    | 49 +++++++------------------
 src/tester1/group.go                    |  5 +++
 src/tester1/srv.go                      |  2 +-
 7 files changed, 55 insertions(+), 70 deletions(-)

diff --git a/src/kvraft1/rsm/test.go b/src/kvraft1/rsm/test.go
index e5d2536..1fb3f8d 100644
--- a/src/kvraft1/rsm/test.go
+++ b/src/kvraft1/rsm/test.go
@@ -68,7 +68,9 @@ func (ts *Test) onePartition(p []int, req any) any {
 	// try all the servers, maybe one is the leader but give up after NSEC
 	t0 := time.Now()
 	for time.Since(t0).Seconds() < NSEC {
+		ts.mu.Lock()
 		index := ts.leader
+		ts.mu.Unlock()
 		for range ts.srvs {
 			if ts.g.IsConnected(index) {
 				s := ts.srvs[index]
diff --git a/src/raft1/raft_test.go b/src/raft1/raft_test.go
index a56e9b2..b435241 100644
--- a/src/raft1/raft_test.go
+++ b/src/raft1/raft_test.go
@@ -878,10 +878,11 @@ func TestPersist33C(t *testing.T) {
 	ts.g.ShutdownServer((leader + 0) % servers)
 	ts.g.ShutdownServer((leader + 1) % servers)
 	tester.AnnotateShutdown([]int{(leader + 0) % servers, (leader + 1) % servers})
-	ts.restart((leader + 2) % servers)
-	ts.restart((leader + 0) % servers)
-	tester.AnnotateRestart([]int{(leader + 2) % servers, (leader + 0) % servers})
+	ts.g.ConnectOne((leader + 2) % servers)
 	tester.AnnotateConnection(ts.g.GetConnected())
+	ts.restart((leader + 0) % servers)
+	tester.AnnotateRestart([]int{(leader + 0) % servers})
+
 	ts.one(103, 2, true)
diff --git a/src/shardkv1/shardctrler/shardctrler.go b/src/shardkv1/shardctrler/shardctrler.go
index 516bdd9..b0f351c 100644
--- a/src/shardkv1/shardctrler/shardctrler.go
+++ b/src/shardkv1/shardctrler/shardctrler.go
@@ -37,12 +37,14 @@ func MakeShardCtrler(clnt *tester.Clnt, leases bool) *ShardCtrler {
 }
 
 // The tester calls InitController() before starting a new
-// controller. In this method you can implement recovery (part B) and
-// use a lock to become leader (part C). InitController may fail when
-// another controller supersedes (e.g., when this controller is
-// partitioned during recovery).
+// controller. In part A, this method doesn't need to do anything. In
+// B and C, this method implements recovery (part B) and uses a lock
+// to become leader (part C). InitController should return
+// rpc.ErrVersion when another controller supersedes it (e.g., when
+// this controller is partitioned during recovery); this happens only
+// in part C. Otherwise, it returns rpc.OK.
 func (sck *ShardCtrler) InitController() rpc.Err {
-	return rpc.ErrNoKey
+	return rpc.ErrVersion
 }
 
 // The tester calls ExitController to exit a controller. In part B and
@@ -59,9 +61,10 @@ func (sck *ShardCtrler) InitConfig(cfg *shardcfg.ShardConfig) {
 }
 
 // Called by the tester to ask the controller to change the
-// configuration from the current one to new. It may return an error
-// if this controller is disconnected for a while and another
-// controller takes over in the mean time, as in part C.
+// configuration from the current one to new. It should return
+// rpc.ErrVersion if this controller is superseded by another
+// controller, as in part C. In all other cases, it should return
+// rpc.OK.
 func (sck *ShardCtrler) ChangeConfigTo(new *shardcfg.ShardConfig) rpc.Err {
 	return rpc.OK
 }
 
@@ -79,7 +82,7 @@ func (sck *ShardCtrler) isKilled() bool {
 }
 
-// Return the current configuration
+// Return the current configuration and its version number
 func (sck *ShardCtrler) Query() (*shardcfg.ShardConfig, rpc.Tversion) {
 	// Your code here.
 	return nil, 0
diff --git a/src/shardkv1/shardkv_test.go b/src/shardkv1/shardkv_test.go
index 74eac41..14d9ede 100644
--- a/src/shardkv1/shardkv_test.go
+++ b/src/shardkv1/shardkv_test.go
@@ -104,15 +104,13 @@ func TestJoinBasic5A(t *testing.T) {
 		ts.t.Fatalf("%d isn't a member of %v", gid2, cfg1)
 	}
 
-	// check shards at shardcfg.Gid2
-	ts.checkShutdownSharding(gid1, gid2, ka, va)
+	ts.checkShutdownSharding(gid1, ka, va)
 
 	for i := 0; i < len(ka); i++ {
 		ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
 	}
 
-	// check shards at shardcfg.Gid1
-	ts.checkShutdownSharding(gid2, gid1, ka, va)
+	ts.checkShutdownSharding(gid2, ka, va)
 
 	for i := 0; i < len(ka); i++ {
 		ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
@@ -172,8 +170,7 @@ func TestJoinLeaveBasic5A(t *testing.T) {
 		ts.t.Fatalf("joinGroups: err %v", err)
 	}
 
-	// check shards at shardcfg.Gid2
-	ts.checkShutdownSharding(gid1, gid2, ka, va)
+	ts.checkShutdownSharding(gid1, ka, va)
 
 	for i := 0; i < len(ka); i++ {
 		ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
@@ -204,8 +201,7 @@
 		ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
 	}
 
-	// check shards at shardcfg.Gid2
-	ts.checkShutdownSharding(gid2, gid1, ka, va)
+	ts.checkShutdownSharding(gid2, ka, va)
 }
 
 // test many groups joining and leaving, reliable or unreliable
@@ -222,7 +218,7 @@ func joinLeave5A(t *testing.T, reliable bool, part string) {
 
 	ts.joinGroups(sck, grps)
 
-	ts.checkShutdownSharding(grps[0], grps[1], ka, va)
+	ts.checkShutdownSharding(grps[0], ka, va)
 
 	for i := 0; i < len(ka); i++ {
 		ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
@@ -260,7 +256,7 @@ func TestShutdown5A(t *testing.T) {
 	grps := ts.groups(NJOIN)
 	ts.joinGroups(sck, grps)
 
-	ts.checkShutdownSharding(grps[0], grps[1], ka, va)
+	ts.checkShutdownSharding(grps[0], ka, va)
 
 	for i := 0; i < len(ka); i++ {
 		ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
@@ -569,13 +565,13 @@ func TestJoinLeave5B(t *testing.T) {
 	}
 }
 
-// test recovery of partitioned controlers
+// test recovery of partitioned controllers
 func TestRecoverCtrler5B(t *testing.T) {
 	const (
 		NPARTITION = 5
 	)
 
-	ts := MakeTest(t, "Test (5B): recover controler ...", true)
+	ts := MakeTest(t, "Test (5B): recover controller ...", true)
 	defer ts.Cleanup()
 
 	gid := ts.setupKVService()
@@ -733,7 +729,7 @@ func TestLeaseBasicRefresh5C(t *testing.T) {
 
 // Test if old leader is fenced off when reconnecting while it is in
 // the middle of a Join.
-func TestPartitionControlerJoin5C(t *testing.T) {
+func TestPartitionControllerJoin5C(t *testing.T) {
 	const (
 		NSLEEP = 2
 		RAND   = 1000
@@ -759,8 +755,8 @@ func TestPartitionControlerJoin5C(t *testing.T) {
 		ch <- ts.join(sck, ngid, ts.Group(ngid).SrvNames())
 	}()
 
-	// sleep for a while to get the chance for the controler to get stuck
-	// in join or leave, because gid is down
+	// sleep for a while to get the chance for the controller to get
+	// stuck in join, because gid is down
 	time.Sleep(1 * time.Second)
 
 	// partition sck
@@ -771,19 +767,20 @@ func TestPartitionControlerJoin5C(t *testing.T) {
 
 	ts.Group(ngid).StartServers()
 
-	// start new controler to supersede partitioned one,
-	// it will also be stuck
+	// start a new controller to supersede the partitioned one
 	sck0 := ts.makeShardCtrler()
 	if err := sck0.InitController(); err != rpc.OK {
 		t.Fatalf("failed to init controller %v", err)
 	}
 
+	scfg, _ := sck0.Query()
+	if !scfg.IsMember(ngid) {
+		t.Fatalf("Didn't recover gid %d", ngid)
+	}
+
 	sck0.ExitController()
 
-	//log.Printf("reconnect")
-
-	// reconnect old controller, which shouldn't be able
-	// to do anything
+	// reconnect old controller, which shouldn't finish ChangeConfigTo
 	clnt.ConnectAll()
 
 	err := <-ch
diff --git a/src/shardkv1/test.go b/src/shardkv1/test.go
index f37ee5c..14974a8 100644
--- a/src/shardkv1/test.go
+++ b/src/shardkv1/test.go
@@ -2,7 +2,7 @@ package shardkv
 
 import (
 	"fmt"
-	//"log"
+	"log"
 	"math/rand"
 	"sync"
 	"sync/atomic"
@@ -131,21 +131,18 @@ func (ts *Test) join(sck *shardctrler.ShardCtrler, gid tester.Tgid, srvs []string) rpc.Err {
 	newcfg := cfg.Copy()
 	ok := newcfg.JoinBalance(map[tester.Tgid][]string{gid: srvs})
 	if !ok {
-		return rpc.ErrVersion
+		log.Fatalf("join: group %d is already present", gid)
 	}
-	err := sck.ChangeConfigTo(newcfg)
-	return err
+	return sck.ChangeConfigTo(newcfg)
 }
 
 func (ts *Test) joinGroups(sck *shardctrler.ShardCtrler, gids []tester.Tgid) rpc.Err {
-	for i, gid := range gids {
+	for _, gid := range gids {
 		ts.Config.MakeGroupStart(gid, NSRV, ts.StartServerShardGrp)
 		if err := ts.join(sck, gid, ts.Group(gid).SrvNames()); err != rpc.OK {
 			return err
 		}
-		if i < len(gids)-1 {
-			time.Sleep(INTERGRPDELAY * time.Millisecond)
-		}
+		time.Sleep(INTERGRPDELAY * time.Millisecond)
 	}
 	return rpc.OK
 }
@@ -156,20 +153,18 @@ func (ts *Test) leave(sck *shardctrler.ShardCtrler, gid tester.Tgid) rpc.Err {
 	newcfg := cfg.Copy()
 	ok := newcfg.LeaveBalance([]tester.Tgid{gid})
 	if !ok {
-		return rpc.ErrVersion
+		log.Fatalf("leave: group %d is already not present", gid)
 	}
 	return sck.ChangeConfigTo(newcfg)
 }
 
 func (ts *Test) leaveGroups(sck *shardctrler.ShardCtrler, gids []tester.Tgid) rpc.Err {
-	for i, gid := range gids {
+	for _, gid := range gids {
 		if err := ts.leave(sck, gid); err != rpc.OK {
 			return err
 		}
 		ts.Config.ExitGroup(gid)
-		if i < len(gids)-1 {
-			time.Sleep(INTERGRPDELAY * time.Millisecond)
-		}
+		time.Sleep(INTERGRPDELAY * time.Millisecond)
 	}
 	return rpc.OK
 }
@@ -196,31 +191,14 @@ func (ts *Test) disconnectClntFromLeader(clnt *tester.Clnt, gid tester.Tgid) int {
 	return l
 }
 
-func (ts *Test) checkLogs(gids []tester.Tgid) {
-	for _, gid := range gids {
-		n := ts.Group(gid).LogSize()
-		s := ts.Group(gid).SnapshotSize()
-		if ts.maxraftstate >= 0 && n > 8*ts.maxraftstate {
-			ts.t.Fatalf("persister.RaftStateSize() %v, but maxraftstate %v",
-				n, ts.maxraftstate)
-		}
-		if ts.maxraftstate < 0 && s > 0 {
-			ts.t.Fatalf("maxraftstate is -1, but snapshot is non-empty!")
-		}
-
-	}
-}
-
 // make sure that the data really is sharded by
 // shutting down one shard and checking that some
 // Get()s don't succeed.
-func (ts *Test) checkShutdownSharding(down, up tester.Tgid, ka []string, va []string) {
+func (ts *Test) checkShutdownSharding(down tester.Tgid, ka []string, va []string) {
 	const NSEC = 2
 
 	ts.Group(down).Shutdown()
-	ts.checkLogs([]tester.Tgid{down, up}) // forbid snapshots
-
 	n := len(ka)
 	ch := make(chan string)
 	done := int32(0)
@@ -239,7 +217,6 @@ func (ts *Test) checkShutdownSharding(down tester.Tgid, ka []string, va []string) {
 		}(xi)
 	}
 
-	// wait a bit, only about half the Gets should succeed.
 	ndone := 0
 	for atomic.LoadInt32(&done) != 1 {
 		select {
@@ -254,9 +231,9 @@
 		}
 	}
 
-	//log.Printf("%d completions out of %d; down %d", ndone, n, down)
+	// log.Printf("%d completions out of %d; down %d", ndone, n, down)
 	if ndone >= n {
-		ts.Fatalf("expected less than %d completions with one shard dead\n", n)
+		ts.Fatalf("expected less than %d completions with shard %d down\n", n, down)
 	}
 
 	// bring the crashed shard/group back to life.
@@ -360,8 +337,8 @@ func (ts *Test) killCtrler(ck kvtest.IKVClerk, gid tester.Tgid, ka, va []string) {
 	sck0.ExitController()
 
 	if ts.leases {
-		// reconnect old controller, which shouldn't be able
-		// to do anything
+		// reconnect old controller, which should bail out, because
+		// it has been superseded.
 		clnt.ConnectAll()
 
 		time.Sleep(1 * time.Second)
diff --git a/src/tester1/group.go b/src/tester1/group.go
index d12a32c..6f0488b 100644
--- a/src/tester1/group.go
+++ b/src/tester1/group.go
@@ -74,6 +74,7 @@ type ServerGrp struct {
 	gid       Tgid
 	connected []bool // whether each server is on the net
 	mks       FstartServer
+	mu        sync.Mutex
 }
 
 func makeSrvGrp(net *labrpc.Network, gid Tgid, n int, mks FstartServer) *ServerGrp {
@@ -174,7 +175,9 @@ func (sg *ServerGrp) connect(i int, to []int) {
 
 func (sg *ServerGrp) disconnect(i int, from []int) {
 	// log.Printf("%p: disconnect peer %d from %v\n", sg, i, from)
+	sg.mu.Lock()
 	sg.connected[i] = false
+	sg.mu.Unlock()
 
 	// outgoing socket files
 	sg.srvs[i].disconnect(from)
@@ -195,6 +198,8 @@ func (sg *ServerGrp) DisconnectAll(i int) {
 }
 
 func (sg *ServerGrp) IsConnected(i int) bool {
+	sg.mu.Lock()
+	defer sg.mu.Unlock()
 	return sg.connected[i]
 }
diff --git a/src/tester1/srv.go b/src/tester1/srv.go
index e31a55c..78a4b08 100644
--- a/src/tester1/srv.go
+++ b/src/tester1/srv.go
@@ -87,7 +87,7 @@ func (s *Server) shutdownServer() {
 	// inform all services to stop
 	for _, svc := range s.svcs {
 		if svc != nil {
-			go svc.Kill()
+			svc.Kill()
 		}
 	}
 	s.svcs = nil
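
Note on the tester1/group.go hunks above: they fix a data race on ServerGrp.connected,
which disconnect() writes while test code calls IsConnected() concurrently; the
rsm/test.go hunk guards the matching read of ts.leader the same way. For reference,
a minimal, self-contained sketch of this guarded-accessor pattern; the group type
and the demo main below are illustrative, not part of the tester:

package main

import (
	"fmt"
	"sync"
)

// group mirrors the ServerGrp fix: a mutex guards the connected
// slice, which is written and read from different goroutines.
type group struct {
	mu        sync.Mutex
	connected []bool
}

// setConnected is the write side, analogous to disconnect().
func (g *group) setConnected(i int, up bool) {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.connected[i] = up
}

// isConnected is the read side; without mu, `go test -race` flags the
// concurrent read and write of connected[i].
func (g *group) isConnected(i int) bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	return g.connected[i]
}

func main() {
	g := &group{connected: make([]bool, 3)}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			g.setConnected(i, true)
		}(i)
		_ = g.isConnected(i) // this read races with the writers unless locked
	}
	wg.Wait()
	fmt.Println(g.isConnected(0), g.isConnected(1), g.isConnected(2))
}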
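Note on the shardctrler comment changes: they pin down a fencing contract in which
a superseded controller observes rpc.ErrVersion rather than installing a stale
configuration. Below is a toy sketch of one way such fencing can work, assuming a
version-conditioned write; cfgStore and putAtVersion are hypothetical stand-ins,
not the lab API:

package main

import (
	"fmt"
	"sync"
)

// Stand-ins for the lab's rpc.OK and rpc.ErrVersion values.
const (
	OK         = "OK"
	ErrVersion = "ErrVersion"
)

// cfgStore stands in for wherever the controller keeps the current
// configuration: every successful write bumps a version, and a write
// conditioned on a stale version is rejected.
type cfgStore struct {
	mu  sync.Mutex
	ver int
	cfg string
}

func (s *cfgStore) get() (string, int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.cfg, s.ver
}

// putAtVersion succeeds only if ver is still the latest version, so a
// controller that was superseded after reading ver gets ErrVersion.
func (s *cfgStore) putAtVersion(cfg string, ver int) string {
	s.mu.Lock()
	defer s.mu.Unlock()
	if ver != s.ver {
		return ErrVersion
	}
	s.cfg, s.ver = cfg, s.ver+1
	return OK
}

func main() {
	s := &cfgStore{cfg: "cfg-1", ver: 1}
	_, v := s.get()                          // both controllers read version 1
	fmt.Println(s.putAtVersion("cfg-2", v))  // OK: the new controller wins
	fmt.Println(s.putAtVersion("cfg-2b", v)) // ErrVersion: the old one is fenced off
}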