Frans Kaashoek 2025-02-01 08:16:49 -05:00
parent 43a3ba2a00
commit 2f72ad504b
20 changed files with 2293 additions and 104 deletions

View File

@@ -178,7 +178,8 @@ func TestOnePartition4A(t *testing.T) {
 	ver0 := ts.PutAtLeastOnce(ck, "1", "13", rpc.Tversion(0), -1)
-	p1, p2 := ts.Group(Gid).MakePartition()
+	_, l := ts.Leader()
+	p1, p2 := ts.Group(Gid).MakePartition(l)
 	ts.Group(Gid).Partition(p1, p2)
 	ckp1 := ts.MakeClerkTo(p1) // connect ckp1 to p1

View File

@@ -5,7 +5,8 @@ import (
 	"6.5840/kvsrv1/rpc"
 	"6.5840/labrpc"
-	"6.5840/raft"
+	"6.5840/raft1"
+	"6.5840/tester1"
 )
@@ -51,7 +52,7 @@ type RSM struct {
 //
 // MakeRSM() must return quickly, so it should start goroutines for
 // any long-running work.
-func MakeRSM(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int, sm StateMachine) *RSM {
+func MakeRSM(servers []*labrpc.ClientEnd, me int, persister *tester.Persister, maxraftstate int, sm StateMachine) *RSM {
 	rsm := &RSM{
 		me:           me,
 		maxraftstate: maxraftstate,

View File

@@ -6,9 +6,9 @@ import (
 	"sync"
 	"6.5840/labgob"
-	// "6.5840/kvtest1"
 	"6.5840/labrpc"
-	"6.5840/raft"
+	"6.5840/raft1"
+	"6.5840/tester1"
 )
 type Inc struct {
@@ -26,7 +26,7 @@ type rsmSrv struct {
 	counter int
 }
-func makeRsmSrv(ts *Test, srv int, ends []*labrpc.ClientEnd, persister *raft.Persister, snapshot bool) *rsmSrv {
+func makeRsmSrv(ts *Test, srv int, ends []*labrpc.ClientEnd, persister *tester.Persister, snapshot bool) *rsmSrv {
 	//log.Printf("mksrv %d", srv)
 	labgob.Register(Op{})
 	labgob.Register(Inc{})

View File

@@ -7,7 +7,7 @@ import (
 	"6.5840/kvsrv1/rpc"
 	"6.5840/labrpc"
-	"6.5840/raft"
+	//"6.5840/raft1"
 	"6.5840/tester1"
 )
@@ -31,7 +31,7 @@ func makeTest(t *testing.T, maxraftstate int) *Test {
 		maxraftstate: maxraftstate,
 		srvs:         make([]*rsmSrv, NSRV),
 	}
-	ts.Config = tester.MakeConfig(t, NSRV, true, maxraftstate, ts.mksrv)
+	ts.Config = tester.MakeConfig(t, NSRV, true, ts.mksrv)
 	ts.g = ts.Group(tester.GRP0)
 	return ts
 }
@@ -42,10 +42,10 @@ func (ts *Test) cleanup() {
 	ts.CheckTimeout()
 }
-func (ts *Test) mksrv(ends []*labrpc.ClientEnd, grp tester.Tgid, srv int, persister *raft.Persister, maxraftstate int) tester.IKVServer {
+func (ts *Test) mksrv(ends []*labrpc.ClientEnd, grp tester.Tgid, srv int, persister *tester.Persister) []tester.IService {
 	s := makeRsmSrv(ts, srv, ends, persister, false)
 	ts.srvs[srv] = s
-	return s
+	return []tester.IService{s.rsm.Raft()}
 }
 func (ts *Test) one() *Rep {

View File

@@ -4,6 +4,8 @@ import (
 	"testing"
 	"6.5840/kvtest1"
+	"6.5840/labrpc"
+	"6.5840/raft1"
 	"6.5840/tester1"
 )
@@ -22,7 +24,6 @@ type Test struct {
 const Gid = tester.GRP0
 func MakeTest(t *testing.T, part string, nclients, nservers int, reliable bool, crash bool, partitions bool, maxraftstate int, randomkeys bool) *Test {
-	cfg := tester.MakeConfig(t, nservers, reliable, maxraftstate, StartKVServer)
 	ts := &Test{
 		t:    t,
 		part: part,
@@ -33,17 +34,39 @@ func MakeTest(t *testing.T, part string, nclients, nservers int, reliable bool,
 		maxraftstate: maxraftstate,
 		randomkeys:   randomkeys,
 	}
+	cfg := tester.MakeConfig(t, nservers, reliable, ts.StartKVServer)
 	ts.Test = kvtest.MakeTest(t, cfg, randomkeys, ts)
 	ts.Begin(ts.makeTitle())
 	return ts
 }
+func (ts *Test) StartKVServer(servers []*labrpc.ClientEnd, gid tester.Tgid, me int, persister *tester.Persister) []tester.IService {
+	return StartKVServer(servers, gid, me, persister, ts.maxraftstate)
+}
 func (ts *Test) MakeClerk() kvtest.IKVClerk {
 	clnt := ts.Config.MakeClient()
 	ck := MakeClerk(clnt, ts.Group(Gid).SrvNames())
 	return &kvtest.TestClerk{ck, clnt}
 }
+func (ts *Test) Leader() (bool, int) {
+	for i, ss := range ts.Group(Gid).Services() {
+		for _, s := range ss {
+			switch r := s.(type) {
+			case *raft.Raft:
+				_, is_leader := r.GetState()
+				if is_leader {
+					return true, i
+				}
+			default:
+			}
+		}
+	}
+	return false, 0
+}
 func (ts *Test) DeleteClerk(ck kvtest.IKVClerk) {
 	tck := ck.(*kvtest.TestClerk)
 	ts.DeleteClient(tck.Clnt)

View File

@@ -6,7 +6,6 @@ import (
 	"6.5840/kvsrv1/rpc"
 	"6.5840/labrpc"
-	"6.5840/raft"
 	"6.5840/tester1"
 )
@@ -50,14 +49,9 @@ func (kv *KVServer) Put(args *rpc.PutArgs, reply *rpc.PutReply) {
 func (kv *KVServer) Kill() {
 }
-// You can ignore for this lab
-func (kv *KVServer) Raft() *raft.Raft {
-	return nil
-}
 // You can ignore all arguments; they are for replicated KVservers in lab 4
-func StartKVServer(ends []*labrpc.ClientEnd, gid tester.Tgid, srv int, persister *raft.Persister, maxraftstate int) tester.IKVServer {
+func StartKVServer(ends []*labrpc.ClientEnd, gid tester.Tgid, srv int, persister *tester.Persister) []tester.IService {
 	kv := MakeKVServer()
-	return kv
+	return []tester.IService{kv}
 }

View File

@@ -15,7 +15,7 @@ type TestKV struct {
 }
 func MakeTestKV(t *testing.T, reliable bool) *TestKV {
-	cfg := tester.MakeConfig(t, 1, reliable, -1, StartKVServer)
+	cfg := tester.MakeConfig(t, 1, reliable, StartKVServer)
 	ts := &TestKV{
 		t:        t,
 		reliable: reliable,

src/raft1/raft.go (new file, 260 lines)
View File

@@ -0,0 +1,260 @@
package raft
//
// this is an outline of the API that raft must expose to
// the service (or tester). see comments below for
// each of these functions for more details.
//
// rf = Make(...)
// create a new Raft server.
// rf.Start(command interface{}) (index, term, isleader)
// start agreement on a new log entry
// rf.GetState() (term, isLeader)
// ask a Raft for its current term, and whether it thinks it is leader
// ApplyMsg
// each time a new entry is committed to the log, each Raft peer
// should send an ApplyMsg to the service (or tester)
// in the same server.
//
import (
// "bytes"
"math/rand"
"sync"
"sync/atomic"
"time"
// "6.5840/labgob"
"6.5840/labrpc"
"6.5840/tester1"
)
// as each Raft peer becomes aware that successive log entries are
// committed, the peer should send an ApplyMsg to the service (or
// tester) on the same server, via the applyCh passed to Make(). set
// CommandValid to true to indicate that the ApplyMsg contains a newly
// committed log entry.
//
// in part 3D you'll want to send other kinds of messages (e.g.,
// snapshots) on the applyCh, but set CommandValid to false for these
// other uses.
type ApplyMsg struct {
CommandValid bool
Command interface{}
CommandIndex int
// For 3D:
SnapshotValid bool
Snapshot []byte
SnapshotTerm int
SnapshotIndex int
}
// A Go object implementing a single Raft peer.
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *tester.Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
dead int32 // set by Kill()
// Your data here (3A, 3B, 3C).
// Look at the paper's Figure 2 for a description of what
// state a Raft server must maintain.
}
// return currentTerm and whether this server
// believes it is the leader.
func (rf *Raft) GetState() (int, bool) {
var term int
var isleader bool
// Your code here (3A).
return term, isleader
}
// save Raft's persistent state to stable storage,
// where it can later be retrieved after a crash and restart.
// see paper's Figure 2 for a description of what should be persistent.
// before you've implemented snapshots, you should pass nil as the
// second argument to persister.Save().
// after you've implemented snapshots, pass the current snapshot
// (or nil if there's not yet a snapshot).
func (rf *Raft) persist() {
// Your code here (3C).
// Example:
// w := new(bytes.Buffer)
// e := labgob.NewEncoder(w)
// e.Encode(rf.xxx)
// e.Encode(rf.yyy)
// raftstate := w.Bytes()
// rf.persister.Save(raftstate, nil)
}
// restore previously persisted state.
func (rf *Raft) readPersist(data []byte) {
if data == nil || len(data) < 1 { // bootstrap without any state?
return
}
// Your code here (3C).
// Example:
// r := bytes.NewBuffer(data)
// d := labgob.NewDecoder(r)
// var xxx
// var yyy
// if d.Decode(&xxx) != nil ||
// d.Decode(&yyy) != nil {
// error...
// } else {
// rf.xxx = xxx
// rf.yyy = yyy
// }
}
// the service says it has created a snapshot that has
// all info up to and including index. this means the
// service no longer needs the log through (and including)
// that index. Raft should now trim its log as much as possible.
func (rf *Raft) Snapshot(index int, snapshot []byte) {
// Your code here (3D).
}
// example RequestVote RPC arguments structure.
// field names must start with capital letters!
type RequestVoteArgs struct {
// Your data here (3A, 3B).
}
// example RequestVote RPC reply structure.
// field names must start with capital letters!
type RequestVoteReply struct {
// Your data here (3A).
}
// example RequestVote RPC handler.
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
// Your code here (3A, 3B).
}
// example code to send a RequestVote RPC to a server.
// server is the index of the target server in rf.peers[].
// expects RPC arguments in args.
// fills in *reply with RPC reply, so caller should
// pass &reply.
// the types of the args and reply passed to Call() must be
// the same as the types of the arguments declared in the
// handler function (including whether they are pointers).
//
// The labrpc package simulates a lossy network, in which servers
// may be unreachable, and in which requests and replies may be lost.
// Call() sends a request and waits for a reply. If a reply arrives
// within a timeout interval, Call() returns true; otherwise
// Call() returns false. Thus Call() may not return for a while.
// A false return can be caused by a dead server, a live server that
// can't be reached, a lost request, or a lost reply.
//
// Call() is guaranteed to return (perhaps after a delay) *except* if the
// handler function on the server side does not return. Thus there
// is no need to implement your own timeouts around Call().
//
// look at the comments in ../labrpc/labrpc.go for more details.
//
// if you're having trouble getting RPC to work, check that you've
// capitalized all field names in structs passed over RPC, and
// that the caller passes the address of the reply struct with &, not
// the struct itself.
func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
return ok
}
// the service using Raft (e.g. a k/v server) wants to start
// agreement on the next command to be appended to Raft's log. if this
// server isn't the leader, returns false. otherwise start the
// agreement and return immediately. there is no guarantee that this
// command will ever be committed to the Raft log, since the leader
// may fail or lose an election. even if the Raft instance has been killed,
// this function should return gracefully.
//
// the first return value is the index that the command will appear at
// if it's ever committed. the second return value is the current
// term. the third return value is true if this server believes it is
// the leader.
func (rf *Raft) Start(command interface{}) (int, int, bool) {
index := -1
term := -1
isLeader := true
// Your code here (3B).
return index, term, isLeader
}
// the tester doesn't halt goroutines created by Raft after each test,
// but it does call the Kill() method. your code can use killed() to
// check whether Kill() has been called. the use of atomic avoids the
// need for a lock.
//
// the issue is that long-running goroutines use memory and may chew
// up CPU time, perhaps causing later tests to fail and generating
// confusing debug output. any goroutine with a long-running loop
// should call killed() to check whether it should stop.
func (rf *Raft) Kill() {
atomic.StoreInt32(&rf.dead, 1)
// Your code here, if desired.
}
func (rf *Raft) killed() bool {
z := atomic.LoadInt32(&rf.dead)
return z == 1
}
func (rf *Raft) ticker() {
for rf.killed() == false {
// Your code here (3A)
// Check if a leader election should be started.
// pause for a random amount of time between 50 and 350
// milliseconds.
ms := 50 + (rand.Int63() % 300)
time.Sleep(time.Duration(ms) * time.Millisecond)
}
}
// the service or tester wants to create a Raft server. the ports
// of all the Raft servers (including this one) are in peers[]. this
// server's port is peers[me]. all the servers' peers[] arrays
// have the same order. persister is a place for this server to
// save its persistent state, and also initially holds the most
// recent saved state, if any. applyCh is a channel on which the
// tester or service expects Raft to send ApplyMsg messages.
// Make() must return quickly, so it should start goroutines
// for any long-running work.
func Make(peers []*labrpc.ClientEnd, me int,
persister *tester.Persister, applyCh chan ApplyMsg) *Raft {
rf := &Raft{}
rf.peers = peers
rf.persister = persister
rf.me = me
// Your initialization code here (3A, 3B, 3C).
// initialize from state persisted before a crash
rf.readPersist(persister.ReadRaftState())
// start ticker goroutine to start elections
go rf.ticker()
return rf
}
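
The comment block at the top of raft.go outlines the API a service drives: Make() creates a peer, Start() proposes a command, GetState() reports the term and leadership, and committed entries arrive as ApplyMsg values on applyCh. The sketch below shows one way a service might wire these together; it is illustrative only, and the kvService, startService, reader, and submit names are made up here rather than taken from the lab code.

package kvexample

import (
	"6.5840/labrpc"
	"6.5840/raft1"
	"6.5840/tester1"
)

// kvService is a hypothetical consumer of the Raft API sketched above.
type kvService struct {
	rf      *raft.Raft
	applyCh chan raft.ApplyMsg
}

// startService creates the Raft peer and starts a goroutine that applies
// committed entries, mirroring the contract described for Make().
func startService(ends []*labrpc.ClientEnd, me int, ps *tester.Persister) *kvService {
	s := &kvService{applyCh: make(chan raft.ApplyMsg)}
	s.rf = raft.Make(ends, me, ps, s.applyCh)
	go s.reader()
	return s
}

// reader consumes ApplyMsg values in log order; CommandValid distinguishes
// committed commands from other messages such as snapshots (3D).
func (s *kvService) reader() {
	for m := range s.applyCh {
		if m.CommandValid {
			_ = m.Command // apply m.Command at m.CommandIndex to the service's state
		}
	}
}

// submit proposes a command; Start() only accepts it on the current leader,
// and commitment is not guaranteed until the entry shows up on applyCh.
func (s *kvService) submit(cmd interface{}) (index int, isLeader bool) {
	index, _, isLeader = s.rf.Start(cmd)
	return
}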

src/raft1/raft_test.go (new file, 1209 lines)

File diff suppressed because it is too large.

src/raft1/server.go (new file, 185 lines)
View File

@@ -0,0 +1,185 @@
package raft
import (
"bytes"
"fmt"
"log"
"sync"
"6.5840/labgob"
"6.5840/labrpc"
"6.5840/tester1"
)
const (
SnapShotInterval = 10
)
type rfsrv struct {
ts *Test
me int
applyErr string // from apply channel readers
lastApplied int
persister *tester.Persister
mu sync.Mutex
raft *Raft
logs map[int]any // copy of each server's committed entries
}
func newRfsrv(ts *Test, srv int, ends []*labrpc.ClientEnd, persister *tester.Persister, snapshot bool) *rfsrv {
//log.Printf("mksrv %d", srv)
s := &rfsrv{
ts: ts,
me: srv,
logs: map[int]any{},
persister: persister,
}
applyCh := make(chan ApplyMsg)
s.raft = Make(ends, srv, persister, applyCh)
if snapshot {
snapshot := persister.ReadSnapshot()
if snapshot != nil && len(snapshot) > 0 {
// mimic KV server and process snapshot now.
// ideally Raft should send it up on applyCh...
err := s.ingestSnap(snapshot, -1)
if err != "" {
ts.t.Fatal(err)
}
}
go s.applierSnap(applyCh)
} else {
go s.applier(applyCh)
}
return s
}
func (rs *rfsrv) Kill() {
//log.Printf("rs kill %d", rs.me)
rs.raft = nil // tester will call Kill() on rs.raft
if rs.persister != nil {
// mimic KV server that saves its persistent state in case it
// restarts.
raftlog := rs.persister.ReadRaftState()
snapshot := rs.persister.ReadSnapshot()
rs.persister.Save(raftlog, snapshot)
}
}
func (rs *rfsrv) GetState() (int, bool) {
rs.mu.Lock()
defer rs.mu.Unlock()
return rs.raft.GetState()
}
func (rs *rfsrv) Raft() *Raft {
rs.mu.Lock()
defer rs.mu.Unlock()
return rs.raft
}
func (rs *rfsrv) Logs(i int) (any, bool) {
rs.mu.Lock()
defer rs.mu.Unlock()
v, ok := rs.logs[i]
return v, ok
}
// applier reads message from apply ch and checks that they match the log
// contents
func (rs *rfsrv) applier(applyCh chan ApplyMsg) {
for m := range applyCh {
if m.CommandValid == false {
// ignore other types of ApplyMsg
} else {
err_msg, prevok := rs.ts.checkLogs(rs.me, m)
if m.CommandIndex > 1 && prevok == false {
err_msg = fmt.Sprintf("server %v apply out of order %v", rs.me, m.CommandIndex)
}
if err_msg != "" {
log.Fatalf("apply error: %v", err_msg)
rs.applyErr = err_msg
// keep reading after error so that Raft doesn't block
// holding locks...
}
}
}
}
// periodically snapshot raft state
func (rs *rfsrv) applierSnap(applyCh chan ApplyMsg) {
if rs.raft == nil {
return // ???
}
for m := range applyCh {
err_msg := ""
if m.SnapshotValid {
err_msg = rs.ingestSnap(m.Snapshot, m.SnapshotIndex)
} else if m.CommandValid {
if m.CommandIndex != rs.lastApplied+1 {
err_msg = fmt.Sprintf("server %v apply out of order, expected index %v, got %v", rs.me, rs.lastApplied+1, m.CommandIndex)
}
if err_msg == "" {
var prevok bool
err_msg, prevok = rs.ts.checkLogs(rs.me, m)
if m.CommandIndex > 1 && prevok == false {
err_msg = fmt.Sprintf("server %v apply out of order %v", rs.me, m.CommandIndex)
}
}
rs.lastApplied = m.CommandIndex
if (m.CommandIndex+1)%SnapShotInterval == 0 {
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
e.Encode(m.CommandIndex)
var xlog []any
for j := 0; j <= m.CommandIndex; j++ {
xlog = append(xlog, rs.logs[j])
}
e.Encode(xlog)
rs.raft.Snapshot(m.CommandIndex, w.Bytes())
}
} else {
// Ignore other types of ApplyMsg.
}
if err_msg != "" {
log.Fatalf("apply error: %v", err_msg)
rs.applyErr = err_msg
// keep reading after error so that Raft doesn't block
// holding locks...
}
}
}
// returns "" or error string
func (rs *rfsrv) ingestSnap(snapshot []byte, index int) string {
rs.mu.Lock()
defer rs.mu.Unlock()
if snapshot == nil {
log.Fatalf("nil snapshot")
return "nil snapshot"
}
r := bytes.NewBuffer(snapshot)
d := labgob.NewDecoder(r)
var lastIncludedIndex int
var xlog []any
if d.Decode(&lastIncludedIndex) != nil ||
d.Decode(&xlog) != nil {
log.Fatalf("snapshot decode error")
return "snapshot Decode() error"
}
if index != -1 && index != lastIncludedIndex {
err := fmt.Sprintf("server %v snapshot doesn't match m.SnapshotIndex", rs.me)
return err
}
rs.logs = map[int]any{}
for j := 0; j < len(xlog); j++ {
rs.logs[j] = xlog[j]
}
rs.lastApplied = lastIncludedIndex
return ""
}

src/raft1/test.go (new file, 266 lines)
View File

@@ -0,0 +1,266 @@
package raft
import (
"fmt"
// "log"
"math/rand"
"sync"
"sync/atomic"
"testing"
"time"
"6.5840/labrpc"
"6.5840/tester1"
)
type Test struct {
*tester.Config
t *testing.T
n int
g *tester.ServerGrp
finished int32
mu sync.Mutex
srvs []*rfsrv
maxIndex int
snapshot bool
}
func makeTest(t *testing.T, n int, reliable bool, snapshot bool) *Test {
ts := &Test{
t: t,
n: n,
srvs: make([]*rfsrv, n),
snapshot: snapshot,
}
ts.Config = tester.MakeConfig(t, n, reliable, ts.mksrv)
ts.Config.SetLongDelays(true)
ts.g = ts.Group(tester.GRP0)
return ts
}
func (ts *Test) cleanup() {
atomic.StoreInt32(&ts.finished, 1)
ts.End()
ts.Config.Cleanup()
ts.CheckTimeout()
}
func (ts *Test) mksrv(ends []*labrpc.ClientEnd, grp tester.Tgid, srv int, persister *tester.Persister) []tester.IService {
s := newRfsrv(ts, srv, ends, persister, ts.snapshot)
ts.srvs[srv] = s
return []tester.IService{s, s.raft}
}
func (ts *Test) restart(i int) {
ts.g.StartServer(i) // which will call mksrv to make a new server
ts.Group(tester.GRP0).ConnectAll()
}
func (ts *Test) checkOneLeader() int {
for iters := 0; iters < 10; iters++ {
ms := 450 + (rand.Int63() % 100)
time.Sleep(time.Duration(ms) * time.Millisecond)
leaders := make(map[int][]int)
for i := 0; i < ts.n; i++ {
if ts.g.IsConnected(i) {
if term, leader := ts.srvs[i].GetState(); leader {
leaders[term] = append(leaders[term], i)
}
}
}
lastTermWithLeader := -1
for term, leaders := range leaders {
if len(leaders) > 1 {
ts.Fatalf("term %d has %d (>1) leaders", term, len(leaders))
}
if term > lastTermWithLeader {
lastTermWithLeader = term
}
}
if len(leaders) != 0 {
return leaders[lastTermWithLeader][0]
}
}
ts.Fatalf("expected one leader, got none")
return -1
}
func (ts *Test) checkTerms() int {
term := -1
for i := 0; i < ts.n; i++ {
if ts.g.IsConnected(i) {
xterm, _ := ts.srvs[i].GetState()
if term == -1 {
term = xterm
} else if term != xterm {
ts.Fatalf("servers disagree on term")
}
}
}
return term
}
func (ts *Test) checkLogs(i int, m ApplyMsg) (string, bool) {
ts.mu.Lock()
defer ts.mu.Unlock()
err_msg := ""
v := m.Command
me := ts.srvs[i]
for j, rs := range ts.srvs {
if old, oldok := rs.Logs(m.CommandIndex); oldok && old != v {
//log.Printf("%v: log %v; server %v\n", i, me.logs, rs.logs)
// some server has already committed a different value for this entry!
err_msg = fmt.Sprintf("commit index=%v server=%v %v != server=%v %v",
m.CommandIndex, i, m.Command, j, old)
}
}
_, prevok := me.logs[m.CommandIndex-1]
me.logs[m.CommandIndex] = v
if m.CommandIndex > ts.maxIndex {
ts.maxIndex = m.CommandIndex
}
return err_msg, prevok
}
// check that none of the connected servers
// thinks it is the leader.
func (ts *Test) checkNoLeader() {
for i := 0; i < ts.n; i++ {
if ts.g.IsConnected(i) {
_, is_leader := ts.srvs[i].GetState()
if is_leader {
ts.Fatalf("expected no leader among connected servers, but %v claims to be leader", i)
}
}
}
}
// how many servers think a log entry is committed?
func (ts *Test) nCommitted(index int) (int, any) {
ts.mu.Lock()
defer ts.mu.Unlock()
count := 0
var cmd any = nil
for _, rs := range ts.srvs {
if rs.applyErr != "" {
ts.t.Fatal(rs.applyErr)
}
cmd1, ok := rs.Logs(index)
if ok {
if count > 0 && cmd != cmd1 {
ts.Fatalf("committed values do not match: index %v, %v, %v",
index, cmd, cmd1)
}
count += 1
cmd = cmd1
}
}
return count, cmd
}
// do a complete agreement.
// it might choose the wrong leader initially,
// and have to re-submit after giving up.
// entirely gives up after about 10 seconds.
// indirectly checks that the servers agree on the
// same value, since nCommitted() checks this,
// as do the threads that read from applyCh.
// returns index.
// if retry==true, may submit the command multiple
// times, in case a leader fails just after Start().
// if retry==false, calls Start() only once, in order
// to simplify the early Lab 3B tests.
func (ts *Test) one(cmd any, expectedServers int, retry bool) int {
t0 := time.Now()
starts := 0
for time.Since(t0).Seconds() < 10 && ts.checkFinished() == false {
// try all the servers, maybe one is the leader.
index := -1
for range ts.srvs {
starts = (starts + 1) % len(ts.srvs)
var rf *Raft
if ts.g.IsConnected(starts) {
rf = ts.srvs[starts].raft
}
if rf != nil {
//log.Printf("peer %d Start %v", starts, cmd)
index1, _, ok := rf.Start(cmd)
if ok {
index = index1
break
}
}
}
if index != -1 {
// somebody claimed to be the leader and to have
// submitted our command; wait a while for agreement.
t1 := time.Now()
for time.Since(t1).Seconds() < 2 {
nd, cmd1 := ts.nCommitted(index)
if nd > 0 && nd >= expectedServers {
// committed
if cmd1 == cmd {
// and it was the command we submitted.
return index
}
}
time.Sleep(20 * time.Millisecond)
}
if retry == false {
ts.Fatalf("one(%v) failed to reach agreement", cmd)
}
} else {
time.Sleep(50 * time.Millisecond)
}
}
if ts.checkFinished() == false {
ts.Fatalf("one(%v) failed to reach agreement", cmd)
}
return -1
}
func (ts *Test) checkFinished() bool {
z := atomic.LoadInt32(&ts.finished)
return z != 0
}
// wait for at least n servers to commit.
// but don't wait forever.
func (ts *Test) wait(index int, n int, startTerm int) any {
to := 10 * time.Millisecond
for iters := 0; iters < 30; iters++ {
nd, _ := ts.nCommitted(index)
if nd >= n {
break
}
time.Sleep(to)
if to < time.Second {
to *= 2
}
if startTerm > -1 {
for _, rs := range ts.srvs {
if t, _ := rs.raft.GetState(); t > startTerm {
// someone has moved on
// can no longer guarantee that we'll "win"
return -1
}
}
}
}
nd, cmd := ts.nCommitted(index)
if nd < n {
ts.Fatalf("only %d decided for index %d; wanted %d",
nd, index, n)
}
return cmd
}

src/raft1/test.out (new file, 164 lines)
View File

@@ -0,0 +1,164 @@
=== RUN TestInitialElection3A
Test (3A): initial election (reliable network)...
... Passed -- 3.9 3 38 0
--- PASS: TestInitialElection3A (3.95s)
=== RUN TestReElection3A
Test (3A): election after network failure (reliable network)...
... Passed -- 6.4 3 68 0
--- PASS: TestReElection3A (6.43s)
=== RUN TestManyElections3A
Test (3A): multiple elections (reliable network)...
... Passed -- 9.7 7 414 0
--- PASS: TestManyElections3A (9.66s)
=== RUN TestBasicAgree3B
Test (3B): basic agreement (reliable network)...
... Passed -- 1.9 3 18 0
--- PASS: TestBasicAgree3B (1.91s)
=== RUN TestRPCBytes3B
Test (3B): RPC byte count (reliable network)...
... Passed -- 3.5 3 50 0
--- PASS: TestRPCBytes3B (3.50s)
=== RUN TestFollowerFailure3B
Test (3B): test progressive failure of followers (reliable network)...
... Passed -- 5.9 3 60 0
--- PASS: TestFollowerFailure3B (5.90s)
=== RUN TestLeaderFailure3B
Test (3B): test failure of leaders (reliable network)...
... Passed -- 6.5 3 113 0
--- PASS: TestLeaderFailure3B (6.52s)
=== RUN TestFailAgree3B
Test (3B): agreement after follower reconnects (reliable network)...
... Passed -- 6.4 3 68 0
--- PASS: TestFailAgree3B (6.43s)
=== RUN TestFailNoAgree3B
Test (3B): no agreement if too many followers disconnect (reliable network)...
... Passed -- 4.7 5 116 0
--- PASS: TestFailNoAgree3B (4.69s)
=== RUN TestConcurrentStarts3B
Test (3B): concurrent Start()s (reliable network)...
... Passed -- 2.1 3 20 0
--- PASS: TestConcurrentStarts3B (2.06s)
=== RUN TestRejoin3B
Test (3B): rejoin of partitioned leader (reliable network)...
... Passed -- 7.9 3 102 0
--- PASS: TestRejoin3B (7.91s)
=== RUN TestBackup3B
Test (3B): leader backs up quickly over incorrect follower logs (reliable network)...
... Passed -- 26.0 5 944 0
--- PASS: TestBackup3B (26.04s)
=== RUN TestCount3B
Test (3B): RPC counts aren't too high (reliable network)...
... Passed -- 3.5 3 34 0
--- PASS: TestCount3B (3.55s)
=== RUN TestPersist13C
Test (3C): basic persistence (reliable network)...
... Passed -- 6.8 3 58 0
--- PASS: TestPersist13C (6.77s)
=== RUN TestPersist23C
Test (3C): more persistence (reliable network)...
... Passed -- 21.9 5 337 0
--- PASS: TestPersist23C (21.89s)
=== RUN TestPersist33C
Test (3C): partitioned leader and one follower crash, leader restarts (reliable network)...
... Passed -- 3.6 3 32 0
--- PASS: TestPersist33C (3.63s)
=== RUN TestFigure83C
Test (3C): Figure 8 (reliable network)...
goroutine 7711 [running]:
runtime/debug.Stack()
/usr/lib/go/src/runtime/debug/stack.go:26 +0x67
runtime/debug.PrintStack()
/usr/lib/go/src/runtime/debug/stack.go:18 +0x1d
6.5840/tester1.(*Config).Fatalf(0xc0001d0c00, {0x7d12ea, 0x21}, {0xc000519df0, 0x1, 0x1})
/home/kaashoek/classes/65840-2025/staff/mygo/src/tester1/config.go:145 +0x3b
6.5840/raft1.(*Test).one(0xc0001d0ba0, {0x777260, 0xc0003a9bd8}, 0x5, 0x1)
/home/kaashoek/classes/65840-2025/staff/mygo/src/raft1/test.go:227 +0x227
6.5840/raft1.TestFigure83C(0xc00030cea0)
/home/kaashoek/classes/65840-2025/staff/mygo/src/raft1/raft_test.go:824 +0x3f0
testing.tRunner(0xc00030cea0, 0x7db020)
/usr/lib/go/src/testing/testing.go:1690 +0x227
created by testing.(*T).Run in goroutine 1
/usr/lib/go/src/testing/testing.go:1743 +0x826
config.go:146: one(3082010368482628662) failed to reach agreement
--- FAIL: TestFigure83C (59.13s)
=== RUN TestUnreliableAgree3C
Test (3C): unreliable agreement (unreliable network)...
... Passed -- 4.4 5 245 0
--- PASS: TestUnreliableAgree3C (4.42s)
=== RUN TestFigure8Unreliable3C
Test (3C): Figure 8 (unreliable) (unreliable network)...
... Passed -- 34.3 5 845 0
--- PASS: TestFigure8Unreliable3C (34.26s)
=== RUN TestReliableChurn3C
Test (3C): churn (reliable network)...
goroutine 21747 [running]:
runtime/debug.Stack()
/usr/lib/go/src/runtime/debug/stack.go:26 +0x67
runtime/debug.PrintStack()
/usr/lib/go/src/runtime/debug/stack.go:18 +0x1d
6.5840/tester1.(*Config).Fatalf(0xc000845d40, {0x7d12ea, 0x21}, {0xc000519cd0, 0x1, 0x1})
/home/kaashoek/classes/65840-2025/staff/mygo/src/tester1/config.go:145 +0x3b
6.5840/raft1.(*Test).one(0xc000845ce0, {0x777260, 0xc000735788}, 0x5, 0x1)
/home/kaashoek/classes/65840-2025/staff/mygo/src/raft1/test.go:227 +0x227
6.5840/raft1.internalChurn(0xc000258000, 0x1)
/home/kaashoek/classes/65840-2025/staff/mygo/src/raft1/raft_test.go:1028 +0xbd0
6.5840/raft1.TestReliableChurn3C(0xc000258000)
/home/kaashoek/classes/65840-2025/staff/mygo/src/raft1/raft_test.go:1055 +0x2c
testing.tRunner(0xc000258000, 0x7db080)
/usr/lib/go/src/testing/testing.go:1690 +0x227
created by testing.(*T).Run in goroutine 1
/usr/lib/go/src/testing/testing.go:1743 +0x826
config.go:146: one(4060683027750831645) failed to reach agreement
--- FAIL: TestReliableChurn3C (26.10s)
=== RUN TestUnreliableChurn3C
Test (3C): unreliable churn (unreliable network)...
goroutine 26074 [running]:
runtime/debug.Stack()
/usr/lib/go/src/runtime/debug/stack.go:26 +0x67
runtime/debug.PrintStack()
/usr/lib/go/src/runtime/debug/stack.go:18 +0x1d
6.5840/tester1.(*Config).Fatalf(0xc00074db00, {0x7d12ea, 0x21}, {0xc000519cd0, 0x1, 0x1})
/home/kaashoek/classes/65840-2025/staff/mygo/src/tester1/config.go:145 +0x3b
6.5840/raft1.(*Test).one(0xc00074daa0, {0x777260, 0xc000634008}, 0x5, 0x1)
/home/kaashoek/classes/65840-2025/staff/mygo/src/raft1/test.go:227 +0x227
6.5840/raft1.internalChurn(0xc0001c2000, 0x0)
/home/kaashoek/classes/65840-2025/staff/mygo/src/raft1/raft_test.go:1028 +0xbd0
6.5840/raft1.TestUnreliableChurn3C(0xc0001c2000)
/home/kaashoek/classes/65840-2025/staff/mygo/src/raft1/raft_test.go:1059 +0x29
testing.tRunner(0xc0001c2000, 0x7db0c8)
/usr/lib/go/src/testing/testing.go:1690 +0x227
created by testing.(*T).Run in goroutine 1
/usr/lib/go/src/testing/testing.go:1743 +0x826
config.go:146: one(8736026451642885096) failed to reach agreement
--- FAIL: TestUnreliableChurn3C (26.11s)
=== RUN TestSnapshotBasic3D
Test (3D): snapshots basic (reliable network)...
... Passed -- 11.0 3 201 0
--- PASS: TestSnapshotBasic3D (11.04s)
=== RUN TestSnapshotInstall3D
Test (3D): install snapshots (disconnect) (reliable network)...
... Passed -- 65.5 3 909 0
--- PASS: TestSnapshotInstall3D (65.46s)
=== RUN TestSnapshotInstallUnreliable3D
Test (3D): install snapshots (disconnect) (unreliable network)...
... Passed -- 88.1 3 1159 0
--- PASS: TestSnapshotInstallUnreliable3D (88.10s)
=== RUN TestSnapshotInstallCrash3D
Test (3D): install snapshots (crash) (reliable network)...
... Passed -- 53.4 3 590 0
--- PASS: TestSnapshotInstallCrash3D (53.38s)
=== RUN TestSnapshotInstallUnCrash3D
Test (3D): install snapshots (crash) (unreliable network)...
... Passed -- 65.6 3 690 0
--- PASS: TestSnapshotInstallUnCrash3D (65.60s)
=== RUN TestSnapshotAllCrash3D
Test (3D): crash and restart all servers (unreliable network)...
... Passed -- 23.2 3 336 0
--- PASS: TestSnapshotAllCrash3D (23.20s)
=== RUN TestSnapshotInit3D
Test (3D): snapshot initialization after crash (unreliable network)...
... Passed -- 6.3 3 75 0
--- PASS: TestSnapshotInit3D (6.34s)
FAIL
exit status 1
FAIL 6.5840/raft1 584.014s

src/raft1/util.go (new file, 12 lines)
View File

@@ -0,0 +1,12 @@
package raft
import "log"
// Debugging
const Debug = false
func DPrintf(format string, a ...interface{}) {
if Debug {
log.Printf(format, a...)
}
}

View File

@@ -3,7 +3,7 @@ package shardcfg
 import (
 	"testing"
-	"6.5840/kvtest1"
+	"6.5840/tester1"
 )
 func check_same_config(t *testing.T, c1 ShardConfig, c2 ShardConfig) {
@@ -37,13 +37,13 @@ func TestBasic(t *testing.T) {
 		Gid2 = 2
 	)
 	cfg := MakeShardConfig()
-	cfg.CheckConfig(t, []kvtest.Tgid{})
-	cfg.JoinBalance(map[kvtest.Tgid][]string{Gid1: []string{"x", "y", "z"}})
-	cfg.CheckConfig(t, []kvtest.Tgid{Gid1})
-	cfg.JoinBalance(map[kvtest.Tgid][]string{Gid2: []string{"a", "b", "c"}})
-	cfg.CheckConfig(t, []kvtest.Tgid{Gid1, Gid2})
+	cfg.CheckConfig(t, []tester.Tgid{})
+	cfg.JoinBalance(map[tester.Tgid][]string{Gid1: []string{"x", "y", "z"}})
+	cfg.CheckConfig(t, []tester.Tgid{Gid1})
+	cfg.JoinBalance(map[tester.Tgid][]string{Gid2: []string{"a", "b", "c"}})
+	cfg.CheckConfig(t, []tester.Tgid{Gid1, Gid2})
 	sa1 := cfg.Groups[Gid1]
 	if len(sa1) != 3 || sa1[0] != "x" || sa1[1] != "y" || sa1[2] != "z" {
@@ -54,9 +54,9 @@ func TestBasic(t *testing.T) {
 		t.Fatalf("wrong servers for gid %v: %v\n", Gid2, sa2)
 	}
-	cfg.LeaveBalance([]kvtest.Tgid{Gid1})
-	cfg.CheckConfig(t, []kvtest.Tgid{Gid2})
-	cfg.LeaveBalance([]kvtest.Tgid{Gid2})
-	cfg.CheckConfig(t, []kvtest.Tgid{})
+	cfg.LeaveBalance([]tester.Tgid{Gid1})
+	cfg.CheckConfig(t, []tester.Tgid{Gid2})
+	cfg.LeaveBalance([]tester.Tgid{Gid2})
+	cfg.CheckConfig(t, []tester.Tgid{})
 }

View File

@@ -8,7 +8,6 @@ import (
 	"6.5840/kvsrv1/rpc"
 	"6.5840/labgob"
 	"6.5840/labrpc"
-	"6.5840/raft"
 	"6.5840/shardkv1/shardgrp/shardrpc"
 )
@@ -69,11 +68,6 @@ func (kv *KVServer) Kill() {
 	// Your code here, if desired.
 }
-// Return kv's raft struct
-func (kv *KVServer) Raft() *raft.Raft {
-	return kv.rsm.Raft()
-}
 func (kv *KVServer) killed() bool {
 	z := atomic.LoadInt32(&kv.dead)
 	return z == 1
@@ -81,7 +75,7 @@ func (kv *KVServer) killed() bool {
 // StartKVServer() and MakeRSM() must return quickly, so they should
 // start goroutines for any long-running work.
-func StartKVServer(servers []*labrpc.ClientEnd, gid tester.Tgid, me int, persister *raft.Persister, maxraftstate int) tester.IKVServer {
+func StartKVServer(servers []*labrpc.ClientEnd, gid tester.Tgid, me int, persister *tester.Persister, maxraftstate int) []tester.IService {
 	// call labgob.Register on structures you want
 	// Go's RPC library to marshall/unmarshall.
 	labgob.Register(shardrpc.PutArgs{})
@@ -95,5 +89,5 @@ func StartKVServer(servers []*labrpc.ClientEnd, gid tester.Tgid, me int, persist
 	kv.rsm = rsm.MakeRSM(servers, me, persister, maxraftstate, kv)
 	// Your code here
-	return kv
+	return []tester.IService{kv, kv.rsm.Raft()}
 }

View File

@@ -25,6 +25,7 @@ type Test struct {
 	sck  *shardctrler.ShardCtrlerClerk
 	part string
+	maxraftstate int
 	mu   sync.Mutex
 	ngid tester.Tgid
 }
@@ -38,17 +39,22 @@ const (
 // Setup a kvraft group (group 0) for the shard controller and make
 // the controller clerk.
 func MakeTest(t *testing.T, part string, reliable, randomkeys bool) *Test {
-	cfg := tester.MakeConfig(t, NSRV, reliable, -1, kvraft.StartKVServer)
 	ts := &Test{
 		ngid: shardcfg.Gid1 + 1, // Gid1 is in use
 		t:    t,
+		maxraftstate: -1,
 	}
+	cfg := tester.MakeConfig(t, NSRV, reliable, ts.StartKVServerControler)
 	ts.Test = kvtest.MakeTest(t, cfg, randomkeys, ts)
 	ts.sck = ts.makeShardCtrlerClerk()
 	ts.Begin(part)
 	return ts
 }
+func (ts *Test) StartKVServerControler(servers []*labrpc.ClientEnd, gid tester.Tgid, me int, persister *tester.Persister) []tester.IService {
+	return kvraft.StartKVServer(servers, gid, me, persister, ts.maxraftstate)
+}
 func (ts *Test) MakeClerk() kvtest.IKVClerk {
 	clnt := ts.Config.MakeClient()
 	ck := MakeClerk(clnt, ts.makeQueryClerk())
@@ -102,7 +108,7 @@ func (ts *Test) groups(n int) []tester.Tgid {
 // itself to own all shards.
 func (ts *Test) setupKVService() tester.Tgid {
 	scfg := shardcfg.MakeShardConfig()
-	ts.Config.MakeGroupStart(shardcfg.Gid1, NSRV, -1, shardgrp.StartKVServer)
+	ts.Config.MakeGroupStart(shardcfg.Gid1, NSRV, ts.StartKVServerShard)
 	scfg.JoinBalance(map[tester.Tgid][]string{shardcfg.Gid1: ts.Group(shardcfg.Gid1).SrvNames()})
 	if err := ts.sck.Init(scfg); err != rpc.OK {
 		ts.t.Fatalf("Init err %v", err)
@@ -111,9 +117,13 @@ func (ts *Test) setupKVService() tester.Tgid {
 	return shardcfg.Gid1
 }
+func (ts *Test) StartKVServerShard(servers []*labrpc.ClientEnd, gid tester.Tgid, me int, persister *tester.Persister) []tester.IService {
+	return shardgrp.StartKVServer(servers, gid, me, persister, ts.maxraftstate)
+}
 func (ts *Test) joinGroups(sck *shardctrler.ShardCtrlerClerk, gids []tester.Tgid) rpc.Err {
 	for i, gid := range gids {
-		ts.Config.MakeGroupStart(gid, NSRV, -1, shardgrp.StartKVServer)
+		ts.Config.MakeGroupStart(gid, NSRV, ts.StartKVServerShard)
 		if err := sck.Join(gid, ts.Group(gid).SrvNames()); err != rpc.OK {
 			return err
 		}
@@ -141,11 +151,11 @@ func (ts *Test) checkLogs(gids []tester.Tgid) {
 	for _, gid := range gids {
 		n := ts.Group(gid).LogSize()
 		s := ts.Group(gid).SnapshotSize()
-		if ts.Group(gid).Maxraftstate >= 0 && n > 8*ts.Group(gid).Maxraftstate {
+		if ts.maxraftstate >= 0 && n > 8*ts.maxraftstate {
 			ts.t.Fatalf("persister.RaftStateSize() %v, but maxraftstate %v",
-				n, ts.Group(gid).Maxraftstate)
+				n, ts.maxraftstate)
 		}
-		if ts.Group(gid).Maxraftstate < 0 && s > 0 {
+		if ts.maxraftstate < 0 && s > 0 {
 			ts.t.Fatalf("maxraftstate is -1, but snapshot is non-empty!")
 		}

View File

@@ -15,16 +15,10 @@ import (
 	"time"
 	"6.5840/labrpc"
-	"6.5840/raft"
 )
 const GRP0 = 0
-type IKVServer interface {
-	Raft() *raft.Raft
-	Kill()
-}
 type Config struct {
 	*Clnts  // The clnts in the test
 	*Groups // The server groups in the test
@@ -39,7 +33,7 @@ type Config struct {
 	ops int32 // number of clerk get/put/append method calls
 }
-func MakeConfig(t *testing.T, n int, reliable bool, maxraftstate int, mks FstartServer) *Config {
+func MakeConfig(t *testing.T, n int, reliable bool, mks FstartServer) *Config {
 	ncpu_once.Do(func() {
 		if runtime.NumCPU() < 2 {
 			fmt.Printf("warning: only one CPU, which may conceal locking bugs\n")
@@ -51,7 +45,7 @@ func MakeConfig(t *testing.T, n int, reliable bool, maxraftstate int, mks Fstart
 	cfg.t = t
 	cfg.net = labrpc.MakeNetwork()
 	cfg.Groups = newGroups(cfg.net)
-	cfg.MakeGroupStart(GRP0, n, maxraftstate, mks)
+	cfg.MakeGroupStart(GRP0, n, mks)
 	cfg.Clnts = makeClnts(cfg.net)
 	cfg.start = time.Now()
@@ -87,8 +81,8 @@ func (cfg *Config) Cleanup() {
 	cfg.CheckTimeout()
 }
-func (cfg *Config) MakeGroupStart(gid Tgid, nsrv, maxraftstate int, mks FstartServer) {
-	cfg.MakeGroup(gid, nsrv, maxraftstate, mks)
+func (cfg *Config) MakeGroupStart(gid Tgid, nsrv int, mks FstartServer) {
+	cfg.MakeGroup(gid, nsrv, mks)
 	cfg.Group(gid).StartServers()
 }

View File

@@ -6,12 +6,18 @@ import (
 	"sync"
 	"6.5840/labrpc"
-	"6.5840/raft"
 )
 type Tgid int
-type FstartServer func(ends []*labrpc.ClientEnd, grp Tgid, srv int, persister *raft.Persister, maxraftstate int) IKVServer
+// A service must support Kill(); the tester will Kill()
+// on service returned by FstartServer()
+type IService interface {
+	Kill()
+}
+// Start server and return the services to register with labrpc
+type FstartServer func(ends []*labrpc.ClientEnd, grp Tgid, srv int, persister *Persister) []IService
 // Each server has a name: i'th server of group gid. If there is only a single
 // server, it its gid = 0 and its i is 0.
@@ -31,11 +37,11 @@ func newGroups(net *labrpc.Network) *Groups {
 	return &Groups{net: net, grps: make(map[Tgid]*ServerGrp)}
 }
-func (gs *Groups) MakeGroup(gid Tgid, nsrv, maxraftstate int, mks FstartServer) {
+func (gs *Groups) MakeGroup(gid Tgid, nsrv int, mks FstartServer) {
 	gs.mu.Lock()
 	defer gs.mu.Unlock()
-	gs.grps[gid] = makeSrvGrp(gs.net, gid, nsrv, maxraftstate, mks)
+	gs.grps[gid] = makeSrvGrp(gs.net, gid, nsrv, mks)
 }
 func (gs *Groups) lookupGroup(gid Tgid) *ServerGrp {
@@ -62,8 +68,6 @@ func (gs *Groups) cleanup() {
 }
 type ServerGrp struct {
-	Maxraftstate int
 	net          *labrpc.Network
 	srvs         []*Server
 	servernames  []string
@@ -72,9 +76,8 @@ type ServerGrp struct {
 	mks FstartServer
 }
-func makeSrvGrp(net *labrpc.Network, gid Tgid, n, m int, mks FstartServer) *ServerGrp {
+func makeSrvGrp(net *labrpc.Network, gid Tgid, n int, mks FstartServer) *ServerGrp {
 	sg := &ServerGrp{
-		Maxraftstate: m,
 		net:  net,
 		srvs: make([]*Server, n),
 		gid:  gid,
@@ -99,6 +102,14 @@ func (sg *ServerGrp) SrvNames() []string {
 	return sg.servernames
 }
+func (sg *ServerGrp) Services() [][]IService {
+	ss := make([][]IService, 0, len(sg.srvs))
+	for _, s := range sg.srvs {
+		ss = append(ss, s.svcs)
+	}
+	return ss
+}
 func (sg *ServerGrp) SrvNamesTo(to []int) []string {
 	ns := make([]string, 0, len(to))
 	for _, i := range to {
@@ -127,8 +138,10 @@ func (sg *ServerGrp) ConnectOne(i int) {
 func (sg *ServerGrp) cleanup() {
 	for _, s := range sg.srvs {
-		if s.kvsrv != nil {
-			s.kvsrv.Kill()
+		if s.svcs != nil {
+			for _, svc := range s.svcs {
+				svc.Kill()
+			}
 		}
 	}
 }
@@ -210,13 +223,11 @@ func (sg *ServerGrp) StartServer(i int) {
 	srv := sg.srvs[i].startServer(sg.gid)
 	sg.srvs[i] = srv
-	srv.kvsrv = sg.mks(srv.clntEnds, sg.gid, i, srv.saved, sg.Maxraftstate)
-	kvsvc := labrpc.MakeService(srv.kvsrv)
+	srv.svcs = sg.mks(srv.clntEnds, sg.gid, i, srv.saved)
 	labsrv := labrpc.MakeServer()
-	labsrv.AddService(kvsvc)
-	if len(sg.srvs) > 1 { // Run with raft?
-		rfsvc := labrpc.MakeService(srv.kvsrv.Raft())
-		labsrv.AddService(rfsvc)
+	for _, svc := range srv.svcs {
+		s := labrpc.MakeService(svc)
+		labsrv.AddService(s)
 	}
 	sg.net.AddServer(ServerName(sg.gid, i), labsrv)
 }
@@ -254,23 +265,8 @@ func (sg *ServerGrp) start() {
 	}
 }
-func (sg *ServerGrp) GetState(i int) (int, bool) {
-	return sg.srvs[i].kvsrv.Raft().GetState()
-}
-func (sg *ServerGrp) Leader() (bool, int) {
-	for i, _ := range sg.srvs {
-		_, is_leader := sg.GetState(i)
-		if is_leader {
-			return true, i
-		}
-	}
-	return false, 0
-}
 // Partition servers into 2 groups and put current leader in minority
-func (sg *ServerGrp) MakePartition() ([]int, []int) {
-	_, l := sg.Leader()
+func (sg *ServerGrp) MakePartition(l int) ([]int, []int) {
 	n := len(sg.srvs)
 	p1 := make([]int, n/2+1)
 	p2 := make([]int, n/2)
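
To make the revised tester contract concrete: under the new groups.go, a start function no longer receives maxraftstate or a *raft.Persister; it gets a *tester.Persister and returns every service the tester should register with labrpc and later Kill(). Below is a minimal sketch under those assumptions, with a made-up echoService; the per-lab wrappers such as ts.StartKVServer above use the same idea, capturing maxraftstate in a method value instead of a tester parameter.

package example

import (
	"6.5840/labrpc"
	"6.5840/tester1"
)

// echoService is a stand-in for a real server; the only obligation the
// tester imposes is the Kill() method of tester.IService.
type echoService struct{}

// Kill is called by the tester when it shuts this server down.
func (e *echoService) Kill() {}

// startEcho matches the new tester.FstartServer signature; each returned
// IService is wrapped with labrpc.MakeService and added to the labrpc server.
func startEcho(ends []*labrpc.ClientEnd, gid tester.Tgid, srv int,
	persister *tester.Persister) []tester.IService {
	return []tester.IService{&echoService{}}
}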

src/tester1/persister.go (new file, 70 lines)
View File

@@ -0,0 +1,70 @@
package tester
//
// support for Raft and kvraft to save persistent
// Raft state (log &c) and k/v server snapshots.
//
// we will use the original persister.go to test your code for grading.
// so, while you can modify this code to help you debug, please
// test with the original before submitting.
//
import "sync"
type Persister struct {
mu sync.Mutex
raftstate []byte
snapshot []byte
}
func MakePersister() *Persister {
return &Persister{}
}
func clone(orig []byte) []byte {
x := make([]byte, len(orig))
copy(x, orig)
return x
}
func (ps *Persister) Copy() *Persister {
ps.mu.Lock()
defer ps.mu.Unlock()
np := MakePersister()
np.raftstate = ps.raftstate
np.snapshot = ps.snapshot
return np
}
func (ps *Persister) ReadRaftState() []byte {
ps.mu.Lock()
defer ps.mu.Unlock()
return clone(ps.raftstate)
}
func (ps *Persister) RaftStateSize() int {
ps.mu.Lock()
defer ps.mu.Unlock()
return len(ps.raftstate)
}
// Save both Raft state and K/V snapshot as a single atomic action,
// to help avoid them getting out of sync.
func (ps *Persister) Save(raftstate []byte, snapshot []byte) {
ps.mu.Lock()
defer ps.mu.Unlock()
ps.raftstate = clone(raftstate)
ps.snapshot = clone(snapshot)
}
func (ps *Persister) ReadSnapshot() []byte {
ps.mu.Lock()
defer ps.mu.Unlock()
return clone(ps.snapshot)
}
func (ps *Persister) SnapshotSize() int {
ps.mu.Lock()
defer ps.mu.Unlock()
return len(ps.snapshot)
}
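
The persist()/readPersist() comments in raft.go above describe the intended round trip through this Persister with labgob. Spelled out as a standalone sketch, using currentTerm/votedFor as placeholder state (what exactly must be persisted is left to the lab solution), it might look like:

package persistexample

import (
	"bytes"

	"6.5840/labgob"
	"6.5840/tester1"
)

// saveState encodes some placeholder persistent state and stores it together
// with the current snapshot in one atomic Save() call.
func saveState(ps *tester.Persister, currentTerm, votedFor int, snapshot []byte) {
	w := new(bytes.Buffer)
	e := labgob.NewEncoder(w)
	e.Encode(currentTerm)
	e.Encode(votedFor)
	ps.Save(w.Bytes(), snapshot)
}

// readState decodes what saveState wrote; ok is false if nothing has been
// persisted yet or the data cannot be decoded.
func readState(ps *tester.Persister) (currentTerm, votedFor int, ok bool) {
	data := ps.ReadRaftState()
	if len(data) == 0 {
		return 0, 0, false
	}
	d := labgob.NewDecoder(bytes.NewBuffer(data))
	if d.Decode(&currentTerm) != nil || d.Decode(&votedFor) != nil {
		return 0, 0, false
	}
	return currentTerm, votedFor, true
}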

View File

@@ -2,15 +2,16 @@ package tester
 import (
 	// "log"
+	"sync"
 	"6.5840/labrpc"
-	"6.5840/raft"
 )
 type Server struct {
+	mu       sync.Mutex
 	net      *labrpc.Network
-	saved    *raft.Persister
-	kvsrv    IKVServer
+	saved    *Persister
+	svcs     []IService // list of services exported by
 	endNames []string
 	clntEnds []*labrpc.ClientEnd
 }
@@ -40,12 +41,14 @@ func (s *Server) startServer(gid Tgid) *Server {
 	if s.saved != nil {
 		srv.saved = s.saved.Copy()
 	} else {
-		srv.saved = raft.MakePersister()
+		srv.saved = MakePersister()
 	}
 	return srv
 }
 func (s *Server) connect(to []int) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
 	for j := 0; j < len(to); j++ {
 		endname := s.endNames[to[j]]
 		s.net.Enable(endname, true)
@@ -53,6 +56,9 @@ func (s *Server) connect(to []int) {
 }
 func (s *Server) disconnect(from []int) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
 	if s.endNames == nil {
 		return
 	}
@@ -62,8 +68,10 @@ func (s *Server) disconnect(from []int) {
 	}
 }
-// XXX lock s?
 func (s *Server) shutdownServer() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
 	// a fresh persister, in case old instance
 	// continues to update the Persister.
 	// but copy old persister's content so that we always
@@ -72,9 +80,11 @@ func (s *Server) shutdownServer() {
 		s.saved = s.saved.Copy()
 	}
-	kv := s.kvsrv
-	if kv != nil {
-		kv.Kill()
-		s.kvsrv = nil
-	}
+	// inform all services to stop
+	for _, svc := range s.svcs {
+		if svc != nil {
+			svc.Kill()
+		}
+	}
+	s.svcs = nil
 }