From 2f72ad504b5abe01626126155e709ddbb83e5a8c Mon Sep 17 00:00:00 2001 From: Frans Kaashoek Date: Sat, 1 Feb 2025 08:16:49 -0500 Subject: [PATCH] update --- src/kvraft1/kvraft_test.go | 3 +- src/kvraft1/rsm/rsm.go | 5 +- src/kvraft1/rsm/server.go | 6 +- src/kvraft1/rsm/test.go | 8 +- src/kvraft1/test.go | 25 +- src/kvsrv1/server.go | 10 +- src/kvsrv1/test.go | 2 +- src/raft1/raft.go | 260 +++++ src/raft1/raft_test.go | 1209 ++++++++++++++++++++++++ src/raft1/server.go | 185 ++++ src/raft1/test.go | 266 ++++++ src/raft1/test.out | 164 ++++ src/raft1/util.go | 12 + src/shardkv1/shardcfg/shardcfg_test.go | 20 +- src/shardkv1/shardgrp/server.go | 10 +- src/shardkv1/test.go | 30 +- src/tester1/config.go | 14 +- src/tester1/group.go | 70 +- src/tester1/persister.go | 70 ++ src/tester1/srv.go | 28 +- 20 files changed, 2293 insertions(+), 104 deletions(-) create mode 100644 src/raft1/raft.go create mode 100644 src/raft1/raft_test.go create mode 100644 src/raft1/server.go create mode 100644 src/raft1/test.go create mode 100644 src/raft1/test.out create mode 100644 src/raft1/util.go create mode 100644 src/tester1/persister.go diff --git a/src/kvraft1/kvraft_test.go b/src/kvraft1/kvraft_test.go index 00dd95e..597b3b0 100644 --- a/src/kvraft1/kvraft_test.go +++ b/src/kvraft1/kvraft_test.go @@ -178,7 +178,8 @@ func TestOnePartition4A(t *testing.T) { ver0 := ts.PutAtLeastOnce(ck, "1", "13", rpc.Tversion(0), -1) - p1, p2 := ts.Group(Gid).MakePartition() + _, l := ts.Leader() + p1, p2 := ts.Group(Gid).MakePartition(l) ts.Group(Gid).Partition(p1, p2) ckp1 := ts.MakeClerkTo(p1) // connect ckp1 to p1 diff --git a/src/kvraft1/rsm/rsm.go b/src/kvraft1/rsm/rsm.go index eb3676f..f7d711f 100644 --- a/src/kvraft1/rsm/rsm.go +++ b/src/kvraft1/rsm/rsm.go @@ -5,7 +5,8 @@ import ( "6.5840/kvsrv1/rpc" "6.5840/labrpc" - "6.5840/raft" + "6.5840/raft1" + "6.5840/tester1" ) @@ -51,7 +52,7 @@ type RSM struct { // // MakeRSM() must return quickly, so it should start goroutines for // any long-running 
work. -func MakeRSM(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int, sm StateMachine) *RSM { +func MakeRSM(servers []*labrpc.ClientEnd, me int, persister *tester.Persister, maxraftstate int, sm StateMachine) *RSM { rsm := &RSM{ me: me, maxraftstate: maxraftstate, diff --git a/src/kvraft1/rsm/server.go b/src/kvraft1/rsm/server.go index 8695445..8d91f16 100644 --- a/src/kvraft1/rsm/server.go +++ b/src/kvraft1/rsm/server.go @@ -6,9 +6,9 @@ import ( "sync" "6.5840/labgob" - // "6.5840/kvtest1" "6.5840/labrpc" - "6.5840/raft" + "6.5840/raft1" + "6.5840/tester1" ) type Inc struct { @@ -26,7 +26,7 @@ type rsmSrv struct { counter int } -func makeRsmSrv(ts *Test, srv int, ends []*labrpc.ClientEnd, persister *raft.Persister, snapshot bool) *rsmSrv { +func makeRsmSrv(ts *Test, srv int, ends []*labrpc.ClientEnd, persister *tester.Persister, snapshot bool) *rsmSrv { //log.Printf("mksrv %d", srv) labgob.Register(Op{}) labgob.Register(Inc{}) diff --git a/src/kvraft1/rsm/test.go b/src/kvraft1/rsm/test.go index 340525d..0f6040f 100644 --- a/src/kvraft1/rsm/test.go +++ b/src/kvraft1/rsm/test.go @@ -7,7 +7,7 @@ import ( "6.5840/kvsrv1/rpc" "6.5840/labrpc" - "6.5840/raft" + //"6.5840/raft1" "6.5840/tester1" ) @@ -31,7 +31,7 @@ func makeTest(t *testing.T, maxraftstate int) *Test { maxraftstate: maxraftstate, srvs: make([]*rsmSrv, NSRV), } - ts.Config = tester.MakeConfig(t, NSRV, true, maxraftstate, ts.mksrv) + ts.Config = tester.MakeConfig(t, NSRV, true, ts.mksrv) ts.g = ts.Group(tester.GRP0) return ts } @@ -42,10 +42,10 @@ func (ts *Test) cleanup() { ts.CheckTimeout() } -func (ts *Test) mksrv(ends []*labrpc.ClientEnd, grp tester.Tgid, srv int, persister *raft.Persister, maxraftstate int) tester.IKVServer { +func (ts *Test) mksrv(ends []*labrpc.ClientEnd, grp tester.Tgid, srv int, persister *tester.Persister) []tester.IService { s := makeRsmSrv(ts, srv, ends, persister, false) ts.srvs[srv] = s - return s + return []tester.IService{s.rsm.Raft()} } func 
(ts *Test) one() *Rep { diff --git a/src/kvraft1/test.go b/src/kvraft1/test.go index 09c68d1..ef1dbfa 100644 --- a/src/kvraft1/test.go +++ b/src/kvraft1/test.go @@ -4,6 +4,8 @@ import ( "testing" "6.5840/kvtest1" + "6.5840/labrpc" + "6.5840/raft1" "6.5840/tester1" ) @@ -22,7 +24,6 @@ type Test struct { const Gid = tester.GRP0 func MakeTest(t *testing.T, part string, nclients, nservers int, reliable bool, crash bool, partitions bool, maxraftstate int, randomkeys bool) *Test { - cfg := tester.MakeConfig(t, nservers, reliable, maxraftstate, StartKVServer) ts := &Test{ t: t, part: part, @@ -33,17 +34,39 @@ func MakeTest(t *testing.T, part string, nclients, nservers int, reliable bool, maxraftstate: maxraftstate, randomkeys: randomkeys, } + cfg := tester.MakeConfig(t, nservers, reliable, ts.StartKVServer) ts.Test = kvtest.MakeTest(t, cfg, randomkeys, ts) ts.Begin(ts.makeTitle()) return ts } +func (ts *Test) StartKVServer(servers []*labrpc.ClientEnd, gid tester.Tgid, me int, persister *tester.Persister) []tester.IService { + return StartKVServer(servers, gid, me, persister, ts.maxraftstate) + +} + func (ts *Test) MakeClerk() kvtest.IKVClerk { clnt := ts.Config.MakeClient() ck := MakeClerk(clnt, ts.Group(Gid).SrvNames()) return &kvtest.TestClerk{ck, clnt} } +func (ts *Test) Leader() (bool, int) { + for i, ss := range ts.Group(Gid).Services() { + for _, s := range ss { + switch r := s.(type) { + case *raft.Raft: + _, is_leader := r.GetState() + if is_leader { + return true, i + } + default: + } + } + } + return false, 0 +} + func (ts *Test) DeleteClerk(ck kvtest.IKVClerk) { tck := ck.(*kvtest.TestClerk) ts.DeleteClient(tck.Clnt) diff --git a/src/kvsrv1/server.go b/src/kvsrv1/server.go index 3937fbf..ae056e1 100644 --- a/src/kvsrv1/server.go +++ b/src/kvsrv1/server.go @@ -6,7 +6,6 @@ import ( "6.5840/kvsrv1/rpc" "6.5840/labrpc" - "6.5840/raft" "6.5840/tester1" ) @@ -50,14 +49,9 @@ func (kv *KVServer) Put(args *rpc.PutArgs, reply *rpc.PutReply) { func (kv *KVServer) Kill() { 
} -// You can ignore for this lab -func (kv *KVServer) Raft() *raft.Raft { - return nil -} - // You can ignore all arguments; they are for replicated KVservers in lab 4 -func StartKVServer(ends []*labrpc.ClientEnd, gid tester.Tgid, srv int, persister *raft.Persister, maxraftstate int) tester.IKVServer { +func StartKVServer(ends []*labrpc.ClientEnd, gid tester.Tgid, srv int, persister *tester.Persister) []tester.IService { kv := MakeKVServer() - return kv + return []tester.IService{kv} } diff --git a/src/kvsrv1/test.go b/src/kvsrv1/test.go index 6fddd1a..bd8269b 100644 --- a/src/kvsrv1/test.go +++ b/src/kvsrv1/test.go @@ -15,7 +15,7 @@ type TestKV struct { } func MakeTestKV(t *testing.T, reliable bool) *TestKV { - cfg := tester.MakeConfig(t, 1, reliable, -1, StartKVServer) + cfg := tester.MakeConfig(t, 1, reliable, StartKVServer) ts := &TestKV{ t: t, reliable: reliable, diff --git a/src/raft1/raft.go b/src/raft1/raft.go new file mode 100644 index 0000000..6c8e39e --- /dev/null +++ b/src/raft1/raft.go @@ -0,0 +1,260 @@ +package raft + +// +// this is an outline of the API that raft must expose to +// the service (or tester). see comments below for +// each of these functions for more details. +// +// rf = Make(...) +// create a new Raft server. +// rf.Start(command interface{}) (index, term, isleader) +// start agreement on a new log entry +// rf.GetState() (term, isLeader) +// ask a Raft for its current term, and whether it thinks it is leader +// ApplyMsg +// each time a new entry is committed to the log, each Raft peer +// should send an ApplyMsg to the service (or tester) +// in the same server. +// + +import ( + // "bytes" + "math/rand" + "sync" + "sync/atomic" + "time" + + // "6.5840/labgob" + "6.5840/labrpc" + "6.5840/tester1" +) + + +// as each Raft peer becomes aware that successive log entries are +// committed, the peer should send an ApplyMsg to the service (or +// tester) on the same server, via the applyCh passed to Make(). 
set +// CommandValid to true to indicate that the ApplyMsg contains a newly +// committed log entry. +// +// in part 3D you'll want to send other kinds of messages (e.g., +// snapshots) on the applyCh, but set CommandValid to false for these +// other uses. +type ApplyMsg struct { + CommandValid bool + Command interface{} + CommandIndex int + + // For 3D: + SnapshotValid bool + Snapshot []byte + SnapshotTerm int + SnapshotIndex int +} + +// A Go object implementing a single Raft peer. +type Raft struct { + mu sync.Mutex // Lock to protect shared access to this peer's state + peers []*labrpc.ClientEnd // RPC end points of all peers + persister *tester.Persister // Object to hold this peer's persisted state + me int // this peer's index into peers[] + dead int32 // set by Kill() + + // Your data here (3A, 3B, 3C). + // Look at the paper's Figure 2 for a description of what + // state a Raft server must maintain. + +} + +// return currentTerm and whether this server +// believes it is the leader. +func (rf *Raft) GetState() (int, bool) { + + var term int + var isleader bool + // Your code here (3A). + return term, isleader +} + +// save Raft's persistent state to stable storage, +// where it can later be retrieved after a crash and restart. +// see paper's Figure 2 for a description of what should be persistent. +// before you've implemented snapshots, you should pass nil as the +// second argument to persister.Save(). +// after you've implemented snapshots, pass the current snapshot +// (or nil if there's not yet a snapshot). +func (rf *Raft) persist() { + // Your code here (3C). + // Example: + // w := new(bytes.Buffer) + // e := labgob.NewEncoder(w) + // e.Encode(rf.xxx) + // e.Encode(rf.yyy) + // raftstate := w.Bytes() + // rf.persister.Save(raftstate, nil) +} + + +// restore previously persisted state. +func (rf *Raft) readPersist(data []byte) { + if data == nil || len(data) < 1 { // bootstrap without any state? + return + } + // Your code here (3C). 
+ // Example: + // r := bytes.NewBuffer(data) + // d := labgob.NewDecoder(r) + // var xxx + // var yyy + // if d.Decode(&xxx) != nil || + // d.Decode(&yyy) != nil { + // error... + // } else { + // rf.xxx = xxx + // rf.yyy = yyy + // } +} + + +// the service says it has created a snapshot that has +// all info up to and including index. this means the +// service no longer needs the log through (and including) +// that index. Raft should now trim its log as much as possible. +func (rf *Raft) Snapshot(index int, snapshot []byte) { + // Your code here (3D). + +} + + +// example RequestVote RPC arguments structure. +// field names must start with capital letters! +type RequestVoteArgs struct { + // Your data here (3A, 3B). +} + +// example RequestVote RPC reply structure. +// field names must start with capital letters! +type RequestVoteReply struct { + // Your data here (3A). +} + +// example RequestVote RPC handler. +func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) { + // Your code here (3A, 3B). +} + +// example code to send a RequestVote RPC to a server. +// server is the index of the target server in rf.peers[]. +// expects RPC arguments in args. +// fills in *reply with RPC reply, so caller should +// pass &reply. +// the types of the args and reply passed to Call() must be +// the same as the types of the arguments declared in the +// handler function (including whether they are pointers). +// +// The labrpc package simulates a lossy network, in which servers +// may be unreachable, and in which requests and replies may be lost. +// Call() sends a request and waits for a reply. If a reply arrives +// within a timeout interval, Call() returns true; otherwise +// Call() returns false. Thus Call() may not return for a while. +// A false return can be caused by a dead server, a live server that +// can't be reached, a lost request, or a lost reply. 
+// +// Call() is guaranteed to return (perhaps after a delay) *except* if the +// handler function on the server side does not return. Thus there +// is no need to implement your own timeouts around Call(). +// +// look at the comments in ../labrpc/labrpc.go for more details. +// +// if you're having trouble getting RPC to work, check that you've +// capitalized all field names in structs passed over RPC, and +// that the caller passes the address of the reply struct with &, not +// the struct itself. +func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool { + ok := rf.peers[server].Call("Raft.RequestVote", args, reply) + return ok +} + + +// the service using Raft (e.g. a k/v server) wants to start +// agreement on the next command to be appended to Raft's log. if this +// server isn't the leader, returns false. otherwise start the +// agreement and return immediately. there is no guarantee that this +// command will ever be committed to the Raft log, since the leader +// may fail or lose an election. even if the Raft instance has been killed, +// this function should return gracefully. +// +// the first return value is the index that the command will appear at +// if it's ever committed. the second return value is the current +// term. the third return value is true if this server believes it is +// the leader. +func (rf *Raft) Start(command interface{}) (int, int, bool) { + index := -1 + term := -1 + isLeader := true + + // Your code here (3B). + + + return index, term, isLeader +} + +// the tester doesn't halt goroutines created by Raft after each test, +// but it does call the Kill() method. your code can use killed() to +// check whether Kill() has been called. the use of atomic avoids the +// need for a lock. +// +// the issue is that long-running goroutines use memory and may chew +// up CPU time, perhaps causing later tests to fail and generating +// confusing debug output. 
any goroutine with a long-running loop +// should call killed() to check whether it should stop. +func (rf *Raft) Kill() { + atomic.StoreInt32(&rf.dead, 1) + // Your code here, if desired. +} + +func (rf *Raft) killed() bool { + z := atomic.LoadInt32(&rf.dead) + return z == 1 +} + +func (rf *Raft) ticker() { + for rf.killed() == false { + + // Your code here (3A) + // Check if a leader election should be started. + + + // pause for a random amount of time between 50 and 350 + // milliseconds. + ms := 50 + (rand.Int63() % 300) + time.Sleep(time.Duration(ms) * time.Millisecond) + } +} + +// the service or tester wants to create a Raft server. the ports +// of all the Raft servers (including this one) are in peers[]. this +// server's port is peers[me]. all the servers' peers[] arrays +// have the same order. persister is a place for this server to +// save its persistent state, and also initially holds the most +// recent saved state, if any. applyCh is a channel on which the +// tester or service expects Raft to send ApplyMsg messages. +// Make() must return quickly, so it should start goroutines +// for any long-running work. +func Make(peers []*labrpc.ClientEnd, me int, + persister *tester.Persister, applyCh chan ApplyMsg) *Raft { + rf := &Raft{} + rf.peers = peers + rf.persister = persister + rf.me = me + + // Your initialization code here (3A, 3B, 3C). + + // initialize from state persisted before a crash + rf.readPersist(persister.ReadRaftState()) + + // start ticker goroutine to start elections + go rf.ticker() + + + return rf +} diff --git a/src/raft1/raft_test.go b/src/raft1/raft_test.go new file mode 100644 index 0000000..df573ec --- /dev/null +++ b/src/raft1/raft_test.go @@ -0,0 +1,1209 @@ +package raft + +// +// Raft tests. +// +// we will use the original raft_test.go to test your code for grading. +// so, while you can modify this code to help you debug, please +// test with the original before submitting. 
+// + +import ( + "fmt" + // "log" + "math/rand" + "sync" + "sync/atomic" + "testing" + "time" + + "6.5840/tester1" +) + +// The tester generously allows solutions to complete elections in one second +// (much more than the paper's range of timeouts). +const RaftElectionTimeout = 1000 * time.Millisecond + +func TestInitialElection3A(t *testing.T) { + servers := 3 + ts := makeTest(t, servers, true, false) + defer ts.cleanup() + + ts.Begin("Test (3A): initial election") + + // is a leader elected? + ts.checkOneLeader() + + // sleep a bit to avoid racing with followers learning of the + // election, then check that all peers agree on the term. + time.Sleep(50 * time.Millisecond) + term1 := ts.checkTerms() + if term1 < 1 { + ts.t.Fatalf("term is %v, but should be at least 1", term1) + } + + // does the leader+term stay the same if there is no network failure? + time.Sleep(2 * RaftElectionTimeout) + term2 := ts.checkTerms() + if term1 != term2 { + fmt.Printf("warning: term changed even though there were no failures") + } + + // there should still be a leader. + ts.checkOneLeader() +} + +func TestReElection3A(t *testing.T) { + servers := 3 + ts := makeTest(t, servers, true, false) + defer ts.cleanup() + + ts.Begin("Test (3A): election after network failure") + + leader1 := ts.checkOneLeader() + + // if the leader disconnects, a new one should be elected. + ts.g.DisconnectAll(leader1) + ts.checkOneLeader() + + // if the old leader rejoins, that shouldn't + // disturb the new leader. and the old leader + // should switch to follower. + ts.g.ConnectOne(leader1) + leader2 := ts.checkOneLeader() + + // if there's no quorum, no new leader should + // be elected. + ts.g.DisconnectAll(leader2) + ts.g.DisconnectAll((leader2 + 1) % servers) + time.Sleep(2 * RaftElectionTimeout) + + // check that the one connected server + // does not think it is the leader. + ts.checkNoLeader() + + // if a quorum arises, it should elect a leader. 
+ ts.g.ConnectOne((leader2 + 1) % servers) + ts.checkOneLeader() + + // re-join of last node shouldn't prevent leader from existing. + ts.g.ConnectOne(leader2) + ts.checkOneLeader() +} + +func TestManyElections3A(t *testing.T) { + servers := 7 + ts := makeTest(t, servers, true, false) + defer ts.cleanup() + + ts.Begin("Test (3A): multiple elections") + + ts.checkOneLeader() + + iters := 10 + for ii := 1; ii < iters; ii++ { + // disconnect three nodes + i1 := rand.Int() % servers + i2 := rand.Int() % servers + i3 := rand.Int() % servers + ts.g.DisconnectAll(i1) + ts.g.DisconnectAll(i2) + ts.g.DisconnectAll(i3) + + // either the current leader should still be alive, + // or the remaining four should elect a new one. + ts.checkOneLeader() + + ts.g.ConnectOne(i1) + ts.g.ConnectOne(i2) + ts.g.ConnectOne(i3) + } + ts.checkOneLeader() +} + +func TestBasicAgree3B(t *testing.T) { + servers := 3 + ts := makeTest(t, servers, true, false) + defer ts.cleanup() + + ts.Begin("Test (3B): basic agreement") + + iters := 3 + for index := 1; index < iters+1; index++ { + nd, _ := ts.nCommitted(index) + if nd > 0 { + t.Fatalf("some have committed before Start()") + } + + xindex := ts.one(index*100, servers, false) + if xindex != index { + t.Fatalf("got index %v but expected %v", xindex, index) + } + } +} + +// check, based on counting bytes of RPCs, that +// each command is sent to each peer just once. 
+func TestRPCBytes3B(t *testing.T) { + servers := 3 + ts := makeTest(t, servers, true, false) + defer ts.cleanup() + + ts.Begin("Test (3B): RPC byte count") + + ts.one(99, servers, false) + bytes0 := ts.BytesTotal() + + iters := 10 + var sent int64 = 0 + for index := 2; index < iters+2; index++ { + cmd := tester.Randstring(5000) + xindex := ts.one(cmd, servers, false) + if xindex != index { + t.Fatalf("got index %v but expected %v", xindex, index) + } + sent += int64(len(cmd)) + } + + bytes1 := ts.BytesTotal() + got := bytes1 - bytes0 + expected := int64(servers) * sent + if got > expected+50000 { + t.Fatalf("too many RPC bytes; got %v, expected %v", got, expected) + } + +} + +// test just failure of followers. +func TestFollowerFailure3B(t *testing.T) { + servers := 3 + ts := makeTest(t, servers, true, false) + defer ts.cleanup() + + ts.Begin("Test (3B): test progressive failure of followers") + + ts.one(101, servers, false) + + // disconnect one follower from the network. + leader1 := ts.checkOneLeader() + ts.g.DisconnectAll((leader1 + 1) % servers) + + // the leader and remaining follower should be + // able to agree despite the disconnected follower. + ts.one(102, servers-1, false) + time.Sleep(RaftElectionTimeout) + ts.one(103, servers-1, false) + + // disconnect the remaining follower + leader2 := ts.checkOneLeader() + ts.g.DisconnectAll((leader2 + 1) % servers) + ts.g.DisconnectAll((leader2 + 2) % servers) + + // submit a command. + index, _, ok := ts.srvs[leader2].Raft().Start(104) + if ok != true { + t.Fatalf("leader rejected Start()") + } + if index != 4 { + t.Fatalf("expected index 4, got %v", index) + } + + time.Sleep(2 * RaftElectionTimeout) + + // check that command 104 did not commit. + n, _ := ts.nCommitted(index) + if n > 0 { + t.Fatalf("%v committed but no majority", n) + } + +} + +// test just failure of leaders. 
+func TestLeaderFailure3B(t *testing.T) { + servers := 3 + ts := makeTest(t, servers, true, false) + defer ts.cleanup() + + ts.Begin("Test (3B): test failure of leaders") + + ts.one(101, servers, false) + + // disconnect the first leader. + leader1 := ts.checkOneLeader() + ts.g.DisconnectAll(leader1) + + // the remaining followers should elect + // a new leader. + ts.one(102, servers-1, false) + time.Sleep(RaftElectionTimeout) + ts.one(103, servers-1, false) + + // disconnect the new leader. + leader2 := ts.checkOneLeader() + ts.g.DisconnectAll(leader2) + + // submit a command to each server. + for i := 0; i < servers; i++ { + ts.srvs[i].Raft().Start(104) + } + + time.Sleep(2 * RaftElectionTimeout) + + // check that command 104 did not commit. + n, _ := ts.nCommitted(4) + if n > 0 { + t.Fatalf("%v committed but no majority", n) + } + +} + +// test that a follower participates after +// disconnect and re-connect. +func TestFailAgree3B(t *testing.T) { + servers := 3 + ts := makeTest(t, servers, true, false) + defer ts.cleanup() + + ts.Begin("Test (3B): agreement after follower reconnects") + + ts.one(101, servers, false) + + // disconnect one follower from the network. + leader := ts.checkOneLeader() + ts.g.DisconnectAll((leader + 1) % servers) + + // the leader and remaining follower should be + // able to agree despite the disconnected follower. + ts.one(102, servers-1, false) + ts.one(103, servers-1, false) + time.Sleep(RaftElectionTimeout) + ts.one(104, servers-1, false) + ts.one(105, servers-1, false) + + // re-connect + ts.g.ConnectOne((leader + 1) % servers) + + // the full set of servers should preserve + // previous agreements, and be able to agree + // on new commands. 
+ ts.one(106, servers, true) + time.Sleep(RaftElectionTimeout) + ts.one(107, servers, true) + +} + +func TestFailNoAgree3B(t *testing.T) { + servers := 5 + ts := makeTest(t, servers, true, false) + defer ts.cleanup() + + ts.Begin("Test (3B): no agreement if too many followers disconnect") + + ts.one(10, servers, false) + + // 3 of 5 followers disconnect + leader := ts.checkOneLeader() + ts.g.DisconnectAll((leader + 1) % servers) + ts.g.DisconnectAll((leader + 2) % servers) + ts.g.DisconnectAll((leader + 3) % servers) + + index, _, ok := ts.srvs[leader].Raft().Start(20) + if ok != true { + t.Fatalf("leader rejected Start()") + } + if index != 2 { + t.Fatalf("expected index 2, got %v", index) + } + + time.Sleep(2 * RaftElectionTimeout) + + n, _ := ts.nCommitted(index) + if n > 0 { + t.Fatalf("%v committed but no majority", n) + } + + // repair + ts.g.ConnectOne((leader + 1) % servers) + ts.g.ConnectOne((leader + 2) % servers) + ts.g.ConnectOne((leader + 3) % servers) + + // the disconnected majority may have chosen a leader from + // among their own ranks, forgetting index 2. 
+ leader2 := ts.checkOneLeader() + index2, _, ok2 := ts.srvs[leader2].Raft().Start(30) + if ok2 == false { + t.Fatalf("leader2 rejected Start()") + } + if index2 < 2 || index2 > 3 { + t.Fatalf("unexpected index %v", index2) + } + + ts.one(1000, servers, true) + +} + +func TestConcurrentStarts3B(t *testing.T) { + servers := 3 + ts := makeTest(t, servers, true, false) + defer ts.cleanup() + + ts.Begin("Test (3B): concurrent Start()s") + + var success bool +loop: + for try := 0; try < 5; try++ { + if try > 0 { + // give solution some time to settle + time.Sleep(3 * time.Second) + } + + leader := ts.checkOneLeader() + _, term, ok := ts.srvs[leader].Raft().Start(1) + if !ok { + // leader moved on really quickly + continue + } + + iters := 5 + var wg sync.WaitGroup + is := make(chan int, iters) + for ii := 0; ii < iters; ii++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + i, term1, ok := ts.srvs[leader].Raft().Start(100 + i) + if term1 != term { + return + } + if ok != true { + return + } + is <- i + }(ii) + } + + wg.Wait() + close(is) + + for j := 0; j < servers; j++ { + if t, _ := ts.srvs[j].Raft().GetState(); t != term { + // term changed -- can't expect low RPC counts + continue loop + } + } + + failed := false + cmds := []int{} + for index := range is { + cmd := ts.wait(index, servers, term) + if ix, ok := cmd.(int); ok { + if ix == -1 { + // peers have moved on to later terms + // so we can't expect all Start()s to + // have succeeded + failed = true + break + } + cmds = append(cmds, ix) + } else { + t.Fatalf("value %v is not an int", cmd) + } + } + + if failed { + // avoid leaking goroutines + go func() { + for range is { + } + }() + continue + } + + for ii := 0; ii < iters; ii++ { + x := 100 + ii + ok := false + for j := 0; j < len(cmds); j++ { + if cmds[j] == x { + ok = true + } + } + if ok == false { + t.Fatalf("cmd %v missing in %v", x, cmds) + } + } + + success = true + break + } + + if !success { + t.Fatalf("term changed too often") + } + +} + +func 
TestRejoin3B(t *testing.T) { + servers := 3 + ts := makeTest(t, servers, true, false) + defer ts.cleanup() + + ts.Begin("Test (3B): rejoin of partitioned leader") + + ts.one(101, servers, true) + + // leader network failure + leader1 := ts.checkOneLeader() + ts.g.DisconnectAll(leader1) + + // make old leader try to agree on some entries + ts.srvs[leader1].Raft().Start(102) + ts.srvs[leader1].Raft().Start(103) + ts.srvs[leader1].Raft().Start(104) + + // new leader commits, also for index=2 + ts.one(103, 2, true) + + // new leader network failure + leader2 := ts.checkOneLeader() + ts.g.DisconnectAll(leader2) + + // old leader connected again + ts.g.ConnectOne(leader1) + + ts.one(104, 2, true) + + // all together now + ts.g.ConnectOne(leader2) + + ts.one(105, servers, true) + +} + +func TestBackup3B(t *testing.T) { + servers := 5 + ts := makeTest(t, servers, true, false) + defer ts.cleanup() + + ts.Begin("Test (3B): leader backs up quickly over incorrect follower logs") + + ts.one(rand.Int(), servers, true) + + // put leader and one follower in a partition + leader1 := ts.checkOneLeader() + ts.g.DisconnectAll((leader1 + 2) % servers) + ts.g.DisconnectAll((leader1 + 3) % servers) + ts.g.DisconnectAll((leader1 + 4) % servers) + + // submit lots of commands that won't commit + for i := 0; i < 50; i++ { + ts.srvs[leader1].Raft().Start(rand.Int()) + } + + time.Sleep(RaftElectionTimeout / 2) + + ts.g.DisconnectAll((leader1 + 0) % servers) + ts.g.DisconnectAll((leader1 + 1) % servers) + + // allow other partition to recover + ts.g.ConnectOne((leader1 + 2) % servers) + ts.g.ConnectOne((leader1 + 3) % servers) + ts.g.ConnectOne((leader1 + 4) % servers) + + // lots of successful commands to new group. 
+ for i := 0; i < 50; i++ { + ts.one(rand.Int(), 3, true) + } + + // now another partitioned leader and one follower + leader2 := ts.checkOneLeader() + other := (leader1 + 2) % servers + if leader2 == other { + other = (leader2 + 1) % servers + } + ts.g.DisconnectAll(other) + + // lots more commands that won't commit + for i := 0; i < 50; i++ { + ts.srvs[leader2].Raft().Start(rand.Int()) + } + + time.Sleep(RaftElectionTimeout / 2) + + // bring original leader back to life, + for i := 0; i < servers; i++ { + ts.g.DisconnectAll(i) + } + ts.g.ConnectOne((leader1 + 0) % servers) + ts.g.ConnectOne((leader1 + 1) % servers) + ts.g.ConnectOne(other) + + // lots of successful commands to new group. + for i := 0; i < 50; i++ { + ts.one(rand.Int(), 3, true) + } + + // now everyone + for i := 0; i < servers; i++ { + ts.g.ConnectOne(i) + } + ts.one(rand.Int(), servers, true) +} + +func TestCount3B(t *testing.T) { + servers := 3 + ts := makeTest(t, servers, true, false) + defer ts.cleanup() + + ts.Begin("Test (3B): RPC counts aren't too high") + + rpcs := func() (n int) { + for j := 0; j < servers; j++ { + n += ts.g.RpcCount(j) + } + return + } + + leader := ts.checkOneLeader() + + total1 := rpcs() + + if total1 > 30 || total1 < 1 { + t.Fatalf("too many or few RPCs (%v) to elect initial leader\n", total1) + } + + var total2 int + var success bool +loop: + for try := 0; try < 5; try++ { + if try > 0 { + // give solution some time to settle + time.Sleep(3 * time.Second) + } + + leader = ts.checkOneLeader() + total1 = rpcs() + + iters := 10 + starti, term, ok := ts.srvs[leader].Raft().Start(1) + if !ok { + // leader moved on really quickly + continue + } + cmds := []int{} + for i := 1; i < iters+2; i++ { + x := int(rand.Int31()) + cmds = append(cmds, x) + index1, term1, ok := ts.srvs[leader].Raft().Start(x) + if term1 != term { + // Term changed while starting + continue loop + } + if !ok { + // No longer the leader, so term has changed + continue loop + } + if starti+i != index1 { 
+ t.Fatalf("Start() failed") + } + } + + for i := 1; i < iters+1; i++ { + cmd := ts.wait(starti+i, servers, term) + if ix, ok := cmd.(int); ok == false || ix != cmds[i-1] { + if ix == -1 { + // term changed -- try again + continue loop + } + t.Fatalf("wrong value %v committed for index %v; expected %v\n", cmd, starti+i, cmds) + } + } + + failed := false + total2 = 0 + for j := 0; j < servers; j++ { + if t, _ := ts.srvs[j].Raft().GetState(); t != term { + // term changed -- can't expect low RPC counts + // need to keep going to update total2 + failed = true + } + total2 += ts.g.RpcCount(j) + } + + if failed { + continue loop + } + + if total2-total1 > (iters+1+3)*3 { + t.Fatalf("too many RPCs (%v) for %v entries\n", total2-total1, iters) + } + + success = true + break + } + + if !success { + t.Fatalf("term changed too often") + } + + time.Sleep(RaftElectionTimeout) + + total3 := 0 + for j := 0; j < servers; j++ { + total3 += ts.g.RpcCount(j) + } + + if total3-total2 > 3*20 { + t.Fatalf("too many RPCs (%v) for 1 second of idleness\n", total3-total2) + } + +} + +func TestPersist13C(t *testing.T) { + servers := 3 + ts := makeTest(t, servers, true, false) + defer ts.cleanup() + + ts.Begin("Test (3C): basic persistence") + + ts.one(11, servers, true) + + ts.g.Shutdown() + ts.g.StartServers() + + ts.one(12, servers, true) + + leader1 := ts.checkOneLeader() + ts.g.ShutdownServer(leader1) + ts.restart(leader1) + + ts.one(13, servers, true) + + leader2 := ts.checkOneLeader() + ts.g.ShutdownServer(leader2) + + ts.one(14, servers-1, true) + + ts.restart(leader2) + + ts.wait(4, servers, -1) // wait for leader2 to join before killing i3 + + i3 := (ts.checkOneLeader() + 1) % servers + ts.g.ShutdownServer(i3) + + ts.one(15, servers-1, true) + + ts.restart(i3) + + ts.one(16, servers, true) +} + +func TestPersist23C(t *testing.T) { + servers := 5 + ts := makeTest(t, servers, true, false) + defer ts.cleanup() + + ts.Begin("Test (3C): more persistence") + + index := 1 + for iters := 
0; iters < 5; iters++ { + ts.one(10+index, servers, true) + index++ + + leader1 := ts.checkOneLeader() + + ts.g.ShutdownServer((leader1 + 1) % servers) + ts.g.ShutdownServer((leader1 + 2) % servers) + + ts.one(10+index, servers-2, true) + index++ + + ts.g.ShutdownServer((leader1 + 0) % servers) + ts.g.ShutdownServer((leader1 + 3) % servers) + ts.g.ShutdownServer((leader1 + 4) % servers) + + ts.restart((leader1 + 1) % servers) + ts.restart((leader1 + 2) % servers) + + time.Sleep(RaftElectionTimeout) + + ts.restart((leader1 + 3) % servers) + + ts.one(10+index, servers-2, true) + index++ + + ts.restart((leader1 + 4) % servers) + ts.restart((leader1 + 0) % servers) + } + + ts.one(1000, servers, true) +} + +func TestPersist33C(t *testing.T) { + servers := 3 + ts := makeTest(t, servers, true, false) + defer ts.cleanup() + + ts.Begin("Test (3C): partitioned leader and one follower crash, leader restarts") + + ts.one(101, 3, true) + + leader := ts.checkOneLeader() + ts.g.DisconnectAll((leader + 2) % servers) + + ts.one(102, 2, true) + + ts.g.ShutdownServer((leader + 0) % servers) + ts.g.ShutdownServer((leader + 1) % servers) + ts.restart((leader + 2) % servers) + ts.restart((leader + 0) % servers) + + ts.one(103, 2, true) + + ts.restart((leader + 1) % servers) + + ts.one(104, servers, true) +} + +// Test the scenarios described in Figure 8 of the extended Raft paper. Each +// iteration asks a leader, if there is one, to insert a command in the Raft +// log. If there is a leader, that leader will fail quickly with a high +// probability (perhaps without committing the command), or crash after a while +// with low probability (most likely committing the command). If the number of +// alive servers isn't enough to form a majority, perhaps start a new server. +// The leader in a new term may try to finish replicating log entries that +// haven't been committed yet. 
+func TestFigure83C(t *testing.T) { + servers := 5 + ts := makeTest(t, servers, true, false) + defer ts.cleanup() + + ts.Begin("Test (3C): Figure 8") + + ts.one(rand.Int(), 1, true) + + nup := servers + for iters := 0; iters < 1000; iters++ { + leader := -1 + for i := 0; i < servers; i++ { + if ts.srvs[i].Raft() != nil { + _, _, ok := ts.srvs[i].Raft().Start(rand.Int()) + if ok { + leader = i + } + } + } + + if (rand.Int() % 1000) < 100 { + ms := rand.Int63() % (int64(RaftElectionTimeout/time.Millisecond) / 2) + time.Sleep(time.Duration(ms) * time.Millisecond) + } else { + ms := (rand.Int63() % 13) + time.Sleep(time.Duration(ms) * time.Millisecond) + } + + if leader != -1 { + ts.g.ShutdownServer(leader) + nup -= 1 + } + + if nup < 3 { + s := rand.Int() % servers + if ts.srvs[s].Raft() == nil { + ts.restart(s) + nup += 1 + } + } + } + + for i := 0; i < servers; i++ { + if ts.srvs[i].Raft() == nil { + ts.restart(i) + } + } + + ts.one(rand.Int(), servers, true) + +} + +func TestUnreliableAgree3C(t *testing.T) { + servers := 5 + ts := makeTest(t, servers, false, false) + defer ts.cleanup() + + ts.Begin("Test (3C): unreliable agreement") + + var wg sync.WaitGroup + + for iters := 1; iters < 50; iters++ { + for j := 0; j < 4; j++ { + wg.Add(1) + go func(iters, j int) { + defer wg.Done() + ts.one((100*iters)+j, 1, true) + }(iters, j) + } + ts.one(iters, 1, true) + } + + ts.SetReliable(true) + + wg.Wait() + + ts.one(100, servers, true) + +} + +func TestFigure8Unreliable3C(t *testing.T) { + servers := 5 + ts := makeTest(t, servers, false, false) + defer ts.cleanup() + + ts.Begin("Test (3C): Figure 8 (unreliable)") + + ts.one(rand.Int()%10000, 1, true) + + nup := servers + for iters := 0; iters < 1000; iters++ { + if iters == 200 { + ts.SetLongReordering(true) + } + leader := -1 + for i := 0; i < servers; i++ { + _, _, ok := ts.srvs[i].Raft().Start(rand.Int() % 10000) + if ok && ts.g.IsConnected(i) { + leader = i + } + } + + if (rand.Int() % 1000) < 100 { + ms := 
rand.Int63() % (int64(RaftElectionTimeout/time.Millisecond) / 2) + time.Sleep(time.Duration(ms) * time.Millisecond) + } else { + ms := (rand.Int63() % 13) + time.Sleep(time.Duration(ms) * time.Millisecond) + } + + if leader != -1 && (rand.Int()%1000) < int(RaftElectionTimeout/time.Millisecond)/2 { + ts.g.DisconnectAll(leader) + nup -= 1 + } + + if nup < 3 { + s := rand.Int() % servers + if !ts.g.IsConnected(s) { + ts.g.ConnectOne(s) + nup += 1 + } + } + } + + for i := 0; i < servers; i++ { + if !ts.g.IsConnected(i) { + ts.g.ConnectOne(i) + } + } + + ts.one(rand.Int()%10000, servers, true) + +} + +func internalChurn(t *testing.T, reliable bool) { + + servers := 5 + ts := makeTest(t, servers, reliable, false) + defer ts.cleanup() + + if ts.IsReliable() { + ts.Begin("Test (3C): churn") + } else { + ts.Begin("Test (3C): unreliable churn") + } + + stop := int32(0) + + // create concurrent clients + cfn := func(me int, ch chan []int) { + var ret []int + ret = nil + defer func() { ch <- ret }() + values := []int{} + for atomic.LoadInt32(&stop) == 0 { + x := rand.Int() + index := -1 + ok := false + for i := 0; i < servers; i++ { + // try them all, maybe one of them is a leader + ts.mu.Lock() + rf := ts.srvs[i].Raft() + ts.mu.Unlock() + if rf != nil { + index1, _, ok1 := rf.Start(x) + if ok1 { + ok = ok1 + index = index1 + } + } + } + if ok { + // maybe leader will commit our value, maybe not. + // but don't wait forever. 
+ for _, to := range []int{10, 20, 50, 100, 200} { + nd, cmd := ts.nCommitted(index) + if nd > 0 { + if xx, ok := cmd.(int); ok { + if xx == x { + values = append(values, x) + } + } else { + ts.t.Fatalf("wrong command type") + } + break + } + time.Sleep(time.Duration(to) * time.Millisecond) + } + } else { + time.Sleep(time.Duration(79+me*17) * time.Millisecond) + } + } + ret = values + } + + ncli := 3 + cha := []chan []int{} + for i := 0; i < ncli; i++ { + cha = append(cha, make(chan []int)) + go cfn(i, cha[i]) + } + + for iters := 0; iters < 20; iters++ { + if (rand.Int() % 1000) < 200 { + i := rand.Int() % servers + ts.g.DisconnectAll(i) + } + + if (rand.Int() % 1000) < 500 { + i := rand.Int() % servers + if ts.srvs[i].raft == nil { + ts.restart(i) + } + ts.g.ConnectOne(i) + } + + if (rand.Int() % 1000) < 200 { + i := rand.Int() % servers + if ts.srvs[i].raft != nil { + ts.g.ShutdownServer(i) + } + } + + // Make crash/restart infrequent enough that the peers can often + // keep up, but not so infrequent that everything has settled + // down from one change to the next. Pick a value smaller than + // the election timeout, but not hugely smaller. + time.Sleep((RaftElectionTimeout * 7) / 10) + } + + time.Sleep(RaftElectionTimeout) + ts.SetReliable(true) + for i := 0; i < servers; i++ { + if ts.srvs[i].raft == nil { + ts.restart(i) + } + ts.g.ConnectOne(i) + } + + atomic.StoreInt32(&stop, 1) + + values := []int{} + for i := 0; i < ncli; i++ { + vv := <-cha[i] + if vv == nil { + t.Fatal("client failed") + } + values = append(values, vv...) 
+ } + + time.Sleep(RaftElectionTimeout) + + lastIndex := ts.one(rand.Int(), servers, true) + + really := make([]int, 0, lastIndex+1) + for index := 1; index <= lastIndex; index++ { + v := ts.wait(index, servers, -1) + if vi, ok := v.(int); ok { + really = append(really, vi) + } else { + t.Fatalf("not an int") + } + } + + for _, v1 := range values { + ok := false + for _, v2 := range really { + if v1 == v2 { + ok = true + } + } + if ok == false { + ts.t.Fatalf("didn't find a value") + } + } + +} + +func TestReliableChurn3C(t *testing.T) { + internalChurn(t, true) +} + +func TestUnreliableChurn3C(t *testing.T) { + internalChurn(t, false) +} + +const ( + MAXLOGSIZE = 2000 +) + +func snapcommon(t *testing.T, name string, disconnect bool, reliable bool, crash bool) { + iters := 30 + servers := 3 + ts := makeTest(t, servers, reliable, true) + defer ts.cleanup() + + ts.Begin(name) + + ts.one(rand.Int(), servers, true) + leader1 := ts.checkOneLeader() + + for i := 0; i < iters; i++ { + victim := (leader1 + 1) % servers + sender := leader1 + if i%3 == 1 { + sender = (leader1 + 1) % servers + victim = leader1 + } + + if disconnect { + ts.g.DisconnectAll(victim) + ts.one(rand.Int(), servers-1, true) + } + if crash { + ts.g.ShutdownServer(victim) + ts.one(rand.Int(), servers-1, true) + } + + // perhaps send enough to get a snapshot + nn := (SnapShotInterval / 2) + (rand.Int() % SnapShotInterval) + for i := 0; i < nn; i++ { + ts.srvs[sender].Raft().Start(rand.Int()) + } + + // let applier threads catch up with the Start()'s + if disconnect == false && crash == false { + // make sure all followers have caught up, so that + // an InstallSnapshot RPC isn't required for + // TestSnapshotBasic3D(). + ts.one(rand.Int(), servers, true) + } else { + ts.one(rand.Int(), servers-1, true) + } + + if ts.g.LogSize() >= MAXLOGSIZE { + ts.t.Fatalf("Log size too large") + } + if disconnect { + // reconnect a follower, who may be behind and + // needs to receive a snapshot to catch up. 
+ ts.g.ConnectOne(victim) + ts.one(rand.Int(), servers, true) + leader1 = ts.checkOneLeader() + } + if crash { + ts.restart(victim) + ts.one(rand.Int(), servers, true) + leader1 = ts.checkOneLeader() + } + } +} + +func TestSnapshotBasic3D(t *testing.T) { + snapcommon(t, "Test (3D): snapshots basic", false, true, false) +} + +func TestSnapshotInstall3D(t *testing.T) { + snapcommon(t, "Test (3D): install snapshots (disconnect)", true, true, false) +} + +func TestSnapshotInstallUnreliable3D(t *testing.T) { + snapcommon(t, "Test (3D): install snapshots (disconnect)", + true, false, false) +} + +func TestSnapshotInstallCrash3D(t *testing.T) { + snapcommon(t, "Test (3D): install snapshots (crash)", false, true, true) +} + +func TestSnapshotInstallUnCrash3D(t *testing.T) { + snapcommon(t, "Test (3D): install snapshots (crash)", false, false, true) +} + +// do the servers persist the snapshots, and +// restart using snapshot along with the +// tail of the log? +func TestSnapshotAllCrash3D(t *testing.T) { + servers := 3 + iters := 5 + ts := makeTest(t, servers, false, true) + defer ts.cleanup() + + ts.Begin("Test (3D): crash and restart all servers") + + ts.one(rand.Int(), servers, true) + + for i := 0; i < iters; i++ { + // perhaps enough to get a snapshot + nn := (SnapShotInterval / 2) + (rand.Int() % SnapShotInterval) + for i := 0; i < nn; i++ { + ts.one(rand.Int(), servers, true) + } + + index1 := ts.one(rand.Int(), servers, true) + + // crash all + ts.g.Shutdown() + ts.g.StartServers() + + index2 := ts.one(rand.Int(), servers, true) + if index2 < index1+1 { + t.Fatalf("index decreased from %v to %v", index1, index2) + } + } +} + +// do servers correctly initialize their in-memory copy of the snapshot, making +// sure that future writes to persistent state don't lose state? 
+func TestSnapshotInit3D(t *testing.T) { + servers := 3 + ts := makeTest(t, servers, false, true) + defer ts.cleanup() + + ts.Begin("Test (3D): snapshot initialization after crash") + ts.one(rand.Int(), servers, true) + + // enough ops to make a snapshot + nn := SnapShotInterval + 1 + for i := 0; i < nn; i++ { + ts.one(rand.Int(), servers, true) + } + + ts.g.Shutdown() + ts.g.StartServers() + + // a single op, to get something to be written back to persistent storage. + ts.one(rand.Int(), servers, true) + + ts.g.Shutdown() + ts.g.StartServers() + + // do another op to trigger potential bug + ts.one(rand.Int(), servers, true) +} diff --git a/src/raft1/server.go b/src/raft1/server.go new file mode 100644 index 0000000..249080d --- /dev/null +++ b/src/raft1/server.go @@ -0,0 +1,185 @@ +package raft + +import ( + "bytes" + "fmt" + "log" + "sync" + + "6.5840/labgob" + "6.5840/labrpc" + "6.5840/tester1" +) + +const ( + SnapShotInterval = 10 +) + +type rfsrv struct { + ts *Test + me int + applyErr string // from apply channel readers + lastApplied int + persister *tester.Persister + + mu sync.Mutex + raft *Raft + logs map[int]any // copy of each server's committed entries +} + +func newRfsrv(ts *Test, srv int, ends []*labrpc.ClientEnd, persister *tester.Persister, snapshot bool) *rfsrv { + //log.Printf("mksrv %d", srv) + s := &rfsrv{ + ts: ts, + me: srv, + logs: map[int]any{}, + persister: persister, + } + applyCh := make(chan ApplyMsg) + s.raft = Make(ends, srv, persister, applyCh) + if snapshot { + snapshot := persister.ReadSnapshot() + if snapshot != nil && len(snapshot) > 0 { + // mimic KV server and process snapshot now. + // ideally Raft should send it up on applyCh... 
+ err := s.ingestSnap(snapshot, -1) + if err != "" { + ts.t.Fatal(err) + } + } + go s.applierSnap(applyCh) + } else { + go s.applier(applyCh) + } + return s +} + +func (rs *rfsrv) Kill() { + //log.Printf("rs kill %d", rs.me) + rs.raft = nil // tester will call Kill() on rs.raft + if rs.persister != nil { + // mimic KV server that saves its persistent state in case it + // restarts. + raftlog := rs.persister.ReadRaftState() + snapshot := rs.persister.ReadSnapshot() + rs.persister.Save(raftlog, snapshot) + } +} + +func (rs *rfsrv) GetState() (int, bool) { + rs.mu.Lock() + defer rs.mu.Unlock() + return rs.raft.GetState() +} + +func (rs *rfsrv) Raft() *Raft { + rs.mu.Lock() + defer rs.mu.Unlock() + return rs.raft +} + +func (rs *rfsrv) Logs(i int) (any, bool) { + rs.mu.Lock() + defer rs.mu.Unlock() + v, ok := rs.logs[i] + return v, ok +} + +// applier reads message from apply ch and checks that they match the log +// contents +func (rs *rfsrv) applier(applyCh chan ApplyMsg) { + for m := range applyCh { + if m.CommandValid == false { + // ignore other types of ApplyMsg + } else { + err_msg, prevok := rs.ts.checkLogs(rs.me, m) + if m.CommandIndex > 1 && prevok == false { + err_msg = fmt.Sprintf("server %v apply out of order %v", rs.me, m.CommandIndex) + } + if err_msg != "" { + log.Fatalf("apply error: %v", err_msg) + rs.applyErr = err_msg + // keep reading after error so that Raft doesn't block + // holding locks... + } + } + } +} + +// periodically snapshot raft state +func (rs *rfsrv) applierSnap(applyCh chan ApplyMsg) { + if rs.raft == nil { + return // ??? 
+ } + + for m := range applyCh { + err_msg := "" + if m.SnapshotValid { + err_msg = rs.ingestSnap(m.Snapshot, m.SnapshotIndex) + } else if m.CommandValid { + if m.CommandIndex != rs.lastApplied+1 { + err_msg = fmt.Sprintf("server %v apply out of order, expected index %v, got %v", rs.me, rs.lastApplied+1, m.CommandIndex) + } + + if err_msg == "" { + var prevok bool + err_msg, prevok = rs.ts.checkLogs(rs.me, m) + if m.CommandIndex > 1 && prevok == false { + err_msg = fmt.Sprintf("server %v apply out of order %v", rs.me, m.CommandIndex) + } + } + + rs.lastApplied = m.CommandIndex + + if (m.CommandIndex+1)%SnapShotInterval == 0 { + w := new(bytes.Buffer) + e := labgob.NewEncoder(w) + e.Encode(m.CommandIndex) + var xlog []any + for j := 0; j <= m.CommandIndex; j++ { + xlog = append(xlog, rs.logs[j]) + } + e.Encode(xlog) + rs.raft.Snapshot(m.CommandIndex, w.Bytes()) + } + } else { + // Ignore other types of ApplyMsg. + } + if err_msg != "" { + log.Fatalf("apply error: %v", err_msg) + rs.applyErr = err_msg + // keep reading after error so that Raft doesn't block + // holding locks... 
+ } + } +} + +// returns "" or error string +func (rs *rfsrv) ingestSnap(snapshot []byte, index int) string { + rs.mu.Lock() + defer rs.mu.Unlock() + + if snapshot == nil { + log.Fatalf("nil snapshot") + return "nil snapshot" + } + r := bytes.NewBuffer(snapshot) + d := labgob.NewDecoder(r) + var lastIncludedIndex int + var xlog []any + if d.Decode(&lastIncludedIndex) != nil || + d.Decode(&xlog) != nil { + log.Fatalf("snapshot decode error") + return "snapshot Decode() error" + } + if index != -1 && index != lastIncludedIndex { + err := fmt.Sprintf("server %v snapshot doesn't match m.SnapshotIndex", rs.me) + return err + } + rs.logs = map[int]any{} + for j := 0; j < len(xlog); j++ { + rs.logs[j] = xlog[j] + } + rs.lastApplied = lastIncludedIndex + return "" +} diff --git a/src/raft1/test.go b/src/raft1/test.go new file mode 100644 index 0000000..05d85e2 --- /dev/null +++ b/src/raft1/test.go @@ -0,0 +1,266 @@ +package raft + +import ( + "fmt" + //log + "math/rand" + "sync" + "sync/atomic" + "testing" + "time" + + "6.5840/labrpc" + "6.5840/tester1" +) + +type Test struct { + *tester.Config + t *testing.T + n int + g *tester.ServerGrp + + finished int32 + + mu sync.Mutex + srvs []*rfsrv + maxIndex int + snapshot bool +} + +func makeTest(t *testing.T, n int, reliable bool, snapshot bool) *Test { + ts := &Test{ + t: t, + n: n, + srvs: make([]*rfsrv, n), + snapshot: snapshot, + } + ts.Config = tester.MakeConfig(t, n, reliable, ts.mksrv) + ts.Config.SetLongDelays(true) + ts.g = ts.Group(tester.GRP0) + return ts +} + +func (ts *Test) cleanup() { + atomic.StoreInt32(&ts.finished, 1) + ts.End() + ts.Config.Cleanup() + ts.CheckTimeout() +} + +func (ts *Test) mksrv(ends []*labrpc.ClientEnd, grp tester.Tgid, srv int, persister *tester.Persister) []tester.IService { + s := newRfsrv(ts, srv, ends, persister, ts.snapshot) + ts.srvs[srv] = s + return []tester.IService{s, s.raft} +} + +func (ts *Test) restart(i int) { + ts.g.StartServer(i) // which will call mksrv to make a new 
server + ts.Group(tester.GRP0).ConnectAll() +} + +func (ts *Test) checkOneLeader() int { + for iters := 0; iters < 10; iters++ { + ms := 450 + (rand.Int63() % 100) + time.Sleep(time.Duration(ms) * time.Millisecond) + + leaders := make(map[int][]int) + for i := 0; i < ts.n; i++ { + if ts.g.IsConnected(i) { + if term, leader := ts.srvs[i].GetState(); leader { + leaders[term] = append(leaders[term], i) + } + } + } + + lastTermWithLeader := -1 + for term, leaders := range leaders { + if len(leaders) > 1 { + ts.Fatalf("term %d has %d (>1) leaders", term, len(leaders)) + } + if term > lastTermWithLeader { + lastTermWithLeader = term + } + } + + if len(leaders) != 0 { + return leaders[lastTermWithLeader][0] + } + } + ts.Fatalf("expected one leader, got none") + return -1 +} + +func (ts *Test) checkTerms() int { + term := -1 + for i := 0; i < ts.n; i++ { + if ts.g.IsConnected(i) { + xterm, _ := ts.srvs[i].GetState() + if term == -1 { + term = xterm + } else if term != xterm { + ts.Fatalf("servers disagree on term") + } + } + } + return term +} + +func (ts *Test) checkLogs(i int, m ApplyMsg) (string, bool) { + ts.mu.Lock() + defer ts.mu.Unlock() + + err_msg := "" + v := m.Command + me := ts.srvs[i] + for j, rs := range ts.srvs { + if old, oldok := rs.Logs(m.CommandIndex); oldok && old != v { + //log.Printf("%v: log %v; server %v\n", i, me.logs, rs.logs) + // some server has already committed a different value for this entry! + err_msg = fmt.Sprintf("commit index=%v server=%v %v != server=%v %v", + m.CommandIndex, i, m.Command, j, old) + } + } + _, prevok := me.logs[m.CommandIndex-1] + me.logs[m.CommandIndex] = v + if m.CommandIndex > ts.maxIndex { + ts.maxIndex = m.CommandIndex + } + return err_msg, prevok +} + +// check that none of the connected servers +// thinks it is the leader. 
+func (ts *Test) checkNoLeader() { + for i := 0; i < ts.n; i++ { + if ts.g.IsConnected(i) { + _, is_leader := ts.srvs[i].GetState() + if is_leader { + ts.Fatalf("expected no leader among connected servers, but %v claims to be leader", i) + } + } + } +} + +// how many servers think a log entry is committed? +func (ts *Test) nCommitted(index int) (int, any) { + ts.mu.Lock() + defer ts.mu.Unlock() + + count := 0 + var cmd any = nil + for _, rs := range ts.srvs { + if rs.applyErr != "" { + ts.t.Fatal(rs.applyErr) + } + + cmd1, ok := rs.Logs(index) + + if ok { + if count > 0 && cmd != cmd1 { + ts.Fatalf("committed values do not match: index %v, %v, %v", + index, cmd, cmd1) + } + count += 1 + cmd = cmd1 + } + } + return count, cmd +} + +// do a complete agreement. +// it might choose the wrong leader initially, +// and have to re-submit after giving up. +// entirely gives up after about 10 seconds. +// indirectly checks that the servers agree on the +// same value, since nCommitted() checks this, +// as do the threads that read from applyCh. +// returns index. +// if retry==true, may submit the command multiple +// times, in case a leader fails just after Start(). +// if retry==false, calls Start() only once, in order +// to simplify the early Lab 3B tests. +func (ts *Test) one(cmd any, expectedServers int, retry bool) int { + t0 := time.Now() + starts := 0 + for time.Since(t0).Seconds() < 10 && ts.checkFinished() == false { + // try all the servers, maybe one is the leader. + index := -1 + for range ts.srvs { + starts = (starts + 1) % len(ts.srvs) + var rf *Raft + if ts.g.IsConnected(starts) { + rf = ts.srvs[starts].raft + } + if rf != nil { + //log.Printf("peer %d Start %v", starts, cmd) + index1, _, ok := rf.Start(cmd) + if ok { + index = index1 + break + } + } + } + + if index != -1 { + // somebody claimed to be the leader and to have + // submitted our command; wait a while for agreement. 
+ t1 := time.Now() + for time.Since(t1).Seconds() < 2 { + nd, cmd1 := ts.nCommitted(index) + if nd > 0 && nd >= expectedServers { + // committed + if cmd1 == cmd { + // and it was the command we submitted. + return index + } + } + time.Sleep(20 * time.Millisecond) + } + if retry == false { + ts.Fatalf("one(%v) failed to reach agreement", cmd) + } + } else { + time.Sleep(50 * time.Millisecond) + } + } + if ts.checkFinished() == false { + ts.Fatalf("one(%v) failed to reach agreement", cmd) + } + return -1 +} + +func (ts *Test) checkFinished() bool { + z := atomic.LoadInt32(&ts.finished) + return z != 0 +} + +// wait for at least n servers to commit. +// but don't wait forever. +func (ts *Test) wait(index int, n int, startTerm int) any { + to := 10 * time.Millisecond + for iters := 0; iters < 30; iters++ { + nd, _ := ts.nCommitted(index) + if nd >= n { + break + } + time.Sleep(to) + if to < time.Second { + to *= 2 + } + if startTerm > -1 { + for _, rs := range ts.srvs { + if t, _ := rs.raft.GetState(); t > startTerm { + // someone has moved on + // can no longer guarantee that we'll "win" + return -1 + } + } + } + } + nd, cmd := ts.nCommitted(index) + if nd < n { + ts.Fatalf("only %d decided for index %d; wanted %d", + nd, index, n) + } + return cmd +} diff --git a/src/raft1/test.out b/src/raft1/test.out new file mode 100644 index 0000000..2225471 --- /dev/null +++ b/src/raft1/test.out @@ -0,0 +1,164 @@ +=== RUN TestInitialElection3A +Test (3A): initial election (reliable network)... + ... Passed -- 3.9 3 38 0 +--- PASS: TestInitialElection3A (3.95s) +=== RUN TestReElection3A +Test (3A): election after network failure (reliable network)... + ... Passed -- 6.4 3 68 0 +--- PASS: TestReElection3A (6.43s) +=== RUN TestManyElections3A +Test (3A): multiple elections (reliable network)... + ... Passed -- 9.7 7 414 0 +--- PASS: TestManyElections3A (9.66s) +=== RUN TestBasicAgree3B +Test (3B): basic agreement (reliable network)... + ... 
Passed -- 1.9 3 18 0 +--- PASS: TestBasicAgree3B (1.91s) +=== RUN TestRPCBytes3B +Test (3B): RPC byte count (reliable network)... + ... Passed -- 3.5 3 50 0 +--- PASS: TestRPCBytes3B (3.50s) +=== RUN TestFollowerFailure3B +Test (3B): test progressive failure of followers (reliable network)... + ... Passed -- 5.9 3 60 0 +--- PASS: TestFollowerFailure3B (5.90s) +=== RUN TestLeaderFailure3B +Test (3B): test failure of leaders (reliable network)... + ... Passed -- 6.5 3 113 0 +--- PASS: TestLeaderFailure3B (6.52s) +=== RUN TestFailAgree3B +Test (3B): agreement after follower reconnects (reliable network)... + ... Passed -- 6.4 3 68 0 +--- PASS: TestFailAgree3B (6.43s) +=== RUN TestFailNoAgree3B +Test (3B): no agreement if too many followers disconnect (reliable network)... + ... Passed -- 4.7 5 116 0 +--- PASS: TestFailNoAgree3B (4.69s) +=== RUN TestConcurrentStarts3B +Test (3B): concurrent Start()s (reliable network)... + ... Passed -- 2.1 3 20 0 +--- PASS: TestConcurrentStarts3B (2.06s) +=== RUN TestRejoin3B +Test (3B): rejoin of partitioned leader (reliable network)... + ... Passed -- 7.9 3 102 0 +--- PASS: TestRejoin3B (7.91s) +=== RUN TestBackup3B +Test (3B): leader backs up quickly over incorrect follower logs (reliable network)... + ... Passed -- 26.0 5 944 0 +--- PASS: TestBackup3B (26.04s) +=== RUN TestCount3B +Test (3B): RPC counts aren't too high (reliable network)... + ... Passed -- 3.5 3 34 0 +--- PASS: TestCount3B (3.55s) +=== RUN TestPersist13C +Test (3C): basic persistence (reliable network)... + ... Passed -- 6.8 3 58 0 +--- PASS: TestPersist13C (6.77s) +=== RUN TestPersist23C +Test (3C): more persistence (reliable network)... + ... Passed -- 21.9 5 337 0 +--- PASS: TestPersist23C (21.89s) +=== RUN TestPersist33C +Test (3C): partitioned leader and one follower crash, leader restarts (reliable network)... + ... Passed -- 3.6 3 32 0 +--- PASS: TestPersist33C (3.63s) +=== RUN TestFigure83C +Test (3C): Figure 8 (reliable netwTestFigure83Cork)... 
+goroutine 7711 [running]: +runtime/debug.Stack() + /usr/lib/go/src/runtime/debug/stack.go:26 +0x67 +runtime/debug.PrintStack() + /usr/lib/go/src/runtime/debug/stack.go:18 +0x1d +6.5840/tester1.(*Config).Fatalf(0xc0001d0c00, {0x7d12ea, 0x21}, {0xc000519df0, 0x1, 0x1}) + /home/kaashoek/classes/65840-2025/staff/mygo/src/tester1/config.go:145 +0x3b +6.5840/raft1.(*Test).one(0xc0001d0ba0, {0x777260, 0xc0003a9bd8}, 0x5, 0x1) + /home/kaashoek/classes/65840-2025/staff/mygo/src/raft1/test.go:227 +0x227 +6.5840/raft1.TestFigure83C(0xc00030cea0) + /home/kaashoek/classes/65840-2025/staff/mygo/src/raft1/raft_test.go:824 +0x3f0 +testing.tRunner(0xc00030cea0, 0x7db020) + /usr/lib/go/src/testing/testing.go:1690 +0x227 +created by testing.(*T).Run in goroutine 1 + /usr/lib/go/src/testing/testing.go:1743 +0x826 + config.go:146: one(3082010368482628662) failed to reach agreement +--- FAIL: TestFigure83C (59.13s) +=== RUN TestUnreliableAgree3C +Test (3C): unreliable agreement (unreliable network)... + ... Passed -- 4.4 5 245 0 +--- PASS: TestUnreliableAgree3C (4.42s) +=== RUN TestFigure8Unreliable3C +Test (3C): Figure 8 (unreliable) (unreliable network)... + ... Passed -- 34.3 5 845 0 +--- PASS: TestFigure8Unreliable3C (34.26s) +=== RUN TestReliableChurn3C +Test (3C): churn (reliable network)... 
+goroutine 21747 [running]: +runtime/debug.Stack() + /usr/lib/go/src/runtime/debug/stack.go:26 +0x67 +runtime/debug.PrintStack() + /usr/lib/go/src/runtime/debug/stack.go:18 +0x1d +6.5840/tester1.(*Config).Fatalf(0xc000845d40, {0x7d12ea, 0x21}, {0xc000519cd0, 0x1, 0x1}) + /home/kaashoek/classes/65840-2025/staff/mygo/src/tester1/config.go:145 +0x3b +6.5840/raft1.(*Test).one(0xc000845ce0, {0x777260, 0xc000735788}, 0x5, 0x1) + /home/kaashoek/classes/65840-2025/staff/mygo/src/raft1/test.go:227 +0x227 +6.5840/raft1.internalChurn(0xc000258000, 0x1) + /home/kaashoek/classes/65840-2025/staff/mygo/src/raft1/raft_test.go:1028 +0xbd0 +6.5840/raft1.TestReliableChurn3C(0xc000258000) + /home/kaashoek/classes/65840-2025/staff/mygo/src/raft1/raft_test.go:1055 +0x2c +testing.tRunner(0xc000258000, 0x7db080) + /usr/lib/go/src/testing/testing.go:1690 +0x227 +created by testing.(*T).Run in goroutine 1 + /usr/lib/go/src/testing/testing.go:1743 +0x826 + config.go:146: one(4060683027750831645) failed to reach agreement +--- FAIL: TestReliableChurn3C (26.10s) +=== RUN TestUnreliableChurn3C +Test (3C): unreliable churn (unreliable network)... 
+goroutine 26074 [running]: +runtime/debug.Stack() + /usr/lib/go/src/runtime/debug/stack.go:26 +0x67 +runtime/debug.PrintStack() + /usr/lib/go/src/runtime/debug/stack.go:18 +0x1d +6.5840/tester1.(*Config).Fatalf(0xc00074db00, {0x7d12ea, 0x21}, {0xc000519cd0, 0x1, 0x1}) + /home/kaashoek/classes/65840-2025/staff/mygo/src/tester1/config.go:145 +0x3b +6.5840/raft1.(*Test).one(0xc00074daa0, {0x777260, 0xc000634008}, 0x5, 0x1) + /home/kaashoek/classes/65840-2025/staff/mygo/src/raft1/test.go:227 +0x227 +6.5840/raft1.internalChurn(0xc0001c2000, 0x0) + /home/kaashoek/classes/65840-2025/staff/mygo/src/raft1/raft_test.go:1028 +0xbd0 +6.5840/raft1.TestUnreliableChurn3C(0xc0001c2000) + /home/kaashoek/classes/65840-2025/staff/mygo/src/raft1/raft_test.go:1059 +0x29 +testing.tRunner(0xc0001c2000, 0x7db0c8) + /usr/lib/go/src/testing/testing.go:1690 +0x227 +created by testing.(*T).Run in goroutine 1 + /usr/lib/go/src/testing/testing.go:1743 +0x826 + config.go:146: one(8736026451642885096) failed to reach agreement +--- FAIL: TestUnreliableChurn3C (26.11s) +=== RUN TestSnapshotBasic3D +Test (3D): snapshots basic (reliable network)... + ... Passed -- 11.0 3 201 0 +--- PASS: TestSnapshotBasic3D (11.04s) +=== RUN TestSnapshotInstall3D +Test (3D): install snapshots (disconnect) (reliable network)... + ... Passed -- 65.5 3 909 0 +--- PASS: TestSnapshotInstall3D (65.46s) +=== RUN TestSnapshotInstallUnreliable3D +Test (3D): install snapshots (disconnect) (unreliable network)... + ... Passed -- 88.1 3 1159 0 +--- PASS: TestSnapshotInstallUnreliable3D (88.10s) +=== RUN TestSnapshotInstallCrash3D +Test (3D): install snapshots (crash) (reliable network)... + ... Passed -- 53.4 3 590 0 +--- PASS: TestSnapshotInstallCrash3D (53.38s) +=== RUN TestSnapshotInstallUnCrash3D +Test (3D): install snapshots (crash) (unreliable network)... + ... 
Passed -- 65.6 3 690 0 +--- PASS: TestSnapshotInstallUnCrash3D (65.60s) +=== RUN TestSnapshotAllCrash3D +Test (3D): crash and restart all servers (unreliable network)... + ... Passed -- 23.2 3 336 0 +--- PASS: TestSnapshotAllCrash3D (23.20s) +=== RUN TestSnapshotInit3D +Test (3D): snapshot initialization after crash (unreliable network)... + ... Passed -- 6.3 3 75 0 +--- PASS: TestSnapshotInit3D (6.34s) +FAIL +exit status 1 +FAIL 6.5840/raft1 584.014s diff --git a/src/raft1/util.go b/src/raft1/util.go new file mode 100644 index 0000000..e064403 --- /dev/null +++ b/src/raft1/util.go @@ -0,0 +1,12 @@ +package raft + +import "log" + +// Debugging +const Debug = false + +func DPrintf(format string, a ...interface{}) { + if Debug { + log.Printf(format, a...) + } +} diff --git a/src/shardkv1/shardcfg/shardcfg_test.go b/src/shardkv1/shardcfg/shardcfg_test.go index 1e62793..4d45204 100644 --- a/src/shardkv1/shardcfg/shardcfg_test.go +++ b/src/shardkv1/shardcfg/shardcfg_test.go @@ -3,7 +3,7 @@ package shardcfg import ( "testing" - "6.5840/kvtest1" + "6.5840/tester1" ) func check_same_config(t *testing.T, c1 ShardConfig, c2 ShardConfig) { @@ -37,13 +37,13 @@ func TestBasic(t *testing.T) { Gid2 = 2 ) cfg := MakeShardConfig() - cfg.CheckConfig(t, []kvtest.Tgid{}) + cfg.CheckConfig(t, []tester.Tgid{}) - cfg.JoinBalance(map[kvtest.Tgid][]string{Gid1: []string{"x", "y", "z"}}) - cfg.CheckConfig(t, []kvtest.Tgid{Gid1}) + cfg.JoinBalance(map[tester.Tgid][]string{Gid1: []string{"x", "y", "z"}}) + cfg.CheckConfig(t, []tester.Tgid{Gid1}) - cfg.JoinBalance(map[kvtest.Tgid][]string{Gid2: []string{"a", "b", "c"}}) - cfg.CheckConfig(t, []kvtest.Tgid{Gid1, Gid2}) + cfg.JoinBalance(map[tester.Tgid][]string{Gid2: []string{"a", "b", "c"}}) + cfg.CheckConfig(t, []tester.Tgid{Gid1, Gid2}) sa1 := cfg.Groups[Gid1] if len(sa1) != 3 || sa1[0] != "x" || sa1[1] != "y" || sa1[2] != "z" { @@ -54,9 +54,9 @@ func TestBasic(t *testing.T) { t.Fatalf("wrong servers for gid %v: %v\n", Gid2, sa2) } - 
cfg.LeaveBalance([]kvtest.Tgid{Gid1}) - cfg.CheckConfig(t, []kvtest.Tgid{Gid2}) + cfg.LeaveBalance([]tester.Tgid{Gid1}) + cfg.CheckConfig(t, []tester.Tgid{Gid2}) - cfg.LeaveBalance([]kvtest.Tgid{Gid2}) - cfg.CheckConfig(t, []kvtest.Tgid{}) + cfg.LeaveBalance([]tester.Tgid{Gid2}) + cfg.CheckConfig(t, []tester.Tgid{}) } diff --git a/src/shardkv1/shardgrp/server.go b/src/shardkv1/shardgrp/server.go index 5f21832..20a8e5b 100644 --- a/src/shardkv1/shardgrp/server.go +++ b/src/shardkv1/shardgrp/server.go @@ -8,7 +8,6 @@ import ( "6.5840/kvsrv1/rpc" "6.5840/labgob" "6.5840/labrpc" - "6.5840/raft" "6.5840/shardkv1/shardgrp/shardrpc" ) @@ -69,11 +68,6 @@ func (kv *KVServer) Kill() { // Your code here, if desired. } -// Return kv's raft struct -func (kv *KVServer) Raft() *raft.Raft { - return kv.rsm.Raft() -} - func (kv *KVServer) killed() bool { z := atomic.LoadInt32(&kv.dead) return z == 1 @@ -81,7 +75,7 @@ func (kv *KVServer) killed() bool { // StartKVServer() and MakeRSM() must return quickly, so they should // start goroutines for any long-running work. -func StartKVServer(servers []*labrpc.ClientEnd, gid tester.Tgid, me int, persister *raft.Persister, maxraftstate int) tester.IKVServer { +func StartKVServer(servers []*labrpc.ClientEnd, gid tester.Tgid, me int, persister *tester.Persister, maxraftstate int) []tester.IService { // call labgob.Register on structures you want // Go's RPC library to marshall/unmarshall. 
labgob.Register(shardrpc.PutArgs{}) @@ -95,5 +89,5 @@ func StartKVServer(servers []*labrpc.ClientEnd, gid tester.Tgid, me int, persist kv.rsm = rsm.MakeRSM(servers, me, persister, maxraftstate, kv) // Your code here - return kv + return []tester.IService{kv, kv.rsm.Raft()} } diff --git a/src/shardkv1/test.go b/src/shardkv1/test.go index e6b287b..33b12f5 100644 --- a/src/shardkv1/test.go +++ b/src/shardkv1/test.go @@ -25,8 +25,9 @@ type Test struct { sck *shardctrler.ShardCtrlerClerk part string - mu sync.Mutex - ngid tester.Tgid + maxraftstate int + mu sync.Mutex + ngid tester.Tgid } const ( @@ -38,17 +39,22 @@ const ( // Setup a kvraft group (group 0) for the shard controller and make // the controller clerk. func MakeTest(t *testing.T, part string, reliable, randomkeys bool) *Test { - cfg := tester.MakeConfig(t, NSRV, reliable, -1, kvraft.StartKVServer) ts := &Test{ - ngid: shardcfg.Gid1 + 1, // Gid1 is in use - t: t, + ngid: shardcfg.Gid1 + 1, // Gid1 is in use + t: t, + maxraftstate: -1, } + cfg := tester.MakeConfig(t, NSRV, reliable, ts.StartKVServerControler) ts.Test = kvtest.MakeTest(t, cfg, randomkeys, ts) ts.sck = ts.makeShardCtrlerClerk() ts.Begin(part) return ts } +func (ts *Test) StartKVServerControler(servers []*labrpc.ClientEnd, gid tester.Tgid, me int, persister *tester.Persister) []tester.IService { + return kvraft.StartKVServer(servers, gid, me, persister, ts.maxraftstate) +} + func (ts *Test) MakeClerk() kvtest.IKVClerk { clnt := ts.Config.MakeClient() ck := MakeClerk(clnt, ts.makeQueryClerk()) @@ -102,7 +108,7 @@ func (ts *Test) groups(n int) []tester.Tgid { // itself to own all shards. 
func (ts *Test) setupKVService() tester.Tgid { scfg := shardcfg.MakeShardConfig() - ts.Config.MakeGroupStart(shardcfg.Gid1, NSRV, -1, shardgrp.StartKVServer) + ts.Config.MakeGroupStart(shardcfg.Gid1, NSRV, ts.StartKVServerShard) scfg.JoinBalance(map[tester.Tgid][]string{shardcfg.Gid1: ts.Group(shardcfg.Gid1).SrvNames()}) if err := ts.sck.Init(scfg); err != rpc.OK { ts.t.Fatalf("Init err %v", err) @@ -111,9 +117,13 @@ func (ts *Test) setupKVService() tester.Tgid { return shardcfg.Gid1 } +func (ts *Test) StartKVServerShard(servers []*labrpc.ClientEnd, gid tester.Tgid, me int, persister *tester.Persister) []tester.IService { + return shardgrp.StartKVServer(servers, gid, me, persister, ts.maxraftstate) +} + func (ts *Test) joinGroups(sck *shardctrler.ShardCtrlerClerk, gids []tester.Tgid) rpc.Err { for i, gid := range gids { - ts.Config.MakeGroupStart(gid, NSRV, -1, shardgrp.StartKVServer) + ts.Config.MakeGroupStart(gid, NSRV, ts.StartKVServerShard) if err := sck.Join(gid, ts.Group(gid).SrvNames()); err != rpc.OK { return err } @@ -141,11 +151,11 @@ func (ts *Test) checkLogs(gids []tester.Tgid) { for _, gid := range gids { n := ts.Group(gid).LogSize() s := ts.Group(gid).SnapshotSize() - if ts.Group(gid).Maxraftstate >= 0 && n > 8*ts.Group(gid).Maxraftstate { + if ts.maxraftstate >= 0 && n > 8*ts.maxraftstate { ts.t.Fatalf("persister.RaftStateSize() %v, but maxraftstate %v", - n, ts.Group(gid).Maxraftstate) + n, ts.maxraftstate) } - if ts.Group(gid).Maxraftstate < 0 && s > 0 { + if ts.maxraftstate < 0 && s > 0 { ts.t.Fatalf("maxraftstate is -1, but snapshot is non-empty!") } diff --git a/src/tester1/config.go b/src/tester1/config.go index ced536b..d213b7c 100644 --- a/src/tester1/config.go +++ b/src/tester1/config.go @@ -15,16 +15,10 @@ import ( "time" "6.5840/labrpc" - "6.5840/raft" ) const GRP0 = 0 -type IKVServer interface { - Raft() *raft.Raft - Kill() -} - type Config struct { *Clnts // The clnts in the test *Groups // The server groups in the test @@ -39,7 +33,7 @@ 
type Config struct { ops int32 // number of clerk get/put/append method calls } -func MakeConfig(t *testing.T, n int, reliable bool, maxraftstate int, mks FstartServer) *Config { +func MakeConfig(t *testing.T, n int, reliable bool, mks FstartServer) *Config { ncpu_once.Do(func() { if runtime.NumCPU() < 2 { fmt.Printf("warning: only one CPU, which may conceal locking bugs\n") @@ -51,7 +45,7 @@ func MakeConfig(t *testing.T, n int, reliable bool, maxraftstate int, mks Fstart cfg.t = t cfg.net = labrpc.MakeNetwork() cfg.Groups = newGroups(cfg.net) - cfg.MakeGroupStart(GRP0, n, maxraftstate, mks) + cfg.MakeGroupStart(GRP0, n, mks) cfg.Clnts = makeClnts(cfg.net) cfg.start = time.Now() @@ -87,8 +81,8 @@ func (cfg *Config) Cleanup() { cfg.CheckTimeout() } -func (cfg *Config) MakeGroupStart(gid Tgid, nsrv, maxraftstate int, mks FstartServer) { - cfg.MakeGroup(gid, nsrv, maxraftstate, mks) +func (cfg *Config) MakeGroupStart(gid Tgid, nsrv int, mks FstartServer) { + cfg.MakeGroup(gid, nsrv, mks) cfg.Group(gid).StartServers() } diff --git a/src/tester1/group.go b/src/tester1/group.go index 271289d..5c4fa28 100644 --- a/src/tester1/group.go +++ b/src/tester1/group.go @@ -6,12 +6,18 @@ import ( "sync" "6.5840/labrpc" - "6.5840/raft" ) type Tgid int -type FstartServer func(ends []*labrpc.ClientEnd, grp Tgid, srv int, persister *raft.Persister, maxraftstate int) IKVServer +// A service must support Kill(); the tester will Kill() +// on service returned by FstartServer() +type IService interface { + Kill() +} + +// Start server and return the services to register with labrpc +type FstartServer func(ends []*labrpc.ClientEnd, grp Tgid, srv int, persister *Persister) []IService // Each server has a name: i'th server of group gid. If there is only a single // server, it its gid = 0 and its i is 0. 
@@ -31,11 +37,11 @@ func newGroups(net *labrpc.Network) *Groups { return &Groups{net: net, grps: make(map[Tgid]*ServerGrp)} } -func (gs *Groups) MakeGroup(gid Tgid, nsrv, maxraftstate int, mks FstartServer) { +func (gs *Groups) MakeGroup(gid Tgid, nsrv int, mks FstartServer) { gs.mu.Lock() defer gs.mu.Unlock() - gs.grps[gid] = makeSrvGrp(gs.net, gid, nsrv, maxraftstate, mks) + gs.grps[gid] = makeSrvGrp(gs.net, gid, nsrv, mks) } func (gs *Groups) lookupGroup(gid Tgid) *ServerGrp { @@ -62,8 +68,6 @@ func (gs *Groups) cleanup() { } type ServerGrp struct { - Maxraftstate int - net *labrpc.Network srvs []*Server servernames []string @@ -72,14 +76,13 @@ type ServerGrp struct { mks FstartServer } -func makeSrvGrp(net *labrpc.Network, gid Tgid, n, m int, mks FstartServer) *ServerGrp { +func makeSrvGrp(net *labrpc.Network, gid Tgid, n int, mks FstartServer) *ServerGrp { sg := &ServerGrp{ - Maxraftstate: m, - net: net, - srvs: make([]*Server, n), - gid: gid, - connected: make([]bool, n), - mks: mks, + net: net, + srvs: make([]*Server, n), + gid: gid, + connected: make([]bool, n), + mks: mks, } for i, _ := range sg.srvs { sg.srvs[i] = makeServer(net, gid, n) @@ -99,6 +102,14 @@ func (sg *ServerGrp) SrvNames() []string { return sg.servernames } +func (sg *ServerGrp) Services() [][]IService { + ss := make([][]IService, 0, len(sg.srvs)) + for _, s := range sg.srvs { + ss = append(ss, s.svcs) + } + return ss +} + func (sg *ServerGrp) SrvNamesTo(to []int) []string { ns := make([]string, 0, len(to)) for _, i := range to { @@ -127,8 +138,10 @@ func (sg *ServerGrp) ConnectOne(i int) { func (sg *ServerGrp) cleanup() { for _, s := range sg.srvs { - if s.kvsrv != nil { - s.kvsrv.Kill() + if s.svcs != nil { + for _, svc := range s.svcs { + svc.Kill() + } } } } @@ -210,13 +223,11 @@ func (sg *ServerGrp) StartServer(i int) { srv := sg.srvs[i].startServer(sg.gid) sg.srvs[i] = srv - srv.kvsrv = sg.mks(srv.clntEnds, sg.gid, i, srv.saved, sg.Maxraftstate) - kvsvc := 
labrpc.MakeService(srv.kvsrv) + srv.svcs = sg.mks(srv.clntEnds, sg.gid, i, srv.saved) labsrv := labrpc.MakeServer() - labsrv.AddService(kvsvc) - if len(sg.srvs) > 1 { // Run with raft? - rfsvc := labrpc.MakeService(srv.kvsrv.Raft()) - labsrv.AddService(rfsvc) + for _, svc := range srv.svcs { + s := labrpc.MakeService(svc) + labsrv.AddService(s) } sg.net.AddServer(ServerName(sg.gid, i), labsrv) } @@ -254,23 +265,8 @@ func (sg *ServerGrp) start() { } } -func (sg *ServerGrp) GetState(i int) (int, bool) { - return sg.srvs[i].kvsrv.Raft().GetState() -} - -func (sg *ServerGrp) Leader() (bool, int) { - for i, _ := range sg.srvs { - _, is_leader := sg.GetState(i) - if is_leader { - return true, i - } - } - return false, 0 -} - // Partition servers into 2 groups and put current leader in minority -func (sg *ServerGrp) MakePartition() ([]int, []int) { - _, l := sg.Leader() +func (sg *ServerGrp) MakePartition(l int) ([]int, []int) { n := len(sg.srvs) p1 := make([]int, n/2+1) p2 := make([]int, n/2) diff --git a/src/tester1/persister.go b/src/tester1/persister.go new file mode 100644 index 0000000..540b97f --- /dev/null +++ b/src/tester1/persister.go @@ -0,0 +1,70 @@ +package tester + +// +// support for Raft and kvraft to save persistent +// Raft state (log &c) and k/v server snapshots. +// +// we will use the original persister.go to test your code for grading. +// so, while you can modify this code to help you debug, please +// test with the original before submitting. 
+// + +import "sync" + +type Persister struct { + mu sync.Mutex + raftstate []byte + snapshot []byte +} + +func MakePersister() *Persister { + return &Persister{} +} + +func clone(orig []byte) []byte { + x := make([]byte, len(orig)) + copy(x, orig) + return x +} + +func (ps *Persister) Copy() *Persister { + ps.mu.Lock() + defer ps.mu.Unlock() + np := MakePersister() + np.raftstate = ps.raftstate + np.snapshot = ps.snapshot + return np +} + +func (ps *Persister) ReadRaftState() []byte { + ps.mu.Lock() + defer ps.mu.Unlock() + return clone(ps.raftstate) +} + +func (ps *Persister) RaftStateSize() int { + ps.mu.Lock() + defer ps.mu.Unlock() + return len(ps.raftstate) +} + +// Save both Raft state and K/V snapshot as a single atomic action, +// to help avoid them getting out of sync. +func (ps *Persister) Save(raftstate []byte, snapshot []byte) { + ps.mu.Lock() + defer ps.mu.Unlock() + ps.raftstate = clone(raftstate) + ps.snapshot = clone(snapshot) +} + +func (ps *Persister) ReadSnapshot() []byte { + ps.mu.Lock() + defer ps.mu.Unlock() + return clone(ps.snapshot) +} + +func (ps *Persister) SnapshotSize() int { + ps.mu.Lock() + defer ps.mu.Unlock() + return len(ps.snapshot) +} diff --git a/src/tester1/srv.go b/src/tester1/srv.go index eacd6d1..045ae3f 100644 --- a/src/tester1/srv.go +++ b/src/tester1/srv.go @@ -2,15 +2,16 @@ package tester import ( // "log" + "sync" "6.5840/labrpc" - "6.5840/raft" ) type Server struct { + mu sync.Mutex net *labrpc.Network - saved *raft.Persister - kvsrv IKVServer + saved *Persister + svcs []IService // list of services exported by endNames []string clntEnds []*labrpc.ClientEnd } @@ -40,12 +41,14 @@ func (s *Server) startServer(gid Tgid) *Server { if s.saved != nil { srv.saved = s.saved.Copy() } else { - srv.saved = raft.MakePersister() + srv.saved = MakePersister() } return srv } func (s *Server) connect(to []int) { + s.mu.Lock() + defer s.mu.Unlock() for j := 0; j < len(to); j++ { endname := s.endNames[to[j]] s.net.Enable(endname, true) 
@@ -53,6 +56,9 @@ func (s *Server) connect(to []int) { } func (s *Server) disconnect(from []int) { + s.mu.Lock() + defer s.mu.Unlock() + if s.endNames == nil { return } @@ -62,8 +68,10 @@ func (s *Server) disconnect(from []int) { } } -// XXX lock s? func (s *Server) shutdownServer() { + s.mu.Lock() + defer s.mu.Unlock() + // a fresh persister, in case old instance // continues to update the Persister. // but copy old persister's content so that we always @@ -72,9 +80,11 @@ func (s *Server) shutdownServer() { s.saved = s.saved.Copy() } - kv := s.kvsrv - if kv != nil { - kv.Kill() - s.kvsrv = nil + // inform all services to stop + for _, svc := range s.svcs { + if svc != nil { + svc.Kill() + } } + s.svcs = nil }