diff --git a/src/raft/config.go b/src/raft/config.go deleted file mode 100644 index c23280c..0000000 --- a/src/raft/config.go +++ /dev/null @@ -1,648 +0,0 @@ -package raft - -// -// support for Raft tester. -// -// we will use the original config.go to test your code for grading. -// so, while you can modify this code to help you debug, please -// test with the original before submitting. -// - -import "6.5840/labgob" -import "6.5840/labrpc" -import "bytes" -import "log" -import "sync" -import "sync/atomic" -import "testing" -import "runtime" -import "math/rand" -import crand "crypto/rand" -import "math/big" -import "encoding/base64" -import "time" -import "fmt" - -func randstring(n int) string { - b := make([]byte, 2*n) - crand.Read(b) - s := base64.URLEncoding.EncodeToString(b) - return s[0:n] -} - -func makeSeed() int64 { - max := big.NewInt(int64(1) << 62) - bigx, _ := crand.Int(crand.Reader, max) - x := bigx.Int64() - return x -} - -type config struct { - mu sync.Mutex - t *testing.T - finished int32 - net *labrpc.Network - n int - rafts []*Raft - applyErr []string // from apply channel readers - connected []bool // whether each server is on the net - saved []*Persister - endnames [][]string // the port file names each sends to - logs []map[int]interface{} // copy of each server's committed entries - lastApplied []int - start time.Time // time at which make_config() was called - // begin()/end() statistics - t0 time.Time // time at which test_test.go called cfg.begin() - rpcs0 int // rpcTotal() at start of test - cmds0 int // number of agreements - bytes0 int64 - maxIndex int - maxIndex0 int -} - -var ncpu_once sync.Once - -func make_config(t *testing.T, n int, unreliable bool, snapshot bool) *config { - ncpu_once.Do(func() { - if runtime.NumCPU() < 2 { - fmt.Printf("warning: only one CPU, which may conceal locking bugs\n") - } - rand.Seed(makeSeed()) - }) - runtime.GOMAXPROCS(4) - cfg := &config{} - cfg.t = t - cfg.net = labrpc.MakeNetwork() - cfg.n = n - cfg.applyErr = make([]string, cfg.n) - cfg.rafts = make([]*Raft, cfg.n) - cfg.connected = make([]bool, cfg.n) - cfg.saved = make([]*Persister, cfg.n) - cfg.endnames = make([][]string, cfg.n) - cfg.logs = make([]map[int]interface{}, cfg.n) - cfg.lastApplied = make([]int, cfg.n) - cfg.start = time.Now() - - cfg.setunreliable(unreliable) - - cfg.net.LongDelays(true) - - applier := cfg.applier - if snapshot { - applier = cfg.applierSnap - } - // create a full set of Rafts. - for i := 0; i < cfg.n; i++ { - cfg.logs[i] = map[int]interface{}{} - cfg.start1(i, applier) - } - - // connect everyone - for i := 0; i < cfg.n; i++ { - cfg.connect(i) - } - - return cfg -} - -// shut down a Raft server but save its persistent state. -func (cfg *config) crash1(i int) { - cfg.disconnect(i) - cfg.net.DeleteServer(i) // disable client connections to the server. - - cfg.mu.Lock() - defer cfg.mu.Unlock() - - // a fresh persister, in case old instance - // continues to update the Persister. - // but copy old persister's content so that we always - // pass Make() the last persisted state. 
- if cfg.saved[i] != nil { - cfg.saved[i] = cfg.saved[i].Copy() - } - - rf := cfg.rafts[i] - if rf != nil { - cfg.mu.Unlock() - rf.Kill() - cfg.mu.Lock() - cfg.rafts[i] = nil - } - - if cfg.saved[i] != nil { - raftlog := cfg.saved[i].ReadRaftState() - snapshot := cfg.saved[i].ReadSnapshot() - cfg.saved[i] = &Persister{} - cfg.saved[i].Save(raftlog, snapshot) - } -} - -func (cfg *config) checkLogs(i int, m ApplyMsg) (string, bool) { - err_msg := "" - v := m.Command - for j := 0; j < len(cfg.logs); j++ { - if old, oldok := cfg.logs[j][m.CommandIndex]; oldok && old != v { - log.Printf("%v: log %v; server %v\n", i, cfg.logs[i], cfg.logs[j]) - // some server has already committed a different value for this entry! - err_msg = fmt.Sprintf("commit index=%v server=%v %v != server=%v %v", - m.CommandIndex, i, m.Command, j, old) - } - } - _, prevok := cfg.logs[i][m.CommandIndex-1] - cfg.logs[i][m.CommandIndex] = v - if m.CommandIndex > cfg.maxIndex { - cfg.maxIndex = m.CommandIndex - } - return err_msg, prevok -} - -// applier reads message from apply ch and checks that they match the log -// contents -func (cfg *config) applier(i int, applyCh chan ApplyMsg) { - for m := range applyCh { - if m.CommandValid == false { - // ignore other types of ApplyMsg - } else { - cfg.mu.Lock() - err_msg, prevok := cfg.checkLogs(i, m) - cfg.mu.Unlock() - if m.CommandIndex > 1 && prevok == false { - err_msg = fmt.Sprintf("server %v apply out of order %v", i, m.CommandIndex) - } - if err_msg != "" { - log.Fatalf("apply error: %v", err_msg) - cfg.applyErr[i] = err_msg - // keep reading after error so that Raft doesn't block - // holding locks... - } - } - } -} - -// returns "" or error string -func (cfg *config) ingestSnap(i int, snapshot []byte, index int) string { - if snapshot == nil { - log.Fatalf("nil snapshot") - return "nil snapshot" - } - r := bytes.NewBuffer(snapshot) - d := labgob.NewDecoder(r) - var lastIncludedIndex int - var xlog []interface{} - if d.Decode(&lastIncludedIndex) != nil || - d.Decode(&xlog) != nil { - log.Fatalf("snapshot decode error") - return "snapshot Decode() error" - } - if index != -1 && index != lastIncludedIndex { - err := fmt.Sprintf("server %v snapshot doesn't match m.SnapshotIndex", i) - return err - } - cfg.logs[i] = map[int]interface{}{} - for j := 0; j < len(xlog); j++ { - cfg.logs[i][j] = xlog[j] - } - cfg.lastApplied[i] = lastIncludedIndex - return "" -} - -const SnapShotInterval = 10 - -// periodically snapshot raft state -func (cfg *config) applierSnap(i int, applyCh chan ApplyMsg) { - cfg.mu.Lock() - rf := cfg.rafts[i] - cfg.mu.Unlock() - if rf == nil { - return // ??? 
- } - - for m := range applyCh { - err_msg := "" - if m.SnapshotValid { - cfg.mu.Lock() - err_msg = cfg.ingestSnap(i, m.Snapshot, m.SnapshotIndex) - cfg.mu.Unlock() - } else if m.CommandValid { - if m.CommandIndex != cfg.lastApplied[i]+1 { - err_msg = fmt.Sprintf("server %v apply out of order, expected index %v, got %v", i, cfg.lastApplied[i]+1, m.CommandIndex) - } - - if err_msg == "" { - cfg.mu.Lock() - var prevok bool - err_msg, prevok = cfg.checkLogs(i, m) - cfg.mu.Unlock() - if m.CommandIndex > 1 && prevok == false { - err_msg = fmt.Sprintf("server %v apply out of order %v", i, m.CommandIndex) - } - } - - cfg.mu.Lock() - cfg.lastApplied[i] = m.CommandIndex - cfg.mu.Unlock() - - if (m.CommandIndex+1)%SnapShotInterval == 0 { - w := new(bytes.Buffer) - e := labgob.NewEncoder(w) - e.Encode(m.CommandIndex) - var xlog []interface{} - for j := 0; j <= m.CommandIndex; j++ { - xlog = append(xlog, cfg.logs[i][j]) - } - e.Encode(xlog) - rf.Snapshot(m.CommandIndex, w.Bytes()) - } - } else { - // Ignore other types of ApplyMsg. - } - if err_msg != "" { - log.Fatalf("apply error: %v", err_msg) - cfg.applyErr[i] = err_msg - // keep reading after error so that Raft doesn't block - // holding locks... - } - } -} - -// start or re-start a Raft. -// if one already exists, "kill" it first. -// allocate new outgoing port file names, and a new -// state persister, to isolate previous instance of -// this server. since we cannot really kill it. -func (cfg *config) start1(i int, applier func(int, chan ApplyMsg)) { - cfg.crash1(i) - - // a fresh set of outgoing ClientEnd names. - // so that old crashed instance's ClientEnds can't send. - cfg.endnames[i] = make([]string, cfg.n) - for j := 0; j < cfg.n; j++ { - cfg.endnames[i][j] = randstring(20) - } - - // a fresh set of ClientEnds. - ends := make([]*labrpc.ClientEnd, cfg.n) - for j := 0; j < cfg.n; j++ { - ends[j] = cfg.net.MakeEnd(cfg.endnames[i][j]) - cfg.net.Connect(cfg.endnames[i][j], j) - } - - cfg.mu.Lock() - - cfg.lastApplied[i] = 0 - - // a fresh persister, so old instance doesn't overwrite - // new instance's persisted state. - // but copy old persister's content so that we always - // pass Make() the last persisted state. - if cfg.saved[i] != nil { - cfg.saved[i] = cfg.saved[i].Copy() - - snapshot := cfg.saved[i].ReadSnapshot() - if snapshot != nil && len(snapshot) > 0 { - // mimic KV server and process snapshot now. - // ideally Raft should send it up on applyCh... - err := cfg.ingestSnap(i, snapshot, -1) - if err != "" { - cfg.t.Fatal(err) - } - } - } else { - cfg.saved[i] = MakePersister() - } - - cfg.mu.Unlock() - - applyCh := make(chan ApplyMsg) - - rf := Make(ends, i, cfg.saved[i], applyCh) - - cfg.mu.Lock() - cfg.rafts[i] = rf - cfg.mu.Unlock() - - go applier(i, applyCh) - - svc := labrpc.MakeService(rf) - srv := labrpc.MakeServer() - srv.AddService(svc) - cfg.net.AddServer(i, srv) -} - -func (cfg *config) checkTimeout() { - // enforce a two minute real-time limit on each test - if !cfg.t.Failed() && time.Since(cfg.start) > 120*time.Second { - cfg.t.Fatal("test took longer than 120 seconds") - } -} - -func (cfg *config) checkFinished() bool { - z := atomic.LoadInt32(&cfg.finished) - return z != 0 -} - -func (cfg *config) cleanup() { - atomic.StoreInt32(&cfg.finished, 1) - for i := 0; i < len(cfg.rafts); i++ { - if cfg.rafts[i] != nil { - cfg.rafts[i].Kill() - } - } - cfg.net.Cleanup() - cfg.checkTimeout() -} - -// attach server i to the net. 
-func (cfg *config) connect(i int) { - // fmt.Printf("connect(%d)\n", i) - - cfg.connected[i] = true - - // outgoing ClientEnds - for j := 0; j < cfg.n; j++ { - if cfg.connected[j] { - endname := cfg.endnames[i][j] - cfg.net.Enable(endname, true) - } - } - - // incoming ClientEnds - for j := 0; j < cfg.n; j++ { - if cfg.connected[j] { - endname := cfg.endnames[j][i] - cfg.net.Enable(endname, true) - } - } -} - -// detach server i from the net. -func (cfg *config) disconnect(i int) { - // fmt.Printf("disconnect(%d)\n", i) - - cfg.connected[i] = false - - // outgoing ClientEnds - for j := 0; j < cfg.n; j++ { - if cfg.endnames[i] != nil { - endname := cfg.endnames[i][j] - cfg.net.Enable(endname, false) - } - } - - // incoming ClientEnds - for j := 0; j < cfg.n; j++ { - if cfg.endnames[j] != nil { - endname := cfg.endnames[j][i] - cfg.net.Enable(endname, false) - } - } -} - -func (cfg *config) rpcCount(server int) int { - return cfg.net.GetCount(server) -} - -func (cfg *config) rpcTotal() int { - return cfg.net.GetTotalCount() -} - -func (cfg *config) setunreliable(unrel bool) { - cfg.net.Reliable(!unrel) -} - -func (cfg *config) bytesTotal() int64 { - return cfg.net.GetTotalBytes() -} - -func (cfg *config) setlongreordering(longrel bool) { - cfg.net.LongReordering(longrel) -} - -// check that one of the connected servers thinks -// it is the leader, and that no other connected -// server thinks otherwise. -// -// try a few times in case re-elections are needed. -func (cfg *config) checkOneLeader() int { - for iters := 0; iters < 10; iters++ { - ms := 450 + (rand.Int63() % 100) - time.Sleep(time.Duration(ms) * time.Millisecond) - - leaders := make(map[int][]int) - for i := 0; i < cfg.n; i++ { - if cfg.connected[i] { - if term, leader := cfg.rafts[i].GetState(); leader { - leaders[term] = append(leaders[term], i) - } - } - } - - lastTermWithLeader := -1 - for term, leaders := range leaders { - if len(leaders) > 1 { - cfg.t.Fatalf("term %d has %d (>1) leaders", term, len(leaders)) - } - if term > lastTermWithLeader { - lastTermWithLeader = term - } - } - - if len(leaders) != 0 { - return leaders[lastTermWithLeader][0] - } - } - cfg.t.Fatalf("expected one leader, got none") - return -1 -} - -// check that everyone agrees on the term. -func (cfg *config) checkTerms() int { - term := -1 - for i := 0; i < cfg.n; i++ { - if cfg.connected[i] { - xterm, _ := cfg.rafts[i].GetState() - if term == -1 { - term = xterm - } else if term != xterm { - cfg.t.Fatalf("servers disagree on term") - } - } - } - return term -} - -// check that none of the connected servers -// thinks it is the leader. -func (cfg *config) checkNoLeader() { - for i := 0; i < cfg.n; i++ { - if cfg.connected[i] { - _, is_leader := cfg.rafts[i].GetState() - if is_leader { - cfg.t.Fatalf("expected no leader among connected servers, but %v claims to be leader", i) - } - } - } -} - -// how many servers think a log entry is committed? -func (cfg *config) nCommitted(index int) (int, interface{}) { - count := 0 - var cmd interface{} = nil - for i := 0; i < len(cfg.rafts); i++ { - if cfg.applyErr[i] != "" { - cfg.t.Fatal(cfg.applyErr[i]) - } - - cfg.mu.Lock() - cmd1, ok := cfg.logs[i][index] - cfg.mu.Unlock() - - if ok { - if count > 0 && cmd != cmd1 { - cfg.t.Fatalf("committed values do not match: index %v, %v, %v", - index, cmd, cmd1) - } - count += 1 - cmd = cmd1 - } - } - return count, cmd -} - -// wait for at least n servers to commit. -// but don't wait forever. 
-func (cfg *config) wait(index int, n int, startTerm int) interface{} { - to := 10 * time.Millisecond - for iters := 0; iters < 30; iters++ { - nd, _ := cfg.nCommitted(index) - if nd >= n { - break - } - time.Sleep(to) - if to < time.Second { - to *= 2 - } - if startTerm > -1 { - for _, r := range cfg.rafts { - if t, _ := r.GetState(); t > startTerm { - // someone has moved on - // can no longer guarantee that we'll "win" - return -1 - } - } - } - } - nd, cmd := cfg.nCommitted(index) - if nd < n { - cfg.t.Fatalf("only %d decided for index %d; wanted %d", - nd, index, n) - } - return cmd -} - -// do a complete agreement. -// it might choose the wrong leader initially, -// and have to re-submit after giving up. -// entirely gives up after about 10 seconds. -// indirectly checks that the servers agree on the -// same value, since nCommitted() checks this, -// as do the threads that read from applyCh. -// returns index. -// if retry==true, may submit the command multiple -// times, in case a leader fails just after Start(). -// if retry==false, calls Start() only once, in order -// to simplify the early Lab 3B tests. -func (cfg *config) one(cmd interface{}, expectedServers int, retry bool) int { - t0 := time.Now() - starts := 0 - for time.Since(t0).Seconds() < 10 && cfg.checkFinished() == false { - // try all the servers, maybe one is the leader. - index := -1 - for si := 0; si < cfg.n; si++ { - starts = (starts + 1) % cfg.n - var rf *Raft - cfg.mu.Lock() - if cfg.connected[starts] { - rf = cfg.rafts[starts] - } - cfg.mu.Unlock() - if rf != nil { - index1, _, ok := rf.Start(cmd) - if ok { - index = index1 - break - } - } - } - - if index != -1 { - // somebody claimed to be the leader and to have - // submitted our command; wait a while for agreement. - t1 := time.Now() - for time.Since(t1).Seconds() < 2 { - nd, cmd1 := cfg.nCommitted(index) - if nd > 0 && nd >= expectedServers { - // committed - if cmd1 == cmd { - // and it was the command we submitted. - return index - } - } - time.Sleep(20 * time.Millisecond) - } - if retry == false { - cfg.t.Fatalf("one(%v) failed to reach agreement", cmd) - } - } else { - time.Sleep(50 * time.Millisecond) - } - } - if cfg.checkFinished() == false { - cfg.t.Fatalf("one(%v) failed to reach agreement", cmd) - } - return -1 -} - -// start a Test. -// print the Test message. -// e.g. cfg.begin("Test (3B): RPC counts aren't too high") -func (cfg *config) begin(description string) { - fmt.Printf("%s ...\n", description) - cfg.t0 = time.Now() - cfg.rpcs0 = cfg.rpcTotal() - cfg.bytes0 = cfg.bytesTotal() - cfg.cmds0 = 0 - cfg.maxIndex0 = cfg.maxIndex -} - -// end a Test -- the fact that we got here means there -// was no failure. -// print the Passed message, -// and some performance numbers. -func (cfg *config) end() { - cfg.checkTimeout() - if cfg.t.Failed() == false { - cfg.mu.Lock() - t := time.Since(cfg.t0).Seconds() // real time - npeers := cfg.n // number of Raft peers - nrpc := cfg.rpcTotal() - cfg.rpcs0 // number of RPC sends - nbytes := cfg.bytesTotal() - cfg.bytes0 // number of bytes - ncmds := cfg.maxIndex - cfg.maxIndex0 // number of Raft agreements reported - cfg.mu.Unlock() - - fmt.Printf(" ... 
Passed --") - fmt.Printf(" %4.1f %d %4d %7d %4d\n", t, npeers, nrpc, nbytes, ncmds) - } -} - -// Maximum log size across all servers -func (cfg *config) LogSize() int { - logsize := 0 - for i := 0; i < cfg.n; i++ { - n := cfg.saved[i].RaftStateSize() - if n > logsize { - logsize = n - } - } - return logsize -} diff --git a/src/raft/persister.go b/src/raft/persister.go deleted file mode 100644 index c5f816c..0000000 --- a/src/raft/persister.go +++ /dev/null @@ -1,70 +0,0 @@ -package raft - -// -// support for Raft and kvraft to save persistent -// Raft state (log &c) and k/v server snapshots. -// -// we will use the original persister.go to test your code for grading. -// so, while you can modify this code to help you debug, please -// test with the original before submitting. -// - -import "sync" - -type Persister struct { - mu sync.Mutex - raftstate []byte - snapshot []byte -} - -func MakePersister() *Persister { - return &Persister{} -} - -func clone(orig []byte) []byte { - x := make([]byte, len(orig)) - copy(x, orig) - return x -} - -func (ps *Persister) Copy() *Persister { - ps.mu.Lock() - defer ps.mu.Unlock() - np := MakePersister() - np.raftstate = ps.raftstate - np.snapshot = ps.snapshot - return np -} - -func (ps *Persister) ReadRaftState() []byte { - ps.mu.Lock() - defer ps.mu.Unlock() - return clone(ps.raftstate) -} - -func (ps *Persister) RaftStateSize() int { - ps.mu.Lock() - defer ps.mu.Unlock() - return len(ps.raftstate) -} - -// Save both Raft state and K/V snapshot as a single atomic action, -// to help avoid them getting out of sync. -func (ps *Persister) Save(raftstate []byte, snapshot []byte) { - ps.mu.Lock() - defer ps.mu.Unlock() - ps.raftstate = clone(raftstate) - ps.snapshot = clone(snapshot) -} - -func (ps *Persister) ReadSnapshot() []byte { - ps.mu.Lock() - defer ps.mu.Unlock() - return clone(ps.snapshot) -} - -func (ps *Persister) SnapshotSize() int { - ps.mu.Lock() - defer ps.mu.Unlock() - return len(ps.snapshot) -} diff --git a/src/raft/raft.go b/src/raft/raft.go deleted file mode 100644 index 264d77e..0000000 --- a/src/raft/raft.go +++ /dev/null @@ -1,259 +0,0 @@ -package raft - -// -// this is an outline of the API that raft must expose to -// the service (or tester). see comments below for -// each of these functions for more details. -// -// rf = Make(...) -// create a new Raft server. -// rf.Start(command interface{}) (index, term, isleader) -// start agreement on a new log entry -// rf.GetState() (term, isLeader) -// ask a Raft for its current term, and whether it thinks it is leader -// ApplyMsg -// each time a new entry is committed to the log, each Raft peer -// should send an ApplyMsg to the service (or tester) -// in the same server. -// - -import ( - // "bytes" - "math/rand" - "sync" - "sync/atomic" - "time" - - // "6.5840/labgob" - "6.5840/labrpc" -) - - -// as each Raft peer becomes aware that successive log entries are -// committed, the peer should send an ApplyMsg to the service (or -// tester) on the same server, via the applyCh passed to Make(). set -// CommandValid to true to indicate that the ApplyMsg contains a newly -// committed log entry. -// -// in part 3D you'll want to send other kinds of messages (e.g., -// snapshots) on the applyCh, but set CommandValid to false for these -// other uses. -type ApplyMsg struct { - CommandValid bool - Command interface{} - CommandIndex int - - // For 3D: - SnapshotValid bool - Snapshot []byte - SnapshotTerm int - SnapshotIndex int -} - -// A Go object implementing a single Raft peer. 
-type Raft struct { - mu sync.Mutex // Lock to protect shared access to this peer's state - peers []*labrpc.ClientEnd // RPC end points of all peers - persister *Persister // Object to hold this peer's persisted state - me int // this peer's index into peers[] - dead int32 // set by Kill() - - // Your data here (3A, 3B, 3C). - // Look at the paper's Figure 2 for a description of what - // state a Raft server must maintain. - -} - -// return currentTerm and whether this server -// believes it is the leader. -func (rf *Raft) GetState() (int, bool) { - - var term int - var isleader bool - // Your code here (3A). - return term, isleader -} - -// save Raft's persistent state to stable storage, -// where it can later be retrieved after a crash and restart. -// see paper's Figure 2 for a description of what should be persistent. -// before you've implemented snapshots, you should pass nil as the -// second argument to persister.Save(). -// after you've implemented snapshots, pass the current snapshot -// (or nil if there's not yet a snapshot). -func (rf *Raft) persist() { - // Your code here (3C). - // Example: - // w := new(bytes.Buffer) - // e := labgob.NewEncoder(w) - // e.Encode(rf.xxx) - // e.Encode(rf.yyy) - // raftstate := w.Bytes() - // rf.persister.Save(raftstate, nil) -} - - -// restore previously persisted state. -func (rf *Raft) readPersist(data []byte) { - if data == nil || len(data) < 1 { // bootstrap without any state? - return - } - // Your code here (3C). - // Example: - // r := bytes.NewBuffer(data) - // d := labgob.NewDecoder(r) - // var xxx - // var yyy - // if d.Decode(&xxx) != nil || - // d.Decode(&yyy) != nil { - // error... - // } else { - // rf.xxx = xxx - // rf.yyy = yyy - // } -} - - -// the service says it has created a snapshot that has -// all info up to and including index. this means the -// service no longer needs the log through (and including) -// that index. Raft should now trim its log as much as possible. -func (rf *Raft) Snapshot(index int, snapshot []byte) { - // Your code here (3D). - -} - - -// example RequestVote RPC arguments structure. -// field names must start with capital letters! -type RequestVoteArgs struct { - // Your data here (3A, 3B). -} - -// example RequestVote RPC reply structure. -// field names must start with capital letters! -type RequestVoteReply struct { - // Your data here (3A). -} - -// example RequestVote RPC handler. -func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) { - // Your code here (3A, 3B). -} - -// example code to send a RequestVote RPC to a server. -// server is the index of the target server in rf.peers[]. -// expects RPC arguments in args. -// fills in *reply with RPC reply, so caller should -// pass &reply. -// the types of the args and reply passed to Call() must be -// the same as the types of the arguments declared in the -// handler function (including whether they are pointers). -// -// The labrpc package simulates a lossy network, in which servers -// may be unreachable, and in which requests and replies may be lost. -// Call() sends a request and waits for a reply. If a reply arrives -// within a timeout interval, Call() returns true; otherwise -// Call() returns false. Thus Call() may not return for a while. -// A false return can be caused by a dead server, a live server that -// can't be reached, a lost request, or a lost reply. -// -// Call() is guaranteed to return (perhaps after a delay) *except* if the -// handler function on the server side does not return. 
Thus there -// is no need to implement your own timeouts around Call(). -// -// look at the comments in ../labrpc/labrpc.go for more details. -// -// if you're having trouble getting RPC to work, check that you've -// capitalized all field names in structs passed over RPC, and -// that the caller passes the address of the reply struct with &, not -// the struct itself. -func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool { - ok := rf.peers[server].Call("Raft.RequestVote", args, reply) - return ok -} - - -// the service using Raft (e.g. a k/v server) wants to start -// agreement on the next command to be appended to Raft's log. if this -// server isn't the leader, returns false. otherwise start the -// agreement and return immediately. there is no guarantee that this -// command will ever be committed to the Raft log, since the leader -// may fail or lose an election. even if the Raft instance has been killed, -// this function should return gracefully. -// -// the first return value is the index that the command will appear at -// if it's ever committed. the second return value is the current -// term. the third return value is true if this server believes it is -// the leader. -func (rf *Raft) Start(command interface{}) (int, int, bool) { - index := -1 - term := -1 - isLeader := true - - // Your code here (3B). - - - return index, term, isLeader -} - -// the tester doesn't halt goroutines created by Raft after each test, -// but it does call the Kill() method. your code can use killed() to -// check whether Kill() has been called. the use of atomic avoids the -// need for a lock. -// -// the issue is that long-running goroutines use memory and may chew -// up CPU time, perhaps causing later tests to fail and generating -// confusing debug output. any goroutine with a long-running loop -// should call killed() to check whether it should stop. -func (rf *Raft) Kill() { - atomic.StoreInt32(&rf.dead, 1) - // Your code here, if desired. -} - -func (rf *Raft) killed() bool { - z := atomic.LoadInt32(&rf.dead) - return z == 1 -} - -func (rf *Raft) ticker() { - for rf.killed() == false { - - // Your code here (3A) - // Check if a leader election should be started. - - - // pause for a random amount of time between 50 and 350 - // milliseconds. - ms := 50 + (rand.Int63() % 300) - time.Sleep(time.Duration(ms) * time.Millisecond) - } -} - -// the service or tester wants to create a Raft server. the ports -// of all the Raft servers (including this one) are in peers[]. this -// server's port is peers[me]. all the servers' peers[] arrays -// have the same order. persister is a place for this server to -// save its persistent state, and also initially holds the most -// recent saved state, if any. applyCh is a channel on which the -// tester or service expects Raft to send ApplyMsg messages. -// Make() must return quickly, so it should start goroutines -// for any long-running work. -func Make(peers []*labrpc.ClientEnd, me int, - persister *Persister, applyCh chan ApplyMsg) *Raft { - rf := &Raft{} - rf.peers = peers - rf.persister = persister - rf.me = me - - // Your initialization code here (3A, 3B, 3C). 
- - // initialize from state persisted before a crash - rf.readPersist(persister.ReadRaftState()) - - // start ticker goroutine to start elections - go rf.ticker() - - - return rf -} diff --git a/src/raft/test_test.go b/src/raft/test_test.go deleted file mode 100644 index 6eccdaa..0000000 --- a/src/raft/test_test.go +++ /dev/null @@ -1,1270 +0,0 @@ -package raft - -// -// Raft tests. -// -// we will use the original test_test.go to test your code for grading. -// so, while you can modify this code to help you debug, please -// test with the original before submitting. -// - -import "testing" -import "fmt" -import "time" -import "math/rand" -import "sync/atomic" -import "sync" - -// The tester generously allows solutions to complete elections in one second -// (much more than the paper's range of timeouts). -const RaftElectionTimeout = 1000 * time.Millisecond - -func TestInitialElection3A(t *testing.T) { - servers := 3 - cfg := make_config(t, servers, false, false) - defer cfg.cleanup() - - cfg.begin("Test (3A): initial election") - - // is a leader elected? - cfg.checkOneLeader() - - // sleep a bit to avoid racing with followers learning of the - // election, then check that all peers agree on the term. - time.Sleep(50 * time.Millisecond) - term1 := cfg.checkTerms() - if term1 < 1 { - t.Fatalf("term is %v, but should be at least 1", term1) - } - - // does the leader+term stay the same if there is no network failure? - time.Sleep(2 * RaftElectionTimeout) - term2 := cfg.checkTerms() - if term1 != term2 { - fmt.Printf("warning: term changed even though there were no failures") - } - - // there should still be a leader. - cfg.checkOneLeader() - - cfg.end() -} - -func TestReElection3A(t *testing.T) { - servers := 3 - cfg := make_config(t, servers, false, false) - defer cfg.cleanup() - - cfg.begin("Test (3A): election after network failure") - - leader1 := cfg.checkOneLeader() - - // if the leader disconnects, a new one should be elected. - cfg.disconnect(leader1) - cfg.checkOneLeader() - - // if the old leader rejoins, that shouldn't - // disturb the new leader. and the old leader - // should switch to follower. - cfg.connect(leader1) - leader2 := cfg.checkOneLeader() - - // if there's no quorum, no new leader should - // be elected. - cfg.disconnect(leader2) - cfg.disconnect((leader2 + 1) % servers) - time.Sleep(2 * RaftElectionTimeout) - - // check that the one connected server - // does not think it is the leader. - cfg.checkNoLeader() - - // if a quorum arises, it should elect a leader. - cfg.connect((leader2 + 1) % servers) - cfg.checkOneLeader() - - // re-join of last node shouldn't prevent leader from existing. - cfg.connect(leader2) - cfg.checkOneLeader() - - cfg.end() -} - -func TestManyElections3A(t *testing.T) { - servers := 7 - cfg := make_config(t, servers, false, false) - defer cfg.cleanup() - - cfg.begin("Test (3A): multiple elections") - - cfg.checkOneLeader() - - iters := 10 - for ii := 1; ii < iters; ii++ { - // disconnect three nodes - i1 := rand.Int() % servers - i2 := rand.Int() % servers - i3 := rand.Int() % servers - cfg.disconnect(i1) - cfg.disconnect(i2) - cfg.disconnect(i3) - - // either the current leader should still be alive, - // or the remaining four should elect a new one. 
- cfg.checkOneLeader() - - cfg.connect(i1) - cfg.connect(i2) - cfg.connect(i3) - } - - cfg.checkOneLeader() - - cfg.end() -} - -func TestBasicAgree3B(t *testing.T) { - servers := 3 - cfg := make_config(t, servers, false, false) - defer cfg.cleanup() - - cfg.begin("Test (3B): basic agreement") - - iters := 3 - for index := 1; index < iters+1; index++ { - nd, _ := cfg.nCommitted(index) - if nd > 0 { - t.Fatalf("some have committed before Start()") - } - - xindex := cfg.one(index*100, servers, false) - if xindex != index { - t.Fatalf("got index %v but expected %v", xindex, index) - } - } - - cfg.end() -} - -// check, based on counting bytes of RPCs, that -// each command is sent to each peer just once. -func TestRPCBytes3B(t *testing.T) { - servers := 3 - cfg := make_config(t, servers, false, false) - defer cfg.cleanup() - - cfg.begin("Test (3B): RPC byte count") - - cfg.one(99, servers, false) - bytes0 := cfg.bytesTotal() - - iters := 10 - var sent int64 = 0 - for index := 2; index < iters+2; index++ { - cmd := randstring(5000) - xindex := cfg.one(cmd, servers, false) - if xindex != index { - t.Fatalf("got index %v but expected %v", xindex, index) - } - sent += int64(len(cmd)) - } - - bytes1 := cfg.bytesTotal() - got := bytes1 - bytes0 - expected := int64(servers) * sent - if got > expected+50000 { - t.Fatalf("too many RPC bytes; got %v, expected %v", got, expected) - } - - cfg.end() -} - -// test just failure of followers. -func TestFollowerFailure3B(t *testing.T) { - servers := 3 - cfg := make_config(t, servers, false, false) - defer cfg.cleanup() - - cfg.begin("Test (3B): test progressive failure of followers") - - cfg.one(101, servers, false) - - // disconnect one follower from the network. - leader1 := cfg.checkOneLeader() - cfg.disconnect((leader1 + 1) % servers) - - // the leader and remaining follower should be - // able to agree despite the disconnected follower. - cfg.one(102, servers-1, false) - time.Sleep(RaftElectionTimeout) - cfg.one(103, servers-1, false) - - // disconnect the remaining follower - leader2 := cfg.checkOneLeader() - cfg.disconnect((leader2 + 1) % servers) - cfg.disconnect((leader2 + 2) % servers) - - // submit a command. - index, _, ok := cfg.rafts[leader2].Start(104) - if ok != true { - t.Fatalf("leader rejected Start()") - } - if index != 4 { - t.Fatalf("expected index 4, got %v", index) - } - - time.Sleep(2 * RaftElectionTimeout) - - // check that command 104 did not commit. - n, _ := cfg.nCommitted(index) - if n > 0 { - t.Fatalf("%v committed but no majority", n) - } - - cfg.end() -} - -// test just failure of leaders. -func TestLeaderFailure3B(t *testing.T) { - servers := 3 - cfg := make_config(t, servers, false, false) - defer cfg.cleanup() - - cfg.begin("Test (3B): test failure of leaders") - - cfg.one(101, servers, false) - - // disconnect the first leader. - leader1 := cfg.checkOneLeader() - cfg.disconnect(leader1) - - // the remaining followers should elect - // a new leader. - cfg.one(102, servers-1, false) - time.Sleep(RaftElectionTimeout) - cfg.one(103, servers-1, false) - - // disconnect the new leader. - leader2 := cfg.checkOneLeader() - cfg.disconnect(leader2) - - // submit a command to each server. - for i := 0; i < servers; i++ { - cfg.rafts[i].Start(104) - } - - time.Sleep(2 * RaftElectionTimeout) - - // check that command 104 did not commit. - n, _ := cfg.nCommitted(4) - if n > 0 { - t.Fatalf("%v committed but no majority", n) - } - - cfg.end() -} - -// test that a follower participates after -// disconnect and re-connect. 
-func TestFailAgree3B(t *testing.T) { - servers := 3 - cfg := make_config(t, servers, false, false) - defer cfg.cleanup() - - cfg.begin("Test (3B): agreement after follower reconnects") - - cfg.one(101, servers, false) - - // disconnect one follower from the network. - leader := cfg.checkOneLeader() - cfg.disconnect((leader + 1) % servers) - - // the leader and remaining follower should be - // able to agree despite the disconnected follower. - cfg.one(102, servers-1, false) - cfg.one(103, servers-1, false) - time.Sleep(RaftElectionTimeout) - cfg.one(104, servers-1, false) - cfg.one(105, servers-1, false) - - // re-connect - cfg.connect((leader + 1) % servers) - - // the full set of servers should preserve - // previous agreements, and be able to agree - // on new commands. - cfg.one(106, servers, true) - time.Sleep(RaftElectionTimeout) - cfg.one(107, servers, true) - - cfg.end() -} - -func TestFailNoAgree3B(t *testing.T) { - servers := 5 - cfg := make_config(t, servers, false, false) - defer cfg.cleanup() - - cfg.begin("Test (3B): no agreement if too many followers disconnect") - - cfg.one(10, servers, false) - - // 3 of 5 followers disconnect - leader := cfg.checkOneLeader() - cfg.disconnect((leader + 1) % servers) - cfg.disconnect((leader + 2) % servers) - cfg.disconnect((leader + 3) % servers) - - index, _, ok := cfg.rafts[leader].Start(20) - if ok != true { - t.Fatalf("leader rejected Start()") - } - if index != 2 { - t.Fatalf("expected index 2, got %v", index) - } - - time.Sleep(2 * RaftElectionTimeout) - - n, _ := cfg.nCommitted(index) - if n > 0 { - t.Fatalf("%v committed but no majority", n) - } - - // repair - cfg.connect((leader + 1) % servers) - cfg.connect((leader + 2) % servers) - cfg.connect((leader + 3) % servers) - - // the disconnected majority may have chosen a leader from - // among their own ranks, forgetting index 2. 
- leader2 := cfg.checkOneLeader() - index2, _, ok2 := cfg.rafts[leader2].Start(30) - if ok2 == false { - t.Fatalf("leader2 rejected Start()") - } - if index2 < 2 || index2 > 3 { - t.Fatalf("unexpected index %v", index2) - } - - cfg.one(1000, servers, true) - - cfg.end() -} - -func TestConcurrentStarts3B(t *testing.T) { - servers := 3 - cfg := make_config(t, servers, false, false) - defer cfg.cleanup() - - cfg.begin("Test (3B): concurrent Start()s") - - var success bool -loop: - for try := 0; try < 5; try++ { - if try > 0 { - // give solution some time to settle - time.Sleep(3 * time.Second) - } - - leader := cfg.checkOneLeader() - _, term, ok := cfg.rafts[leader].Start(1) - if !ok { - // leader moved on really quickly - continue - } - - iters := 5 - var wg sync.WaitGroup - is := make(chan int, iters) - for ii := 0; ii < iters; ii++ { - wg.Add(1) - go func(i int) { - defer wg.Done() - i, term1, ok := cfg.rafts[leader].Start(100 + i) - if term1 != term { - return - } - if ok != true { - return - } - is <- i - }(ii) - } - - wg.Wait() - close(is) - - for j := 0; j < servers; j++ { - if t, _ := cfg.rafts[j].GetState(); t != term { - // term changed -- can't expect low RPC counts - continue loop - } - } - - failed := false - cmds := []int{} - for index := range is { - cmd := cfg.wait(index, servers, term) - if ix, ok := cmd.(int); ok { - if ix == -1 { - // peers have moved on to later terms - // so we can't expect all Start()s to - // have succeeded - failed = true - break - } - cmds = append(cmds, ix) - } else { - t.Fatalf("value %v is not an int", cmd) - } - } - - if failed { - // avoid leaking goroutines - go func() { - for range is { - } - }() - continue - } - - for ii := 0; ii < iters; ii++ { - x := 100 + ii - ok := false - for j := 0; j < len(cmds); j++ { - if cmds[j] == x { - ok = true - } - } - if ok == false { - t.Fatalf("cmd %v missing in %v", x, cmds) - } - } - - success = true - break - } - - if !success { - t.Fatalf("term changed too often") - } - - cfg.end() -} - -func TestRejoin3B(t *testing.T) { - servers := 3 - cfg := make_config(t, servers, false, false) - defer cfg.cleanup() - - cfg.begin("Test (3B): rejoin of partitioned leader") - - cfg.one(101, servers, true) - - // leader network failure - leader1 := cfg.checkOneLeader() - cfg.disconnect(leader1) - - // make old leader try to agree on some entries - cfg.rafts[leader1].Start(102) - cfg.rafts[leader1].Start(103) - cfg.rafts[leader1].Start(104) - - // new leader commits, also for index=2 - cfg.one(103, 2, true) - - // new leader network failure - leader2 := cfg.checkOneLeader() - cfg.disconnect(leader2) - - // old leader connected again - cfg.connect(leader1) - - cfg.one(104, 2, true) - - // all together now - cfg.connect(leader2) - - cfg.one(105, servers, true) - - cfg.end() -} - -func TestBackup3B(t *testing.T) { - servers := 5 - cfg := make_config(t, servers, false, false) - defer cfg.cleanup() - - cfg.begin("Test (3B): leader backs up quickly over incorrect follower logs") - - cfg.one(rand.Int(), servers, true) - - // put leader and one follower in a partition - leader1 := cfg.checkOneLeader() - cfg.disconnect((leader1 + 2) % servers) - cfg.disconnect((leader1 + 3) % servers) - cfg.disconnect((leader1 + 4) % servers) - - // submit lots of commands that won't commit - for i := 0; i < 50; i++ { - cfg.rafts[leader1].Start(rand.Int()) - } - - time.Sleep(RaftElectionTimeout / 2) - - cfg.disconnect((leader1 + 0) % servers) - cfg.disconnect((leader1 + 1) % servers) - - // allow other partition to recover - cfg.connect((leader1 + 
2) % servers) - cfg.connect((leader1 + 3) % servers) - cfg.connect((leader1 + 4) % servers) - - // lots of successful commands to new group. - for i := 0; i < 50; i++ { - cfg.one(rand.Int(), 3, true) - } - - // now another partitioned leader and one follower - leader2 := cfg.checkOneLeader() - other := (leader1 + 2) % servers - if leader2 == other { - other = (leader2 + 1) % servers - } - cfg.disconnect(other) - - // lots more commands that won't commit - for i := 0; i < 50; i++ { - cfg.rafts[leader2].Start(rand.Int()) - } - - time.Sleep(RaftElectionTimeout / 2) - - // bring original leader back to life, - for i := 0; i < servers; i++ { - cfg.disconnect(i) - } - cfg.connect((leader1 + 0) % servers) - cfg.connect((leader1 + 1) % servers) - cfg.connect(other) - - // lots of successful commands to new group. - for i := 0; i < 50; i++ { - cfg.one(rand.Int(), 3, true) - } - - // now everyone - for i := 0; i < servers; i++ { - cfg.connect(i) - } - cfg.one(rand.Int(), servers, true) - - cfg.end() -} - -func TestCount3B(t *testing.T) { - servers := 3 - cfg := make_config(t, servers, false, false) - defer cfg.cleanup() - - cfg.begin("Test (3B): RPC counts aren't too high") - - rpcs := func() (n int) { - for j := 0; j < servers; j++ { - n += cfg.rpcCount(j) - } - return - } - - leader := cfg.checkOneLeader() - - total1 := rpcs() - - if total1 > 30 || total1 < 1 { - t.Fatalf("too many or few RPCs (%v) to elect initial leader\n", total1) - } - - var total2 int - var success bool -loop: - for try := 0; try < 5; try++ { - if try > 0 { - // give solution some time to settle - time.Sleep(3 * time.Second) - } - - leader = cfg.checkOneLeader() - total1 = rpcs() - - iters := 10 - starti, term, ok := cfg.rafts[leader].Start(1) - if !ok { - // leader moved on really quickly - continue - } - cmds := []int{} - for i := 1; i < iters+2; i++ { - x := int(rand.Int31()) - cmds = append(cmds, x) - index1, term1, ok := cfg.rafts[leader].Start(x) - if term1 != term { - // Term changed while starting - continue loop - } - if !ok { - // No longer the leader, so term has changed - continue loop - } - if starti+i != index1 { - t.Fatalf("Start() failed") - } - } - - for i := 1; i < iters+1; i++ { - cmd := cfg.wait(starti+i, servers, term) - if ix, ok := cmd.(int); ok == false || ix != cmds[i-1] { - if ix == -1 { - // term changed -- try again - continue loop - } - t.Fatalf("wrong value %v committed for index %v; expected %v\n", cmd, starti+i, cmds) - } - } - - failed := false - total2 = 0 - for j := 0; j < servers; j++ { - if t, _ := cfg.rafts[j].GetState(); t != term { - // term changed -- can't expect low RPC counts - // need to keep going to update total2 - failed = true - } - total2 += cfg.rpcCount(j) - } - - if failed { - continue loop - } - - if total2-total1 > (iters+1+3)*3 { - t.Fatalf("too many RPCs (%v) for %v entries\n", total2-total1, iters) - } - - success = true - break - } - - if !success { - t.Fatalf("term changed too often") - } - - time.Sleep(RaftElectionTimeout) - - total3 := 0 - for j := 0; j < servers; j++ { - total3 += cfg.rpcCount(j) - } - - if total3-total2 > 3*20 { - t.Fatalf("too many RPCs (%v) for 1 second of idleness\n", total3-total2) - } - - cfg.end() -} - -func TestPersist13C(t *testing.T) { - servers := 3 - cfg := make_config(t, servers, false, false) - defer cfg.cleanup() - - cfg.begin("Test (3C): basic persistence") - - cfg.one(11, servers, true) - - // crash and re-start all - for i := 0; i < servers; i++ { - cfg.start1(i, cfg.applier) - } - for i := 0; i < servers; i++ { - 
cfg.disconnect(i) - cfg.connect(i) - } - - cfg.one(12, servers, true) - - leader1 := cfg.checkOneLeader() - cfg.disconnect(leader1) - cfg.start1(leader1, cfg.applier) - cfg.connect(leader1) - - cfg.one(13, servers, true) - - leader2 := cfg.checkOneLeader() - cfg.disconnect(leader2) - cfg.one(14, servers-1, true) - cfg.start1(leader2, cfg.applier) - cfg.connect(leader2) - - cfg.wait(4, servers, -1) // wait for leader2 to join before killing i3 - - i3 := (cfg.checkOneLeader() + 1) % servers - cfg.disconnect(i3) - cfg.one(15, servers-1, true) - cfg.start1(i3, cfg.applier) - cfg.connect(i3) - - cfg.one(16, servers, true) - - cfg.end() -} - -func TestPersist23C(t *testing.T) { - servers := 5 - cfg := make_config(t, servers, false, false) - defer cfg.cleanup() - - cfg.begin("Test (3C): more persistence") - - index := 1 - for iters := 0; iters < 5; iters++ { - cfg.one(10+index, servers, true) - index++ - - leader1 := cfg.checkOneLeader() - - cfg.disconnect((leader1 + 1) % servers) - cfg.disconnect((leader1 + 2) % servers) - - cfg.one(10+index, servers-2, true) - index++ - - cfg.disconnect((leader1 + 0) % servers) - cfg.disconnect((leader1 + 3) % servers) - cfg.disconnect((leader1 + 4) % servers) - - cfg.start1((leader1+1)%servers, cfg.applier) - cfg.start1((leader1+2)%servers, cfg.applier) - cfg.connect((leader1 + 1) % servers) - cfg.connect((leader1 + 2) % servers) - - time.Sleep(RaftElectionTimeout) - - cfg.start1((leader1+3)%servers, cfg.applier) - cfg.connect((leader1 + 3) % servers) - - cfg.one(10+index, servers-2, true) - index++ - - cfg.connect((leader1 + 4) % servers) - cfg.connect((leader1 + 0) % servers) - } - - cfg.one(1000, servers, true) - - cfg.end() -} - -func TestPersist33C(t *testing.T) { - servers := 3 - cfg := make_config(t, servers, false, false) - defer cfg.cleanup() - - cfg.begin("Test (3C): partitioned leader and one follower crash, leader restarts") - - cfg.one(101, 3, true) - - leader := cfg.checkOneLeader() - cfg.disconnect((leader + 2) % servers) - - cfg.one(102, 2, true) - - cfg.crash1((leader + 0) % servers) - cfg.crash1((leader + 1) % servers) - cfg.connect((leader + 2) % servers) - cfg.start1((leader+0)%servers, cfg.applier) - cfg.connect((leader + 0) % servers) - - cfg.one(103, 2, true) - - cfg.start1((leader+1)%servers, cfg.applier) - cfg.connect((leader + 1) % servers) - - cfg.one(104, servers, true) - - cfg.end() -} - -// Test the scenarios described in Figure 8 of the extended Raft paper. Each -// iteration asks a leader, if there is one, to insert a command in the Raft -// log. If there is a leader, that leader will fail quickly with a high -// probability (perhaps without committing the command), or crash after a while -// with low probability (most likely committing the command). If the number of -// alive servers isn't enough to form a majority, perhaps start a new server. -// The leader in a new term may try to finish replicating log entries that -// haven't been committed yet.
-func TestFigure83C(t *testing.T) { - servers := 5 - cfg := make_config(t, servers, false, false) - defer cfg.cleanup() - - cfg.begin("Test (3C): Figure 8") - - cfg.one(rand.Int(), 1, true) - - nup := servers - for iters := 0; iters < 1000; iters++ { - leader := -1 - for i := 0; i < servers; i++ { - if cfg.rafts[i] != nil { - _, _, ok := cfg.rafts[i].Start(rand.Int()) - if ok { - leader = i - } - } - } - - if (rand.Int() % 1000) < 100 { - ms := rand.Int63() % (int64(RaftElectionTimeout/time.Millisecond) / 2) - time.Sleep(time.Duration(ms) * time.Millisecond) - } else { - ms := (rand.Int63() % 13) - time.Sleep(time.Duration(ms) * time.Millisecond) - } - - if leader != -1 { - cfg.crash1(leader) - nup -= 1 - } - - if nup < 3 { - s := rand.Int() % servers - if cfg.rafts[s] == nil { - cfg.start1(s, cfg.applier) - cfg.connect(s) - nup += 1 - } - } - } - - for i := 0; i < servers; i++ { - if cfg.rafts[i] == nil { - cfg.start1(i, cfg.applier) - cfg.connect(i) - } - } - - cfg.one(rand.Int(), servers, true) - - cfg.end() -} - -func TestUnreliableAgree3C(t *testing.T) { - servers := 5 - cfg := make_config(t, servers, true, false) - defer cfg.cleanup() - - cfg.begin("Test (3C): unreliable agreement") - - var wg sync.WaitGroup - - for iters := 1; iters < 50; iters++ { - for j := 0; j < 4; j++ { - wg.Add(1) - go func(iters, j int) { - defer wg.Done() - cfg.one((100*iters)+j, 1, true) - }(iters, j) - } - cfg.one(iters, 1, true) - } - - cfg.setunreliable(false) - - wg.Wait() - - cfg.one(100, servers, true) - - cfg.end() -} - -func TestFigure8Unreliable3C(t *testing.T) { - servers := 5 - cfg := make_config(t, servers, true, false) - defer cfg.cleanup() - - cfg.begin("Test (3C): Figure 8 (unreliable)") - - cfg.one(rand.Int()%10000, 1, true) - - nup := servers - for iters := 0; iters < 1000; iters++ { - if iters == 200 { - cfg.setlongreordering(true) - } - leader := -1 - for i := 0; i < servers; i++ { - _, _, ok := cfg.rafts[i].Start(rand.Int() % 10000) - if ok && cfg.connected[i] { - leader = i - } - } - - if (rand.Int() % 1000) < 100 { - ms := rand.Int63() % (int64(RaftElectionTimeout/time.Millisecond) / 2) - time.Sleep(time.Duration(ms) * time.Millisecond) - } else { - ms := (rand.Int63() % 13) - time.Sleep(time.Duration(ms) * time.Millisecond) - } - - if leader != -1 && (rand.Int()%1000) < int(RaftElectionTimeout/time.Millisecond)/2 { - cfg.disconnect(leader) - nup -= 1 - } - - if nup < 3 { - s := rand.Int() % servers - if cfg.connected[s] == false { - cfg.connect(s) - nup += 1 - } - } - } - - for i := 0; i < servers; i++ { - if cfg.connected[i] == false { - cfg.connect(i) - } - } - - cfg.one(rand.Int()%10000, servers, true) - - cfg.end() -} - -func internalChurn(t *testing.T, unreliable bool) { - - servers := 5 - cfg := make_config(t, servers, unreliable, false) - defer cfg.cleanup() - - if unreliable { - cfg.begin("Test (3C): unreliable churn") - } else { - cfg.begin("Test (3C): churn") - } - - stop := int32(0) - - // create concurrent clients - cfn := func(me int, ch chan []int) { - var ret []int - ret = nil - defer func() { ch <- ret }() - values := []int{} - for atomic.LoadInt32(&stop) == 0 { - x := rand.Int() - index := -1 - ok := false - for i := 0; i < servers; i++ { - // try them all, maybe one of them is a leader - cfg.mu.Lock() - rf := cfg.rafts[i] - cfg.mu.Unlock() - if rf != nil { - index1, _, ok1 := rf.Start(x) - if ok1 { - ok = ok1 - index = index1 - } - } - } - if ok { - // maybe leader will commit our value, maybe not. - // but don't wait forever. 
- for _, to := range []int{10, 20, 50, 100, 200} { - nd, cmd := cfg.nCommitted(index) - if nd > 0 { - if xx, ok := cmd.(int); ok { - if xx == x { - values = append(values, x) - } - } else { - cfg.t.Fatalf("wrong command type") - } - break - } - time.Sleep(time.Duration(to) * time.Millisecond) - } - } else { - time.Sleep(time.Duration(79+me*17) * time.Millisecond) - } - } - ret = values - } - - ncli := 3 - cha := []chan []int{} - for i := 0; i < ncli; i++ { - cha = append(cha, make(chan []int)) - go cfn(i, cha[i]) - } - - for iters := 0; iters < 20; iters++ { - if (rand.Int() % 1000) < 200 { - i := rand.Int() % servers - cfg.disconnect(i) - } - - if (rand.Int() % 1000) < 500 { - i := rand.Int() % servers - if cfg.rafts[i] == nil { - cfg.start1(i, cfg.applier) - } - cfg.connect(i) - } - - if (rand.Int() % 1000) < 200 { - i := rand.Int() % servers - if cfg.rafts[i] != nil { - cfg.crash1(i) - } - } - - // Make crash/restart infrequent enough that the peers can often - // keep up, but not so infrequent that everything has settled - // down from one change to the next. Pick a value smaller than - // the election timeout, but not hugely smaller. - time.Sleep((RaftElectionTimeout * 7) / 10) - } - - time.Sleep(RaftElectionTimeout) - cfg.setunreliable(false) - for i := 0; i < servers; i++ { - if cfg.rafts[i] == nil { - cfg.start1(i, cfg.applier) - } - cfg.connect(i) - } - - atomic.StoreInt32(&stop, 1) - - values := []int{} - for i := 0; i < ncli; i++ { - vv := <-cha[i] - if vv == nil { - t.Fatal("client failed") - } - values = append(values, vv...) - } - - time.Sleep(RaftElectionTimeout) - - lastIndex := cfg.one(rand.Int(), servers, true) - - really := make([]int, lastIndex+1) - for index := 1; index <= lastIndex; index++ { - v := cfg.wait(index, servers, -1) - if vi, ok := v.(int); ok { - really = append(really, vi) - } else { - t.Fatalf("not an int") - } - } - - for _, v1 := range values { - ok := false - for _, v2 := range really { - if v1 == v2 { - ok = true - } - } - if ok == false { - cfg.t.Fatalf("didn't find a value") - } - } - - cfg.end() -} - -func TestReliableChurn3C(t *testing.T) { - internalChurn(t, false) -} - -func TestUnreliableChurn3C(t *testing.T) { - internalChurn(t, true) -} - -const MAXLOGSIZE = 2000 - -func snapcommon(t *testing.T, name string, disconnect bool, reliable bool, crash bool) { - iters := 30 - servers := 3 - cfg := make_config(t, servers, !reliable, true) - defer cfg.cleanup() - - cfg.begin(name) - - cfg.one(rand.Int(), servers, true) - leader1 := cfg.checkOneLeader() - - for i := 0; i < iters; i++ { - victim := (leader1 + 1) % servers - sender := leader1 - if i%3 == 1 { - sender = (leader1 + 1) % servers - victim = leader1 - } - - if disconnect { - cfg.disconnect(victim) - cfg.one(rand.Int(), servers-1, true) - } - if crash { - cfg.crash1(victim) - cfg.one(rand.Int(), servers-1, true) - } - - // perhaps send enough to get a snapshot - nn := (SnapShotInterval / 2) + (rand.Int() % SnapShotInterval) - for i := 0; i < nn; i++ { - cfg.rafts[sender].Start(rand.Int()) - } - - // let applier threads catch up with the Start()'s - if disconnect == false && crash == false { - // make sure all followers have caught up, so that - // an InstallSnapshot RPC isn't required for - // TestSnapshotBasic3D(). - cfg.one(rand.Int(), servers, true) - } else { - cfg.one(rand.Int(), servers-1, true) - } - - if cfg.LogSize() >= MAXLOGSIZE { - cfg.t.Fatalf("Log size too large") - } - if disconnect { - // reconnect a follower, who may be behind and - // needs to receive a snapshot to catch up.
- cfg.connect(victim) - cfg.one(rand.Int(), servers, true) - leader1 = cfg.checkOneLeader() - } - if crash { - cfg.start1(victim, cfg.applierSnap) - cfg.connect(victim) - cfg.one(rand.Int(), servers, true) - leader1 = cfg.checkOneLeader() - } - } - cfg.end() -} - -func TestSnapshotBasic3D(t *testing.T) { - snapcommon(t, "Test (3D): snapshots basic", false, true, false) -} - -func TestSnapshotInstall3D(t *testing.T) { - snapcommon(t, "Test (3D): install snapshots (disconnect)", true, true, false) -} - -func TestSnapshotInstallUnreliable3D(t *testing.T) { - snapcommon(t, "Test (3D): install snapshots (disconnect+unreliable)", - true, false, false) -} - -func TestSnapshotInstallCrash3D(t *testing.T) { - snapcommon(t, "Test (3D): install snapshots (crash)", false, true, true) -} - -func TestSnapshotInstallUnCrash3D(t *testing.T) { - snapcommon(t, "Test (3D): install snapshots (unreliable+crash)", false, false, true) -} - -// do the servers persist the snapshots, and -// restart using snapshot along with the -// tail of the log? -func TestSnapshotAllCrash3D(t *testing.T) { - servers := 3 - iters := 5 - cfg := make_config(t, servers, false, true) - defer cfg.cleanup() - - cfg.begin("Test (3D): crash and restart all servers") - - cfg.one(rand.Int(), servers, true) - - for i := 0; i < iters; i++ { - // perhaps enough to get a snapshot - nn := (SnapShotInterval / 2) + (rand.Int() % SnapShotInterval) - for i := 0; i < nn; i++ { - cfg.one(rand.Int(), servers, true) - } - - index1 := cfg.one(rand.Int(), servers, true) - - // crash all - for i := 0; i < servers; i++ { - cfg.crash1(i) - } - - // revive all - for i := 0; i < servers; i++ { - cfg.start1(i, cfg.applierSnap) - cfg.connect(i) - } - - index2 := cfg.one(rand.Int(), servers, true) - if index2 < index1+1 { - t.Fatalf("index decreased from %v to %v", index1, index2) - } - } - cfg.end() -} - -// do servers correctly initialize their in-memory copy of the snapshot, making -// sure that future writes to persistent state don't lose state? -func TestSnapshotInit3D(t *testing.T) { - servers := 3 - cfg := make_config(t, servers, false, true) - defer cfg.cleanup() - - cfg.begin("Test (3D): snapshot initialization after crash") - cfg.one(rand.Int(), servers, true) - - // enough ops to make a snapshot - nn := SnapShotInterval + 1 - for i := 0; i < nn; i++ { - cfg.one(rand.Int(), servers, true) - } - - // crash all - for i := 0; i < servers; i++ { - cfg.crash1(i) - } - - // revive all - for i := 0; i < servers; i++ { - cfg.start1(i, cfg.applierSnap) - cfg.connect(i) - } - - // a single op, to get something to be written back to persistent storage. - cfg.one(rand.Int(), servers, true) - - // crash all - for i := 0; i < servers; i++ { - cfg.crash1(i) - } - - // revive all - for i := 0; i < servers; i++ { - cfg.start1(i, cfg.applierSnap) - cfg.connect(i) - } - - // do another op to trigger potential bug - cfg.one(rand.Int(), servers, true) - cfg.end() -} diff --git a/src/raft/util.go b/src/raft/util.go deleted file mode 100644 index e064403..0000000 --- a/src/raft/util.go +++ /dev/null @@ -1,12 +0,0 @@ -package raft - -import "log" - -// Debugging -const Debug = false - -func DPrintf(format string, a ...interface{}) { - if Debug { - log.Printf(format, a...) - } -}