From 62fb62d447f0e9e0c7d14d958e5daeaf0383d9ab Mon Sep 17 00:00:00 2001
From: kenctrl
Date: Tue, 11 Mar 2025 01:31:16 -0400
Subject: [PATCH] update

---
 .check-build                          |   2 +-
 Makefile                              |   4 +-
 src/kvraft1/rsm/rsm.go                |   4 +-
 src/kvraft1/rsm/rsm_test.go           | 151 +++++++++++++++++++++++++-
 src/kvraft1/rsm/server.go             |  33 ++++--
 src/kvraft1/rsm/test.go               |  27 ++++-
 src/labrpc/labrpc.go                  |   9 +-
 src/labrpc/test_test.go               |  62 ++++++++---
 src/raft1/raft.go                     |  17 +--
 src/raft1/raft_test.go                |  15 +--
 src/raft1/test.go                     |   4 +-
 src/shardkv1/shardctrler/client.go    |  49 +++++++++
 src/shardkv1/shardctrler/lock/lock.go |  58 ++++++++++
 src/tester1/srv.go                    |   2 +-
 14 files changed, 380 insertions(+), 57 deletions(-)
 create mode 100644 src/shardkv1/shardctrler/client.go
 create mode 100644 src/shardkv1/shardctrler/lock/lock.go

diff --git a/.check-build b/.check-build
index e41ee3b..eaf7fb3 100755
--- a/.check-build
+++ b/.check-build
@@ -78,7 +78,7 @@ main() {
         "lab1") check_lab1;;
         "lab2") check_lab2;;
         "lab3a"|"lab3b"|"lab3c"|"lab3d") check_lab3;;
-        "lab4a"|"lab4b") check_lab4;;
+        "lab4a"|"lab4b"|"lab4c") check_lab4;;
         "lab5a") check_lab5a;;
         "lab5b") check_lab5b;;
         *) die "unknown lab: $labnum";;
diff --git a/Makefile b/Makefile
index 416d719..6863063 100644
--- a/Makefile
+++ b/Makefile
@@ -1,8 +1,8 @@
 # This is the Makefile helping you submit the labs.
 # Submit your lab with the following command:
-# $ make [lab1|lab2|lab3a|lab3b|lab3c|lab3d|lab4a|lab4b|lab5a|lab5b]
+# $ make [lab1|lab2|lab3a|lab3b|lab3c|lab3d|lab4a|lab4b|lab4c|lab5a|lab5b]
 
-LABS=" lab1 lab2 lab3a lab3b lab3c lab3d lab4a lab4b lab5a lab5b "
+LABS=" lab1 lab2 lab3a lab3b lab3c lab3d lab4a lab4b lab4c lab5a lab5b "
 
 %: check-%
 	@echo "Preparing $@-handin.tar.gz"
diff --git a/src/kvraft1/rsm/rsm.go b/src/kvraft1/rsm/rsm.go
index 68af211..c5dba42 100644
--- a/src/kvraft1/rsm/rsm.go
+++ b/src/kvraft1/rsm/rsm.go
@@ -80,8 +80,8 @@ func (rsm *RSM) Raft() raftapi.Raft {
 
 func (rsm *RSM) Submit(req any) (rpc.Err, any) {
     // Submit creates an Op structure to run a command through Raft;
-    // for example: op := Op{Id: rsm.nextId, Req: req}, where req is
-    // the argument to Submit and rsm.nextId a unique id for the op.
+    // for example: op := Op{Me: rsm.me, Id: id, Req: req}, where req
+    // is the argument to Submit and id is a unique id for the op.
 
     // your code here
     return rpc.ErrWrongLeader, nil // i'm dead, try another server.
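Note: one plausible shape for Submit(), as a hedged sketch rather than the required implementation — the nextId counter, the waiters map, and the channel protocol below are illustrative assumptions, not part of the handout:

func (rsm *RSM) Submit(req any) (rpc.Err, any) {
    rsm.mu.Lock()
    rsm.nextId++ // assumed per-RSM counter for unique op ids
    op := Op{Me: rsm.me, Id: rsm.nextId, Req: req}
    index, _, isLeader := rsm.rf.Start(op)
    if !isLeader {
        rsm.mu.Unlock()
        return rpc.ErrWrongLeader, nil
    }
    ch := make(chan any, 1)
    rsm.waiters[index] = ch // assumed map[int]chan any, filled in by the applier
    rsm.mu.Unlock()

    // The applier sends DoOp's result for whatever op committed at
    // index and closes all waiter channels on shutdown. A complete
    // implementation must also check (via Me/Id) that the committed
    // op is its own and not one from a different leader's log.
    if rep, ok := <-ch; ok && rep != nil {
        return rpc.OK, rep
    }
    return rpc.ErrWrongLeader, nil
}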
diff --git a/src/kvraft1/rsm/rsm_test.go b/src/kvraft1/rsm/rsm_test.go
index f09203b..51fca2c 100644
--- a/src/kvraft1/rsm/rsm_test.go
+++ b/src/kvraft1/rsm/rsm_test.go
@@ -105,7 +105,7 @@ func TestLeaderPartition4A(t *testing.T) {
     time.Sleep(10 * time.Millisecond)
 
     // submit an Inc in the majority
-    rep := ts.oneIncPartition(p1)
+    rep := ts.onePartition(p1, Inc{})
 
     select {
     case err := <-done:
@@ -123,7 +123,154 @@ func TestLeaderPartition4A(t *testing.T) {
     }
 
     // check that all replicas have the same value for counter
-    ts.checkCounter(rep.N, NSRV)
+    ts.checkCounter(rep.(*IncRep).N, NSRV)
+}
+
+// test that restart replays Incs
+func TestRestartReplay4A(t *testing.T) {
+    const (
+        NINC    = 100
+        NSUBMIT = 100
+    )
+
+    ts := makeTest(t, -1)
+    defer ts.cleanup()
+
+    ts.Begin("Test Restart")
+
+    for i := 0; i < NINC; i++ {
+        r := ts.oneInc()
+        if r.N != i+1 {
+            ts.t.Fatalf("expected %d instead of %d", i+1, r.N)
+        }
+        ts.checkCounter(r.N, NSRV)
+    }
+
+    ts.Group(Gid).Shutdown()
+
+    time.Sleep(1 * time.Second)
+
+    ts.Group(Gid).StartServers()
+
+    // submit an Inc
+    r := ts.oneInc()
+
+    if r.N != NINC+1 {
+        t.Fatalf("Expected %d got %d", NINC+1, r.N)
+    }
+
+    time.Sleep(1 * time.Second)
+
+    ts.checkCounter(r.N, NSRV)
+}
+
+// Test if Submit() terminates after tester's Shutdown() has called
+// raft's Kill(). Kill() should cause your raft to close the applyCh
+// passed to it in Make(), which in turn allows rsm to know that it
+// is done.
+func TestShutdown4A(t *testing.T) {
+    const (
+        NSUBMIT = 100
+    )
+
+    ts := makeTest(t, -1)
+    defer ts.cleanup()
+
+    ts.Begin("Test Shutdown")
+
+    // Submit many Null's concurrently
+    done := make(chan struct{})
+    go func() {
+        var wg sync.WaitGroup
+        for i := 0; i < NSUBMIT; i++ {
+            wg.Add(1)
+            go func(i int) {
+                defer wg.Done()
+                ts.oneNull()
+            }(i)
+        }
+        wg.Wait()
+        done <- struct{}{}
+    }()
+
+    // give some time to submit
+    time.Sleep(20 * time.Millisecond)
+
+    ts.Group(Gid).Shutdown()
+
+    select {
+    case <-done:
+    case <-time.After((NSEC + 1) * time.Second):
+        ts.Fatalf("Submit didn't stop after shutdown")
+    }
+}
+
+// Test that commands submitted after a restart don't get confused
+// with ones submitted before Shutdown()
+func TestRestartSubmit4A(t *testing.T) {
+    const (
+        NINC    = 100
+        NSUBMIT = 100
+    )
+
+    ts := makeTest(t, -1)
+    defer ts.cleanup()
+
+    ts.Begin("Test Restart and submit")
+
+    for i := 0; i < NINC; i++ {
+        r := ts.oneInc()
+        if r.N != i+1 {
+            ts.t.Fatalf("expected %d instead of %d", i+1, r.N)
+        }
+        ts.checkCounter(r.N, NSRV)
+    }
+
+    ts.Group(Gid).Shutdown()
+
+    time.Sleep(1 * time.Second)
+
+    ts.Group(Gid).StartServers()
+
+    // submit an Inc
+    r := ts.oneInc()
+
+    if r.N != NINC+1 {
+        t.Fatalf("Expected %d got %d", NINC+1, r.N)
+    }
+
+    time.Sleep(1 * time.Second)
+
+    // Submit many Null's concurrently
+    done := make(chan struct{})
+    go func() {
+        var wg sync.WaitGroup
+        for i := 0; i < NSUBMIT; i++ {
+            wg.Add(1)
+            go func(i int) {
+                defer wg.Done()
+                ts.oneNull()
+            }(i)
+        }
+        wg.Wait()
+        done <- struct{}{}
+    }()
+
+    // give some time to submit
+    time.Sleep(20 * time.Millisecond)
+
+    ts.Group(Gid).Shutdown()
+
+    select {
+    case <-done:
+    case <-time.After((NSEC + 1) * time.Second):
+        ts.Fatalf("Submit didn't stop after shutdown")
+    }
+
+    ts.Group(Gid).StartServers()
+
+    r = ts.oneInc()
+    ts.checkCounter(r.N, NSRV)
 }
 
 // test snapshot and restore
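Note: the shutdown tests above depend on the convention that raft's Kill() closes the applyCh. A hedged sketch of the rsm applier loop that convention enables — the field names (applyCh, sm, waiters) are assumptions for illustration:

func (rsm *RSM) applier() {
    // range exits when raft's Kill() closes the applyCh.
    for msg := range rsm.applyCh {
        if msg.CommandValid {
            op := msg.Command.(Op)
            rep := rsm.sm.DoOp(op.Req) // replay on restart is what TestRestartReplay4A checks
            rsm.mu.Lock()
            if ch, ok := rsm.waiters[msg.CommandIndex]; ok {
                ch <- rep
                delete(rsm.waiters, msg.CommandIndex)
            }
            rsm.mu.Unlock()
        }
    }
    // applyCh closed: unblock every pending Submit() so that
    // TestShutdown4A's goroutines can return.
    rsm.mu.Lock()
    for index, ch := range rsm.waiters {
        close(ch)
        delete(rsm.waiters, index)
    }
    rsm.mu.Unlock()
}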
diff --git a/src/kvraft1/rsm/server.go b/src/kvraft1/rsm/server.go
index aeed0eb..2632e17 100644
--- a/src/kvraft1/rsm/server.go
+++ b/src/kvraft1/rsm/server.go
@@ -14,11 +14,17 @@ import (
 type Inc struct {
 }
 
-type Dec struct {
+type IncRep struct {
+    N int
 }
 
-type Rep struct {
-    N int
+type Null struct {
+}
+
+type NullRep struct {
+}
+
+type Dec struct {
 }
 
 type rsmSrv struct {
@@ -33,7 +39,9 @@ func makeRsmSrv(ts *Test, srv int, ends []*labrpc.ClientEnd, persister *tester.P
     //log.Printf("mksrv %d", srv)
     labgob.Register(Op{})
     labgob.Register(Inc{})
-    labgob.Register(Rep{})
+    labgob.Register(IncRep{})
+    labgob.Register(Null{})
+    labgob.Register(NullRep{})
     labgob.Register(Dec{})
     s := &rsmSrv{
         ts: ts,
@@ -44,15 +52,20 @@ func makeRsmSrv(ts *Test, srv int, ends []*labrpc.ClientEnd, persister *tester.P
 }
 
 func (rs *rsmSrv) DoOp(req any) any {
-    //log.Printf("%d: DoOp: %v", rs.me, req)
-    if _, ok := req.(Inc); ok == false {
+    //log.Printf("%d: DoOp: %T(%v)", rs.me, req, req)
+    switch req.(type) {
+    case Inc:
+        rs.mu.Lock()
+        rs.counter += 1
+        rs.mu.Unlock()
+        return &IncRep{rs.counter}
+    case Null:
+        return &NullRep{}
+    default:
         // wrong type! expecting an Inc.
         log.Fatalf("DoOp should execute only Inc and not %T", req)
     }
-    rs.mu.Lock()
-    rs.counter += 1
-    rs.mu.Unlock()
-    return &Rep{rs.counter}
+    return nil
 }
 
 func (rs *rsmSrv) Snapshot() []byte {
diff --git a/src/kvraft1/rsm/test.go b/src/kvraft1/rsm/test.go
index a0afefe..e5d2536 100644
--- a/src/kvraft1/rsm/test.go
+++ b/src/kvraft1/rsm/test.go
@@ -2,6 +2,7 @@ package rsm
 
 import (
     //"log"
+    "sync"
     "testing"
     "time"
 
@@ -13,6 +14,7 @@ import (
 
 type Test struct {
     *tester.Config
+    mu           sync.Mutex
     t            *testing.T
     g            *tester.ServerGrp
     maxraftstate int
@@ -62,7 +64,7 @@ func inPartition(s int, p []int) bool {
     return false
 }
 
-func (ts *Test) oneIncPartition(p []int) *Rep {
+func (ts *Test) onePartition(p []int, req any) any {
     // try all the servers, maybe one is the leader but give up after NSEC
     t0 := time.Now()
     for time.Since(t0).Seconds() < NSEC {
@@ -71,11 +73,13 @@
         if ts.g.IsConnected(index) {
             s := ts.srvs[index]
             if s.rsm != nil && inPartition(index, p) {
-                err, rep := s.rsm.Submit(Inc{})
+                err, rep := s.rsm.Submit(req)
                 if err == rpc.OK {
+                    ts.mu.Lock()
                     ts.leader = index
+                    ts.mu.Unlock()
                     //log.Printf("leader = %d", ts.leader)
-                    return rep.(*Rep)
+                    return rep
                 }
             }
         }
@@ -84,12 +88,23 @@
         time.Sleep(50 * time.Millisecond)
         //log.Printf("try again: no leader")
     }
-    ts.Fatalf("one: took too long")
     return nil
 }
 
-func (ts *Test) oneInc() *Rep {
-    return ts.oneIncPartition(nil)
+func (ts *Test) oneInc() *IncRep {
+    rep := ts.onePartition(nil, Inc{})
+    if rep == nil {
+        return nil
+    }
+    return rep.(*IncRep)
+}
+
+func (ts *Test) oneNull() *NullRep {
+    rep := ts.onePartition(nil, Null{})
+    if rep == nil {
+        return nil
+    }
+    return rep.(*NullRep)
 }
 
 func (ts *Test) checkCounter(v int, nsrv int) {
diff --git a/src/labrpc/labrpc.go b/src/labrpc/labrpc.go
index 15ea0bc..000f833 100644
--- a/src/labrpc/labrpc.go
+++ b/src/labrpc/labrpc.go
@@ -202,6 +202,13 @@ func (rn *Network) LongDelays(yes bool) {
     rn.longDelays = yes
 }
 
+func (rn *Network) IsLongDelays() bool {
+    rn.mu.Lock()
+    defer rn.mu.Unlock()
+
+    return rn.longDelays
+}
+
 func (rn *Network) readEndnameInfo(endname interface{}) (enabled bool,
     servername interface{}, server *Server, reliable bool, longreordering bool,
 ) {
@@ -305,7 +312,7 @@ func (rn *Network) processReq(req reqMsg) {
     } else {
         // simulate no reply and eventual timeout.
         ms := 0
-        if rn.longDelays {
+        if rn.IsLongDelays() {
             // let Raft tests check that leader doesn't send
             // RPCs synchronously.
             ms = (rand.Int() % LONGDELAY)
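Note: IsLongDelays() exists so that processReq() reads rn.longDelays under the same mutex that writers hold; `go test -race` flags the old unguarded read. A lock-free variant would also work — a sketch only, assuming Go 1.19+ for atomic.Bool (labrpc itself keeps the mutex):

// Hypothetical alternative, not labrpc's actual representation.
// import "sync/atomic"
type netFlags struct {
    longDelays atomic.Bool
}

func (nf *netFlags) LongDelays(yes bool) { nf.longDelays.Store(yes) }
func (nf *netFlags) IsLongDelays() bool  { return nf.longDelays.Load() }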
diff --git a/src/labrpc/test_test.go b/src/labrpc/test_test.go
index 1ec3e65..424dce4 100644
--- a/src/labrpc/test_test.go
+++ b/src/labrpc/test_test.go
@@ -140,9 +140,7 @@ func TestTypes(t *testing.T) {
     }
 }
 
-//
 // does net.Enable(endname, false) really disconnect a client?
-//
 func TestDisconnect(t *testing.T) {
     runtime.GOMAXPROCS(4)
 
@@ -179,9 +177,7 @@ func TestDisconnect(t *testing.T) {
     }
 }
 
-//
 // test net.GetCount()
-//
 func TestCounts(t *testing.T) {
     runtime.GOMAXPROCS(4)
 
@@ -215,9 +211,7 @@ func TestCounts(t *testing.T) {
     }
 }
 
-//
 // test net.GetTotalBytes()
-//
 func TestBytes(t *testing.T) {
     runtime.GOMAXPROCS(4)
 
@@ -269,9 +263,7 @@ func TestBytes(t *testing.T) {
     }
 }
 
-//
 // test RPCs from concurrent ClientEnds
-//
 func TestConcurrentMany(t *testing.T) {
     runtime.GOMAXPROCS(4)
 
@@ -327,9 +319,7 @@ func TestConcurrentMany(t *testing.T) {
     }
 }
 
-//
 // test unreliable
-//
 func TestUnreliable(t *testing.T) {
     runtime.GOMAXPROCS(4)
 
@@ -380,9 +370,7 @@ func TestUnreliable(t *testing.T) {
     }
 }
 
-//
 // test concurrent RPCs from a single ClientEnd
-//
 func TestConcurrentOne(t *testing.T) {
     runtime.GOMAXPROCS(4)
 
@@ -441,10 +429,8 @@ func TestConcurrentOne(t *testing.T) {
     }
 }
 
-//
 // regression: an RPC that's delayed during Enabled=false
 // should not delay subsequent RPCs (e.g. after Enabled=true).
-//
 func TestRegression1(t *testing.T) {
     runtime.GOMAXPROCS(4)
 
@@ -515,11 +501,9 @@ func TestRegression1(t *testing.T) {
     }
 }
 
-//
 // if an RPC is stuck in a server, and the server
 // is killed with DeleteServer(), does the RPC
 // get un-stuck?
-//
 func TestKilled(t *testing.T) {
     runtime.GOMAXPROCS(4)
 
@@ -595,3 +579,49 @@ func TestBenchmark(t *testing.T) {
     fmt.Printf("%v for %v\n", time.Since(t0), n)
     // march 2016, rtm laptop, 22 microseconds per RPC
 }
+
+// regression: a race between write of rn.longDelays in LongDelays()
+// and a read in processReq().
+func TestLongDelayRace(t *testing.T) {
+    runtime.GOMAXPROCS(4)
+
+    rn := MakeNetwork()
+    defer rn.Cleanup()
+
+    e := rn.MakeEnd("end1-99")
+
+    js := &JunkServer{}
+    svc := MakeService(js)
+
+    rs := MakeServer()
+    rs.AddService(svc)
+    rn.AddServer("server99", rs)
+
+    rn.Connect("end1-99", "server99")
+    rn.Enable("end1-99", true)
+
+    {
+        reply := 0
+        e.Call("JunkServer.Handler1", "9099", &reply)
+    }
+
+    rn.LongDelays(true)
+
+    rn.DeleteServer("server99")
+
+    doneCh := make(chan bool)
+    go func() {
+        reply := 0
+        e.Call("JunkServer.Handler1", "9099", &reply)
+        doneCh <- true
+    }()
+
+    rn.LongDelays(true)
+
+    select {
+    case <-doneCh:
+    case <-time.After(200 * time.Millisecond):
+    }
+
+    rn.LongDelays(true)
+}
diff --git a/src/raft1/raft.go b/src/raft1/raft.go
index d93487f..fd35de5 100644
--- a/src/raft1/raft.go
+++ b/src/raft1/raft.go
@@ -174,15 +174,16 @@ func (rf *Raft) Start(command interface{}) (int, int, bool) {
     return index, term, isLeader
 }
 
-// the tester doesn't halt goroutines created by Raft after each test,
-// but it does call the Kill() method. your code can use killed() to
-// check whether Kill() has been called. the use of atomic avoids the
-// need for a lock.
+// The tester calls Kill() when it is done with a Raft instance (e.g.,
+// when it simulates a crash and restarts the peer or when the test is
+// done). Kill allows your implementation (1) to close the applyCh so
+// that the application on top of Raft can clean up, and (2) to make
+// long-running goroutines exit.
 //
-// the issue is that long-running goroutines use memory and may chew
-// up CPU time, perhaps causing later tests to fail and generating
-// confusing debug output. any goroutine with a long-running loop
-// should call killed() to check whether it should stop.
+// Long-running goroutines use memory and may chew up CPU time,
+// perhaps causing later tests to fail and generating confusing debug
+// output. Any goroutine with a long-running loop should call killed()
+// to check whether it should stop.
 func (rf *Raft) Kill() {
     atomic.StoreInt32(&rf.dead, 1)
     // Your code here, if desired.
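Note: killed() — the helper the rewritten comment tells students to call — is conventionally the read side of the same rf.dead atomic. A sketch of the usual skeleton; the ticker loop name is illustrative:

func (rf *Raft) killed() bool {
    z := atomic.LoadInt32(&rf.dead)
    return z == 1
}

// Typical long-running loop that honors Kill():
func (rf *Raft) ticker() {
    for !rf.killed() {
        // election timeouts, heartbeats, etc.
        time.Sleep(10 * time.Millisecond)
    }
}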
diff --git a/src/raft1/raft_test.go b/src/raft1/raft_test.go
index d11b9d1..a56e9b2 100644
--- a/src/raft1/raft_test.go
+++ b/src/raft1/raft_test.go
@@ -675,7 +675,7 @@ loop:
             desp := fmt.Sprintf("leader %v adds the command at the wrong index", leader)
             details := fmt.Sprintf(
                 "the command should locate at index %v, but the leader puts it at %v",
-                starti + i, index1)
+                starti+i, index1)
             tester.AnnotateCheckerFailure(desp, details)
             t.Fatalf("Start() failed")
         }
@@ -688,13 +688,13 @@ loop:
                 // term changed -- try again
                 details := fmt.Sprintf(
                     "term changed while waiting for %v servers to commit index %v",
-                    servers, starti + i)
+                    servers, starti+i)
                 tester.AnnotateCheckerNeutral(despretry, details)
                 continue loop
             }
             details := fmt.Sprintf(
                 "the command submitted at index %v in term %v is %v, but read %v",
-                starti + i, term, cmds[i - 1], cmd)
+                starti+i, term, cmds[i-1], cmd)
             tester.AnnotateCheckerFailure("incorrect command committed", details)
             t.Fatalf("wrong value %v committed for index %v; expected %v\n", cmd, starti+i, cmds)
         }
@@ -750,12 +750,12 @@
 
     if total3-total2 > 3*20 {
         details := fmt.Sprintf("number of RPC used for 1 second of idleness = %v > %v",
-            total3-total2, 3 * 20)
+            total3-total2, 3*20)
         tester.AnnotateCheckerFailure("used too many RPCs in idle", details)
         t.Fatalf("too many RPCs (%v) for 1 second of idleness\n", total3-total2)
     }
     details := fmt.Sprintf("number of RPC used for 1 second of idleness = %v <= %v",
-        total3-total2, 3 * 20)
+        total3-total2, 3*20)
     tester.AnnotateCheckerSuccess(
         "used a reasonable number of RPCs in idle", details)
 }
@@ -913,9 +913,10 @@ func TestFigure83C(t *testing.T) {
     for iters := 0; iters < 1000; iters++ {
         leader := -1
         for i := 0; i < servers; i++ {
-            if ts.srvs[i].Raft() != nil {
+            rf := ts.srvs[i].Raft()
+            if rf != nil {
                 cmd := rand.Int()
-                _, _, ok := ts.srvs[i].Raft().Start(cmd)
+                _, _, ok := rf.Start(cmd)
                 if ok {
                     text := fmt.Sprintf("submitted command %v to server %v", cmd, i)
                     tester.AnnotateInfo(text, text)
diff --git a/src/raft1/test.go b/src/raft1/test.go
index 8e2207f..789cede 100644
--- a/src/raft1/test.go
+++ b/src/raft1/test.go
@@ -111,7 +111,7 @@ func (ts *Test) checkTerms() int {
                 term = xterm
             } else if term != xterm {
                 details := fmt.Sprintf("node ids -> terms = { %v -> %v; %v -> %v }",
-                    i - 1, term, i, xterm)
+                    i-1, term, i, xterm)
                 tester.AnnotateCheckerFailure("term disagreed", details)
                 ts.Fatalf("servers disagree on term")
             }
@@ -237,7 +237,9 @@ func (ts *Test) one(cmd any, expectedServers int, retry bool) int {
         starts = (starts + 1) % len(ts.srvs)
         var rf raftapi.Raft
         if ts.g.IsConnected(starts) {
+            ts.srvs[starts].mu.Lock()
             rf = ts.srvs[starts].raft
+            ts.srvs[starts].mu.Unlock()
         }
         if rf != nil {
             //log.Printf("peer %d Start %v", starts, cmd)
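Note: the raft_test.go and test.go changes above are two instances of the same race fix — read the server's raft pointer exactly once, under the lock that restart code holds while it replaces the pointer. As a sketch, the guarded-accessor pattern (the server type and field names here are assumptions about the tester):

// Read rs.raft once, under rs.mu, so a concurrent StartServers()
// cannot swap the pointer between the nil check and the use.
func (rs *rsrv) Raft() raftapi.Raft {
    rs.mu.Lock()
    defer rs.mu.Unlock()
    return rs.raft
}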
diff --git a/src/shardkv1/shardctrler/client.go b/src/shardkv1/shardctrler/client.go
new file mode 100644
index 0000000..55ecaaa
--- /dev/null
+++ b/src/shardkv1/shardctrler/client.go
@@ -0,0 +1,49 @@
+package shardctrler
+
+import (
+    // "log"
+    "sync/atomic"
+
+    "6.5840/kvsrv1/rpc"
+    "6.5840/tester1"
+)
+
+type Clerk struct {
+    clnt    *tester.Clnt
+    servers []string
+    deposed *int32
+    // You will have to modify this struct.
+}
+
+// The shard controller can use MakeClerk to make a clerk for the kvraft
+// group with the servers `servers`.
+func MakeClerk(clnt *tester.Clnt, servers []string, deposed *int32) *Clerk {
+    ck := &Clerk{clnt: clnt, servers: servers, deposed: deposed}
+    // You may add code here.
+    return ck
+}
+
+func (ck *Clerk) isDeposed() bool {
+    z := atomic.LoadInt32(ck.deposed)
+    return z == 1
+}
+
+// You can reuse your kvraft Get
+func (ck *Clerk) Get(key string) (string, rpc.Tversion, rpc.Err) {
+    args := rpc.GetArgs{}
+    args.Key = key
+
+    // You'll have to add code here.
+    return "", 0, ""
+}
+
+// You can reuse your kvraft Put
+func (ck *Clerk) Put(key string, value string, version rpc.Tversion) rpc.Err {
+    args := rpc.PutArgs{}
+    args.Key = key
+    args.Value = value
+    args.Version = version
+
+    // You'll have to add code here.
+    return ""
+}
diff --git a/src/shardkv1/shardctrler/lock/lock.go b/src/shardkv1/shardctrler/lock/lock.go
new file mode 100644
index 0000000..0b76170
--- /dev/null
+++ b/src/shardkv1/shardctrler/lock/lock.go
@@ -0,0 +1,58 @@
+package lock
+
+import (
+    "log"
+    "time"
+
+    "6.5840/kvsrv1/rpc"
+    "6.5840/kvtest1"
+)
+
+type Lock struct {
+    kvtest.IKVClerk
+    l   string
+    id  string
+    ver rpc.Tversion
+}
+
+func MakeLock(ck kvtest.IKVClerk, l string) *Lock {
+    lk := &Lock{IKVClerk: ck}
+    // You may add code here
+    return lk
+}
+
+func (lk *Lock) AcquireLeadership() {
+    for {
+        if val, ver, err := lk.Get(lk.l); err == rpc.OK {
+            if val == "" { // put only when lock is free
+                if err := lk.Put(lk.l, lk.id, ver); err == rpc.OK {
+                    lk.ver = ver + 1
+                    return
+                } else if err == rpc.ErrMaybe { // check whether the Put succeeded
+                    if val, ver, err := lk.Get(lk.l); err == rpc.OK {
+                        if val == lk.id {
+                            lk.ver = ver
+                            return
+                        }
+                    }
+                }
+            }
+            time.Sleep(1 * time.Millisecond)
+        }
+    }
+}
+
+// For two testing purposes: (1) to let a ctrler that is the leader
+// give up its leadership; and (2) to take back leadership from a
+// partitioned/deposed ctrler using a new ctrler.
+func (lk *Lock) ReleaseLeadership() rpc.Err {
+    _, ver, err := lk.Get(lk.l)
+    if err != rpc.OK {
+        log.Printf("ReleaseLeadership: %v err %v", lk.l, err)
+    }
+    if err := lk.Put(lk.l, "", ver); err == rpc.OK || err == rpc.ErrMaybe {
+        return rpc.OK
+    } else {
+        return err
+    }
+}
diff --git a/src/tester1/srv.go b/src/tester1/srv.go
index 045ae3f..4703b3f 100644
--- a/src/tester1/srv.go
+++ b/src/tester1/srv.go
@@ -83,7 +83,7 @@ func (s *Server) shutdownServer() {
     // inform all services to stop
     for _, svc := range s.svcs {
         if svc != nil {
-            svc.Kill()
+            go svc.Kill()
         }
     }
     s.svcs = nil
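Note: the new shardctrler client.go leaves Get and Put as exercises. A hedged sketch of a deposed-aware retry loop for Get — the "KVServer.Get" method name, the backoff, and the error returned on deposal are illustrative assumptions, not the handout's API:

func (ck *Clerk) Get(key string) (string, rpc.Tversion, rpc.Err) {
    args := rpc.GetArgs{Key: key}
    for i := 0; ; i = (i + 1) % len(ck.servers) {
        if ck.isDeposed() {
            // a deposed ctrler must stop retrying and give up
            return "", 0, rpc.ErrWrongLeader // assumed error choice
        }
        var reply rpc.GetReply
        ok := ck.clnt.Call(ck.servers[i], "KVServer.Get", &args, &reply)
        if ok && reply.Err != rpc.ErrWrongLeader {
            return reply.Value, reply.Version, reply.Err
        }
        time.Sleep(100 * time.Millisecond) // back off before the next server
    }
}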