From c8e7d779c29c68fa42edba2537f065752fa3cdc3 Mon Sep 17 00:00:00 2001 From: Frans Kaashoek Date: Tue, 18 Feb 2025 07:04:18 -0500 Subject: [PATCH] update --- src/kvraft1/kvraft_test.go | 12 +++-- src/kvraft1/rsm/rsm.go | 25 +++++---- src/kvraft1/rsm/rsm_test.go | 57 ++++++++++++++++---- src/kvraft1/rsm/server.go | 10 ++-- src/kvraft1/rsm/test.go | 4 +- src/kvtest1/kvtest.go | 15 ++++-- src/raft1/raft.go | 50 +++++------------- src/raft1/server.go | 19 ++++--- src/raft1/test.go | 5 +- src/raftapi/raftapi.go | 40 ++++++++++++++ src/shardkv1/shardctrler/client.go | 49 +++++++++++++++++ src/tester1/clnts.go | 10 ++-- src/tester1/tester_test.go | 84 ++++++++++++++++++++++++++++++ 13 files changed, 298 insertions(+), 82 deletions(-) create mode 100644 src/raftapi/raftapi.go create mode 100644 src/shardkv1/shardctrler/client.go create mode 100644 src/tester1/tester_test.go diff --git a/src/kvraft1/kvraft_test.go b/src/kvraft1/kvraft_test.go index 8a77607..d3c40c7 100644 --- a/src/kvraft1/kvraft_test.go +++ b/src/kvraft1/kvraft_test.go @@ -1,7 +1,7 @@ package kvraft import ( - // "log" + //"log" "strconv" "testing" "time" @@ -288,7 +288,11 @@ func TestPersistPartitionUnreliableLinearizable4B(t *testing.T) { // also checks that majority discards committed log entries // even if minority doesn't respond. 
func TestSnapshotRPC4C(t *testing.T) { - ts := MakeTest(t, "4C SnapshotsRPC", 0, 3, false, false, false, 1000, false) + const ( + NSRV = 3 + MAXRAFTSTATE = 1000 + ) + ts := MakeTest(t, "4C SnapshotsRPC", 0, NSRV, false, false, false, MAXRAFTSTATE, false) defer ts.Cleanup() ck := ts.MakeClerk() @@ -304,10 +308,10 @@ func TestSnapshotRPC4C(t *testing.T) { { ck1 := ts.MakeClerkTo([]int{0, 1}) for i := 0; i < 50; i++ { - verb = ts.PutAtLeastOnce(ck1, strconv.Itoa(i), strconv.Itoa(i), rpc.Tversion(0), -1) + ts.PutAtLeastOnce(ck1, strconv.Itoa(i), strconv.Itoa(i), rpc.Tversion(0), -1) } time.Sleep(kvtest.ElectionTimeout) - verb = ts.PutAtLeastOnce(ck1, "b", "B", verb, -1) + verb = ts.PutAtLeastOnce(ck1, "b", "B", rpc.Tversion(0), -1) } // check that the majority partition has thrown away diff --git a/src/kvraft1/rsm/rsm.go b/src/kvraft1/rsm/rsm.go index bd9cdc0..68af211 100644 --- a/src/kvraft1/rsm/rsm.go +++ b/src/kvraft1/rsm/rsm.go @@ -6,10 +6,14 @@ import ( "6.5840/kvsrv1/rpc" "6.5840/labrpc" "6.5840/raft1" + "6.5840/raftapi" "6.5840/tester1" ) +var useRaftStateMachine bool // to plug in another raft besides raft1 + + type Op struct { // Your definitions here. // Field names must start with capital letters, @@ -32,8 +36,8 @@ type StateMachine interface { type RSM struct { mu sync.Mutex me int - rf *raft.Raft - applyCh chan raft.ApplyMsg + rf raftapi.Raft + applyCh chan raftapi.ApplyMsg maxraftstate int // snapshot if log grows this big sm StateMachine // Your definitions here. 
@@ -56,24 +60,23 @@ func MakeRSM(servers []*labrpc.ClientEnd, me int, persister *tester.Persister, m rsm := &RSM{ me: me, maxraftstate: maxraftstate, - applyCh: make(chan raft.ApplyMsg), + applyCh: make(chan raftapi.ApplyMsg), sm: sm, } - rsm.rf = raft.Make(servers, me, persister, rsm.applyCh) + if !useRaftStateMachine { + rsm.rf = raft.Make(servers, me, persister, rsm.applyCh) + } return rsm } -func (rsm *RSM) Raft() *raft.Raft { +func (rsm *RSM) Raft() raftapi.Raft { return rsm.rf } -// submit a command to Raft, -// and wait for it to be committed. -// -// returns (executeError, executeResult) -// if executeError==ErrWrongLeader, client should find new leader -// and try again. +// Submit a command to Raft, and wait for it to be committed. It +// should return ErrWrongLeader if client should find new leader and +// try again. func (rsm *RSM) Submit(req any) (rpc.Err, any) { // Submit creates an Op structure to run a command through Raft; diff --git a/src/kvraft1/rsm/rsm_test.go b/src/kvraft1/rsm/rsm_test.go index fa56f1c..f09203b 100644 --- a/src/kvraft1/rsm/rsm_test.go +++ b/src/kvraft1/rsm/rsm_test.go @@ -2,6 +2,7 @@ package rsm import ( //"log" + "sync" "testing" "time" @@ -10,11 +11,12 @@ import ( // test that each server executes increments and updates its counter. 
func TestBasic4A(t *testing.T) { + const NINC = 10 ts := makeTest(t, -1) defer ts.cleanup() ts.Begin("Test RSM basic") - for i := 0; i < 10; i++ { + for i := 0; i < NINC; i++ { r := ts.oneInc() if r.N != i+1 { ts.t.Fatalf("expected %d instead of %d", i, r.N) @@ -23,6 +25,26 @@ func TestBasic4A(t *testing.T) { } } +// test submitting concurrently +func TestConcurrent4A(t *testing.T) { + const NINC = 50 + ts := makeTest(t, -1) + defer ts.cleanup() + + ts.Begin("Test concurrent submit") + + var wg sync.WaitGroup + for i := 0; i < NINC; i++ { + wg.Add(1) + go func() { + defer wg.Done() + ts.oneInc() + }() + } + wg.Wait() + ts.checkCounter(NINC, NSRV) +} + // test that each server executes increments after disconnecting and // reconnecting leader func TestLeaderFailure4A(t *testing.T) { @@ -43,7 +65,10 @@ func TestLeaderFailure4A(t *testing.T) { ts.checkCounter(r.N, NSRV) } +// test that partitioned leader doesn't commit an operation func TestLeaderPartition4A(t *testing.T) { + const NSUBMIT = 100 + ts := makeTest(t, -1) defer ts.cleanup() @@ -58,18 +83,33 @@ func TestLeaderPartition4A(t *testing.T) { p1, p2 := ts.Group(Gid).MakePartition(l) ts.Group(Gid).Partition(p1, p2) - done := make(chan rpc.Err) + done := make(chan struct{}) go func() { - err, _ := ts.srvs[l].rsm.Submit(Inc{}) - done <- err + // Submit many Dec's concurrently, which will result in many + // raft.Start()'s, but none should commit and be executed. 
+ var wg sync.WaitGroup + for i := 0; i < NSUBMIT; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + if err, rep := ts.srvs[l].rsm.Submit(Dec{}); err == rpc.OK { + t.Fatalf("Submit %d in minority completed %v", i, rep) + } + }(i) + } + wg.Wait() + done <- struct{}{} }() + // give old leader some time to submit + time.Sleep(10 * time.Millisecond) + // submit an Inc in the majority rep := ts.oneIncPartition(p1) select { - case ver := <-done: - ts.Fatalf("Inc in minority completed %v", ver) + case err := <-done: + ts.Fatalf("Dec's in minority completed %v", err) case <-time.After(time.Second): } @@ -77,10 +117,7 @@ func TestLeaderPartition4A(t *testing.T) { ts.connect(l) select { - case err := <-done: - if err == rpc.OK { - ts.Fatalf("Inc in minority didn't fail") - } + case <-done: case <-time.After(time.Second): ts.Fatalf("Submit after healing didn't return") } diff --git a/src/kvraft1/rsm/server.go b/src/kvraft1/rsm/server.go index 307c257..aeed0eb 100644 --- a/src/kvraft1/rsm/server.go +++ b/src/kvraft1/rsm/server.go @@ -7,13 +7,16 @@ import ( "6.5840/labgob" "6.5840/labrpc" - "6.5840/raft1" + "6.5840/raftapi" "6.5840/tester1" ) type Inc struct { } +type Dec struct { +} + type Rep struct { N int } @@ -31,6 +34,7 @@ func makeRsmSrv(ts *Test, srv int, ends []*labrpc.ClientEnd, persister *tester.P labgob.Register(Op{}) labgob.Register(Inc{}) labgob.Register(Rep{}) + labgob.Register(Dec{}) s := &rsmSrv{ ts: ts, me: srv, @@ -43,7 +47,7 @@ func (rs *rsmSrv) DoOp(req any) any { //log.Printf("%d: DoOp: %v", rs.me, req) if _, ok := req.(Inc); ok == false { // wrong type! expecting an Inc. 
- log.Fatalf("DoOp called with the wrong type") + log.Fatalf("DoOp should execute only Inc and not %T", req) } rs.mu.Lock() rs.counter += 1 @@ -76,7 +80,7 @@ func (rs *rsmSrv) Kill() { rs.rsm = nil } -func (rs *rsmSrv) Raft() *raft.Raft { +func (rs *rsmSrv) Raft() raftapi.Raft { rs.mu.Lock() defer rs.mu.Unlock() return rs.rsm.Raft() diff --git a/src/kvraft1/rsm/test.go b/src/kvraft1/rsm/test.go index 1ad62a9..a0afefe 100644 --- a/src/kvraft1/rsm/test.go +++ b/src/kvraft1/rsm/test.go @@ -7,7 +7,7 @@ import ( "6.5840/kvsrv1/rpc" "6.5840/labrpc" - "6.5840/raft1" + "6.5840/raftapi" "6.5840/tester1" ) @@ -135,7 +135,7 @@ func Leader(cfg *tester.Config, gid tester.Tgid) (bool, int) { for i, ss := range cfg.Group(gid).Services() { for _, s := range ss { switch r := s.(type) { - case *raft.Raft: + case raftapi.Raft: _, isLeader := r.GetState() if isLeader { return true, i diff --git a/src/kvtest1/kvtest.go b/src/kvtest1/kvtest.go index 6e5562d..ddf07cd 100644 --- a/src/kvtest1/kvtest.go +++ b/src/kvtest1/kvtest.go @@ -2,7 +2,7 @@ package kvtest import ( "encoding/json" - // "log" + //"log" "math/rand" "strconv" "testing" @@ -76,12 +76,21 @@ func (ts *Test) MakeClerk() IKVClerk { return ts.mck.MakeClerk() } +// Assumes different ck's put to different keys func (ts *Test) PutAtLeastOnce(ck IKVClerk, key, value string, ver rpc.Tversion, me int) rpc.Tversion { for true { - if err := ts.Put(ck, key, value, ver, me); err == rpc.OK { + err := ts.Put(ck, key, value, ver, me) + if err == rpc.OK { break } - ver += 1 + if err == rpc.ErrMaybe || err == rpc.ErrVersion { + ver += 1 + } else { + // if failed with ver = 0, retry + if ver != 0 { // check that ver is indeed 0 + ts.Fatalf("Put %v ver %d err %v", key, ver, err) + } + } } return ver } diff --git a/src/raft1/raft.go b/src/raft1/raft.go index 6c8e39e..d93487f 100644 --- a/src/raft1/raft.go +++ b/src/raft1/raft.go @@ -1,21 +1,10 @@ package raft +// The file raftapi/raftapi.go defines the interface that raft must +// expose to 
servers (or the tester), but see comments below for each +// of these functions for more details. // -// this is an outline of the API that raft must expose to -// the service (or tester). see comments below for -// each of these functions for more details. -// -// rf = Make(...) -// create a new Raft server. -// rf.Start(command interface{}) (index, term, isleader) -// start agreement on a new log entry -// rf.GetState() (term, isLeader) -// ask a Raft for its current term, and whether it thinks it is leader -// ApplyMsg -// each time a new entry is committed to the log, each Raft peer -// should send an ApplyMsg to the service (or tester) -// in the same server. -// +// Make() creates a new raft peer that implements the raft interface. import ( // "bytes" @@ -26,31 +15,11 @@ import ( // "6.5840/labgob" "6.5840/labrpc" + "6.5840/raftapi" "6.5840/tester1" ) -// as each Raft peer becomes aware that successive log entries are -// committed, the peer should send an ApplyMsg to the service (or -// tester) on the same server, via the applyCh passed to Make(). set -// CommandValid to true to indicate that the ApplyMsg contains a newly -// committed log entry. -// -// in part 3D you'll want to send other kinds of messages (e.g., -// snapshots) on the applyCh, but set CommandValid to false for these -// other uses. -type ApplyMsg struct { - CommandValid bool - Command interface{} - CommandIndex int - - // For 3D: - SnapshotValid bool - Snapshot []byte - SnapshotTerm int - SnapshotIndex int -} - // A Go object implementing a single Raft peer. type Raft struct { mu sync.Mutex // Lock to protect shared access to this peer's state @@ -114,6 +83,13 @@ func (rf *Raft) readPersist(data []byte) { // } } +// how many bytes in Raft's persisted log? +func (rf *Raft) PersistBytes() int { + rf.mu.Lock() + defer rf.mu.Unlock() + return rf.persister.RaftStateSize() +} + // the service says it has created a snapshot that has // all info up to and including index. 
this means the @@ -241,7 +217,7 @@ func (rf *Raft) ticker() { // Make() must return quickly, so it should start goroutines // for any long-running work. func Make(peers []*labrpc.ClientEnd, me int, - persister *tester.Persister, applyCh chan ApplyMsg) *Raft { + persister *tester.Persister, applyCh chan raftapi.ApplyMsg) raftapi.Raft { rf := &Raft{} rf.peers = peers rf.persister = persister diff --git a/src/raft1/server.go b/src/raft1/server.go index 032b98a..2cac4a9 100644 --- a/src/raft1/server.go +++ b/src/raft1/server.go @@ -8,13 +8,18 @@ import ( "6.5840/labgob" "6.5840/labrpc" + "6.5840/raftapi" "6.5840/tester1" + ) const ( SnapShotInterval = 10 ) +var useRaftStateMachine bool // to plug in another raft besides raft1 + + type rfsrv struct { ts *Test me int @@ -23,7 +28,7 @@ type rfsrv struct { persister *tester.Persister mu sync.Mutex - raft *Raft + raft raftapi.Raft logs map[int]any // copy of each server's committed entries } @@ -35,8 +40,10 @@ func newRfsrv(ts *Test, srv int, ends []*labrpc.ClientEnd, persister *tester.Per logs: map[int]any{}, persister: persister, } - applyCh := make(chan ApplyMsg) - s.raft = Make(ends, srv, persister, applyCh) + applyCh := make(chan raftapi.ApplyMsg) + if !useRaftStateMachine { + s.raft = Make(ends, srv, persister, applyCh) + } if snapshot { snapshot := persister.ReadSnapshot() if snapshot != nil && len(snapshot) > 0 { @@ -74,7 +81,7 @@ func (rs *rfsrv) GetState() (int, bool) { return rs.raft.GetState() } -func (rs *rfsrv) Raft() *Raft { +func (rs *rfsrv) Raft() raftapi.Raft { rs.mu.Lock() defer rs.mu.Unlock() return rs.raft @@ -89,7 +96,7 @@ func (rs *rfsrv) Logs(i int) (any, bool) { // applier reads message from apply ch and checks that they match the log // contents -func (rs *rfsrv) applier(applyCh chan ApplyMsg) { +func (rs *rfsrv) applier(applyCh chan raftapi.ApplyMsg) { for m := range applyCh { if m.CommandValid == false { // ignore other types of ApplyMsg @@ -109,7 +116,7 @@ func (rs *rfsrv) applier(applyCh chan 
ApplyMsg) { } // periodically snapshot raft state -func (rs *rfsrv) applierSnap(applyCh chan ApplyMsg) { +func (rs *rfsrv) applierSnap(applyCh chan raftapi.ApplyMsg) { if rs.raft == nil { return // ??? } diff --git a/src/raft1/test.go b/src/raft1/test.go index 68bae44..0eb1aac 100644 --- a/src/raft1/test.go +++ b/src/raft1/test.go @@ -10,6 +10,7 @@ import ( "time" "6.5840/labrpc" + "6.5840/raftapi" "6.5840/tester1" ) @@ -107,7 +108,7 @@ func (ts *Test) checkTerms() int { return term } -func (ts *Test) checkLogs(i int, m ApplyMsg) (string, bool) { +func (ts *Test) checkLogs(i int, m raftapi.ApplyMsg) (string, bool) { ts.mu.Lock() defer ts.mu.Unlock() @@ -189,7 +190,7 @@ func (ts *Test) one(cmd any, expectedServers int, retry bool) int { index := -1 for range ts.srvs { starts = (starts + 1) % len(ts.srvs) - var rf *Raft + var rf raftapi.Raft if ts.g.IsConnected(starts) { rf = ts.srvs[starts].raft } diff --git a/src/raftapi/raftapi.go b/src/raftapi/raftapi.go new file mode 100644 index 0000000..6a72898 --- /dev/null +++ b/src/raftapi/raftapi.go @@ -0,0 +1,40 @@ +package raftapi + +// The Raft interface +type Raft interface { + // Start agreement on a new log entry, and return the log index + // for that entry, the term, and whether the peer is the leader. + Start(command interface{}) (int, int, bool) + + // Ask a Raft for its current term, and whether it thinks it is + // leader + GetState() (int, bool) + + // For Snapshots (3D) + Snapshot(index int, snapshot []byte) + PersistBytes() int + + // For the tester to indicate to your code that it should clean up + // any long-running goroutines. + Kill() +} + +// As each Raft peer becomes aware that successive log entries are +// committed, the peer should send an ApplyMsg to the service (or +// tester) on the same server, via the applyCh passed to Make(). set +// CommandValid to true to indicate that the ApplyMsg contains a newly +// committed log entry. 
+// +// In Lab 3 you'll want to send other kinds of messages (e.g., +// snapshots) on the applyCh; at that point you can add fields to +// ApplyMsg, but set CommandValid to false for these other uses. +type ApplyMsg struct { + CommandValid bool + Command interface{} + CommandIndex int + + SnapshotValid bool + Snapshot []byte + SnapshotTerm int + SnapshotIndex int +} diff --git a/src/shardkv1/shardctrler/client.go b/src/shardkv1/shardctrler/client.go new file mode 100644 index 0000000..55ecaaa --- /dev/null +++ b/src/shardkv1/shardctrler/client.go @@ -0,0 +1,49 @@ +package shardctrler + +import ( + // "log" + "sync/atomic" + + "6.5840/kvsrv1/rpc" + "6.5840/tester1" +) + +type Clerk struct { + clnt *tester.Clnt + servers []string + deposed *int32 + // You will have to modify this struct. +} + +// The shard controller can use MakeClerk to make a clerk for the kvraft +// group with the servers `servers`. +func MakeClerk(clnt *tester.Clnt, servers []string, deposed *int32) *Clerk { + ck := &Clerk{clnt: clnt, servers: servers, deposed: deposed} + // You may add code here. + return ck +} + +func (ck *Clerk) isDeposed() bool { + z := atomic.LoadInt32(ck.deposed) + return z == 1 +} + +// You can reuse your kvraft Get +func (ck *Clerk) Get(key string) (string, rpc.Tversion, rpc.Err) { + args := rpc.GetArgs{} + args.Key = key + + // You'll have to add code here. + return "", 0, "" +} + +// You can reuse your kvraft Put +func (ck *Clerk) Put(key string, value string, version rpc.Tversion) rpc.Err { + args := rpc.PutArgs{} + args.Key = key + args.Value = value + args.Version = version + + // You'll have to add code here. 
+ return "" +} diff --git a/src/tester1/clnts.go b/src/tester1/clnts.go index 47806c6..dbbdcca 100644 --- a/src/tester1/clnts.go +++ b/src/tester1/clnts.go @@ -2,7 +2,6 @@ package tester import ( //"log" - "os" "sync" "6.5840/labrpc" @@ -98,7 +97,7 @@ func (clnt *Clnt) remove() { defer clnt.mu.Unlock() for _, e := range clnt.ends { - os.Remove(e.name) + clnt.net.DeleteEnd(e.name) } } @@ -142,12 +141,15 @@ func (clnts *Clnts) cleanup() { for clnt, _ := range clnts.clerks { clnt.remove() } + clnts.clerks = nil } func (clnts *Clnts) DeleteClient(clnt *Clnt) { clnts.mu.Lock() defer clnts.mu.Unlock() - clnt.remove() - delete(clnts.clerks, clnt) + if _, ok := clnts.clerks[clnt]; ok { + clnt.remove() + delete(clnts.clerks, clnt) + } } diff --git a/src/tester1/tester_test.go b/src/tester1/tester_test.go new file mode 100644 index 0000000..fbc40b7 --- /dev/null +++ b/src/tester1/tester_test.go @@ -0,0 +1,84 @@ +package tester_test + +import ( + "testing" + + "6.5840/kvsrv1/rpc" + "6.5840/labrpc" + "6.5840/tester1" +) + +type Server struct { + n rpc.Tversion + killed bool +} + +func newSrv() *Server { + return &Server{} +} + +func (s *Server) Kill() { + s.killed = true +} + +func (s *Server) Get(args *rpc.GetArgs, reply *rpc.GetReply) { + s.n += 1 + reply.Version = s.n +} + +type Test struct { + t *testing.T + s *Server + *tester.Config + clnt *tester.Clnt + sn string +} + +func makeTest(t *testing.T, nsrv int) *Test { + ts := &Test{t: t, sn: tester.ServerName(tester.GRP0, 0)} + cfg := tester.MakeConfig(t, nsrv, true, ts.startServer) + ts.Config = cfg + ts.clnt = ts.Config.MakeClient() + return ts +} + +func (ts *Test) startServer(servers []*labrpc.ClientEnd, gid tester.Tgid, me int, persister *tester.Persister) []tester.IService { + ts.s = newSrv() + return []tester.IService{ts.s} +} + +func (ts *Test) cleanup() { + ts.Cleanup() +} + +func (ts *Test) oneRPC() bool { + args := rpc.GetArgs{"xxx"} + var reply rpc.GetReply + if ok := ts.clnt.Call(ts.sn, "Server.Get", &args, 
&reply); !ok { + return false + } + if reply.Version != ts.s.n { + ts.Fatalf("Wrong version") + } + return true +} + +func TestBasic(t *testing.T) { + ts := makeTest(t, 1) + defer ts.cleanup() + ts.oneRPC() +} + +func TestShutdownServer(t *testing.T) { + ts := makeTest(t, 1) + defer ts.cleanup() + + ts.oneRPC() + ts.Group(tester.GRP0).Shutdown() + if !ts.s.killed { + ts.Fatalf("Not killed") + } + if ok := ts.oneRPC(); ok { + ts.Fatalf("RPC succeeded") + } +}