src/raft/config.go

package raft

//
// support for Raft tester.
//
// we will use the original config.go to test your code for grading.
// so, while you can modify this code to help you debug, please
// test with the original before submitting.
//

import "6.5840/labgob"
import "6.5840/labrpc"
import "bytes"
import "log"
import "sync"
import "sync/atomic"
import "testing"
import "runtime"
import "math/rand"
import crand "crypto/rand"
import "math/big"
import "encoding/base64"
import "time"
import "fmt"

func randstring(n int) string {
	b := make([]byte, 2*n)
	crand.Read(b)
	s := base64.URLEncoding.EncodeToString(b)
	return s[0:n]
}

func makeSeed() int64 {
	max := big.NewInt(int64(1) << 62)
	bigx, _ := crand.Int(crand.Reader, max)
	x := bigx.Int64()
	return x
}

type config struct {
	mu          sync.Mutex
	t           *testing.T
	finished    int32
	net         *labrpc.Network
	n           int
	rafts       []*Raft
	applyErr    []string // from apply channel readers
	connected   []bool   // whether each server is on the net
	saved       []*Persister
	endnames    [][]string            // the port file names each sends to
	logs        []map[int]interface{} // copy of each server's committed entries
	lastApplied []int
	start       time.Time // time at which make_config() was called
	// begin()/end() statistics
	t0        time.Time // time at which test_test.go called cfg.begin()
	rpcs0     int       // rpcTotal() at start of test
	cmds0     int       // number of agreements
	bytes0    int64
	maxIndex  int
	maxIndex0 int
}

var ncpu_once sync.Once

func make_config(t *testing.T, n int, unreliable bool, snapshot bool) *config {
	ncpu_once.Do(func() {
		if runtime.NumCPU() < 2 {
			fmt.Printf("warning: only one CPU, which may conceal locking bugs\n")
		}
		rand.Seed(makeSeed())
	})
	runtime.GOMAXPROCS(4)
	cfg := &config{}
	cfg.t = t
	cfg.net = labrpc.MakeNetwork()
	cfg.n = n
	cfg.applyErr = make([]string, cfg.n)
	cfg.rafts = make([]*Raft, cfg.n)
	cfg.connected = make([]bool, cfg.n)
	cfg.saved = make([]*Persister, cfg.n)
	cfg.endnames = make([][]string, cfg.n)
	cfg.logs = make([]map[int]interface{}, cfg.n)
	cfg.lastApplied = make([]int, cfg.n)
	cfg.start = time.Now()

	cfg.setunreliable(unreliable)

	cfg.net.LongDelays(true)

	applier := cfg.applier
	if snapshot {
		applier = cfg.applierSnap
	}
	// create a full set of Rafts.
	for i := 0; i < cfg.n; i++ {
		cfg.logs[i] = map[int]interface{}{}
		cfg.start1(i, applier)
	}

	// connect everyone
	for i := 0; i < cfg.n; i++ {
		cfg.connect(i)
	}

	return cfg
}

// shut down a Raft server but save its persistent state.
func (cfg *config) crash1(i int) {
	cfg.disconnect(i)
	cfg.net.DeleteServer(i) // disable client connections to the server.

	cfg.mu.Lock()
	defer cfg.mu.Unlock()

	// a fresh persister, in case the old instance
	// continues to update the Persister.
	// but copy the old persister's content so that we always
	// pass Make() the last persisted state.
	if cfg.saved[i] != nil {
		cfg.saved[i] = cfg.saved[i].Copy()
	}

	rf := cfg.rafts[i]
	if rf != nil {
		cfg.mu.Unlock()
		rf.Kill()
		cfg.mu.Lock()
		cfg.rafts[i] = nil
	}

	if cfg.saved[i] != nil {
		raftlog := cfg.saved[i].ReadRaftState()
		snapshot := cfg.saved[i].ReadSnapshot()
		cfg.saved[i] = &Persister{}
		cfg.saved[i].Save(raftlog, snapshot)
	}
}

func (cfg *config) checkLogs(i int, m ApplyMsg) (string, bool) {
	err_msg := ""
	v := m.Command
	for j := 0; j < len(cfg.logs); j++ {
		if old, oldok := cfg.logs[j][m.CommandIndex]; oldok && old != v {
			log.Printf("%v: log %v; server %v\n", i, cfg.logs[i], cfg.logs[j])
			// some server has already committed a different value for this entry!
			err_msg = fmt.Sprintf("commit index=%v server=%v %v != server=%v %v",
				m.CommandIndex, i, m.Command, j, old)
		}
	}
	_, prevok := cfg.logs[i][m.CommandIndex-1]
	cfg.logs[i][m.CommandIndex] = v
	if m.CommandIndex > cfg.maxIndex {
		cfg.maxIndex = m.CommandIndex
	}
	return err_msg, prevok
}

// applier reads messages from the apply channel and checks that
// they match the log contents.
func (cfg *config) applier(i int, applyCh chan ApplyMsg) {
	for m := range applyCh {
		if m.CommandValid == false {
			// ignore other types of ApplyMsg
		} else {
			cfg.mu.Lock()
			err_msg, prevok := cfg.checkLogs(i, m)
			cfg.mu.Unlock()
			if m.CommandIndex > 1 && prevok == false {
				err_msg = fmt.Sprintf("server %v apply out of order %v", i, m.CommandIndex)
			}
			if err_msg != "" {
				log.Fatalf("apply error: %v", err_msg)
				cfg.applyErr[i] = err_msg
				// keep reading after error so that Raft doesn't block
				// holding locks...
			}
		}
	}
}

// returns "" or error string
func (cfg *config) ingestSnap(i int, snapshot []byte, index int) string {
	if snapshot == nil {
		log.Fatalf("nil snapshot")
		return "nil snapshot"
	}
	r := bytes.NewBuffer(snapshot)
	d := labgob.NewDecoder(r)
	var lastIncludedIndex int
	var xlog []interface{}
	if d.Decode(&lastIncludedIndex) != nil ||
		d.Decode(&xlog) != nil {
		log.Fatalf("snapshot decode error")
		return "snapshot Decode() error"
	}
	if index != -1 && index != lastIncludedIndex {
		err := fmt.Sprintf("server %v snapshot doesn't match m.SnapshotIndex", i)
		return err
	}
	cfg.logs[i] = map[int]interface{}{}
	for j := 0; j < len(xlog); j++ {
		cfg.logs[i][j] = xlog[j]
	}
	cfg.lastApplied[i] = lastIncludedIndex
	return ""
}

const SnapShotInterval = 10

// periodically snapshot raft state
func (cfg *config) applierSnap(i int, applyCh chan ApplyMsg) {
	cfg.mu.Lock()
	rf := cfg.rafts[i]
	cfg.mu.Unlock()
	if rf == nil {
		return // ???
	}

	for m := range applyCh {
		err_msg := ""
		if m.SnapshotValid {
			cfg.mu.Lock()
			err_msg = cfg.ingestSnap(i, m.Snapshot, m.SnapshotIndex)
			cfg.mu.Unlock()
		} else if m.CommandValid {
			if m.CommandIndex != cfg.lastApplied[i]+1 {
				err_msg = fmt.Sprintf("server %v apply out of order, expected index %v, got %v", i, cfg.lastApplied[i]+1, m.CommandIndex)
			}

			if err_msg == "" {
				cfg.mu.Lock()
				var prevok bool
				err_msg, prevok = cfg.checkLogs(i, m)
				cfg.mu.Unlock()
				if m.CommandIndex > 1 && prevok == false {
					err_msg = fmt.Sprintf("server %v apply out of order %v", i, m.CommandIndex)
				}
			}

			cfg.mu.Lock()
			cfg.lastApplied[i] = m.CommandIndex
			cfg.mu.Unlock()

			if (m.CommandIndex+1)%SnapShotInterval == 0 {
				w := new(bytes.Buffer)
				e := labgob.NewEncoder(w)
				e.Encode(m.CommandIndex)
				var xlog []interface{}
				for j := 0; j <= m.CommandIndex; j++ {
					xlog = append(xlog, cfg.logs[i][j])
				}
				e.Encode(xlog)
				rf.Snapshot(m.CommandIndex, w.Bytes())
			}
		} else {
			// Ignore other types of ApplyMsg.
		}
		if err_msg != "" {
			log.Fatalf("apply error: %v", err_msg)
			cfg.applyErr[i] = err_msg
			// keep reading after error so that Raft doesn't block
			// holding locks...
		}
	}
}

// start or re-start a Raft.
// if one already exists, "kill" it first.
// allocate new outgoing port file names, and a new
// state persister, to isolate the previous instance of
// this server, since we cannot really kill it.
func (cfg *config) start1(i int, applier func(int, chan ApplyMsg)) {
	cfg.crash1(i)

	// a fresh set of outgoing ClientEnd names,
	// so that the old crashed instance's ClientEnds can't send.
	cfg.endnames[i] = make([]string, cfg.n)
	for j := 0; j < cfg.n; j++ {
		cfg.endnames[i][j] = randstring(20)
	}

	// a fresh set of ClientEnds.
	ends := make([]*labrpc.ClientEnd, cfg.n)
	for j := 0; j < cfg.n; j++ {
		ends[j] = cfg.net.MakeEnd(cfg.endnames[i][j])
		cfg.net.Connect(cfg.endnames[i][j], j)
	}

	cfg.mu.Lock()

	cfg.lastApplied[i] = 0

	// a fresh persister, so the old instance doesn't overwrite
	// the new instance's persisted state.
	// but copy the old persister's content so that we always
	// pass Make() the last persisted state.
	if cfg.saved[i] != nil {
		cfg.saved[i] = cfg.saved[i].Copy()

		snapshot := cfg.saved[i].ReadSnapshot()
		if snapshot != nil && len(snapshot) > 0 {
			// mimic KV server and process snapshot now.
			// ideally Raft should send it up on applyCh...
			err := cfg.ingestSnap(i, snapshot, -1)
			if err != "" {
				cfg.t.Fatal(err)
			}
		}
	} else {
		cfg.saved[i] = MakePersister()
	}

	cfg.mu.Unlock()

	applyCh := make(chan ApplyMsg)

	rf := Make(ends, i, cfg.saved[i], applyCh)

	cfg.mu.Lock()
	cfg.rafts[i] = rf
	cfg.mu.Unlock()

	go applier(i, applyCh)

	svc := labrpc.MakeService(rf)
	srv := labrpc.MakeServer()
	srv.AddService(svc)
	cfg.net.AddServer(i, srv)
}

func (cfg *config) checkTimeout() {
	// enforce a two-minute real-time limit on each test
	if !cfg.t.Failed() && time.Since(cfg.start) > 120*time.Second {
		cfg.t.Fatal("test took longer than 120 seconds")
	}
}

func (cfg *config) checkFinished() bool {
	z := atomic.LoadInt32(&cfg.finished)
	return z != 0
}

func (cfg *config) cleanup() {
	atomic.StoreInt32(&cfg.finished, 1)
	for i := 0; i < len(cfg.rafts); i++ {
		if cfg.rafts[i] != nil {
			cfg.rafts[i].Kill()
		}
	}
	cfg.net.Cleanup()
	cfg.checkTimeout()
}

// attach server i to the net.
func (cfg *config) connect(i int) {
	// fmt.Printf("connect(%d)\n", i)

	cfg.connected[i] = true

	// outgoing ClientEnds
	for j := 0; j < cfg.n; j++ {
		if cfg.connected[j] {
			endname := cfg.endnames[i][j]
			cfg.net.Enable(endname, true)
		}
	}

	// incoming ClientEnds
	for j := 0; j < cfg.n; j++ {
		if cfg.connected[j] {
			endname := cfg.endnames[j][i]
			cfg.net.Enable(endname, true)
		}
	}
}

// detach server i from the net.
func (cfg *config) disconnect(i int) {
	// fmt.Printf("disconnect(%d)\n", i)

	cfg.connected[i] = false

	// outgoing ClientEnds
	for j := 0; j < cfg.n; j++ {
		if cfg.endnames[i] != nil {
			endname := cfg.endnames[i][j]
			cfg.net.Enable(endname, false)
		}
	}

	// incoming ClientEnds
	for j := 0; j < cfg.n; j++ {
		if cfg.endnames[j] != nil {
			endname := cfg.endnames[j][i]
			cfg.net.Enable(endname, false)
		}
	}
}

func (cfg *config) rpcCount(server int) int {
	return cfg.net.GetCount(server)
}

func (cfg *config) rpcTotal() int {
	return cfg.net.GetTotalCount()
}

func (cfg *config) setunreliable(unrel bool) {
	cfg.net.Reliable(!unrel)
}

func (cfg *config) bytesTotal() int64 {
	return cfg.net.GetTotalBytes()
}

func (cfg *config) setlongreordering(longrel bool) {
	cfg.net.LongReordering(longrel)
}

// check that one of the connected servers thinks
// it is the leader, and that no other connected
// server thinks otherwise.
//
// try a few times in case re-elections are needed.
func (cfg *config) checkOneLeader() int {
	for iters := 0; iters < 10; iters++ {
		ms := 450 + (rand.Int63() % 100)
		time.Sleep(time.Duration(ms) * time.Millisecond)

		leaders := make(map[int][]int)
		for i := 0; i < cfg.n; i++ {
			if cfg.connected[i] {
				if term, leader := cfg.rafts[i].GetState(); leader {
					leaders[term] = append(leaders[term], i)
				}
			}
		}

		lastTermWithLeader := -1
		for term, leaders := range leaders {
			if len(leaders) > 1 {
				cfg.t.Fatalf("term %d has %d (>1) leaders", term, len(leaders))
			}
			if term > lastTermWithLeader {
				lastTermWithLeader = term
			}
		}

		if len(leaders) != 0 {
			return leaders[lastTermWithLeader][0]
		}
	}
	cfg.t.Fatalf("expected one leader, got none")
	return -1
}

// check that everyone agrees on the term.
func (cfg *config) checkTerms() int {
	term := -1
	for i := 0; i < cfg.n; i++ {
		if cfg.connected[i] {
			xterm, _ := cfg.rafts[i].GetState()
			if term == -1 {
				term = xterm
			} else if term != xterm {
				cfg.t.Fatalf("servers disagree on term")
			}
		}
	}
	return term
}

// check that none of the connected servers
// thinks it is the leader.
func (cfg *config) checkNoLeader() {
	for i := 0; i < cfg.n; i++ {
		if cfg.connected[i] {
			_, is_leader := cfg.rafts[i].GetState()
			if is_leader {
				cfg.t.Fatalf("expected no leader among connected servers, but %v claims to be leader", i)
			}
		}
	}
}

// how many servers think a log entry is committed?
func (cfg *config) nCommitted(index int) (int, interface{}) {
	count := 0
	var cmd interface{} = nil
	for i := 0; i < len(cfg.rafts); i++ {
		if cfg.applyErr[i] != "" {
			cfg.t.Fatal(cfg.applyErr[i])
		}

		cfg.mu.Lock()
		cmd1, ok := cfg.logs[i][index]
		cfg.mu.Unlock()

		if ok {
			if count > 0 && cmd != cmd1 {
				cfg.t.Fatalf("committed values do not match: index %v, %v, %v",
					index, cmd, cmd1)
			}
			count += 1
			cmd = cmd1
		}
	}
	return count, cmd
}

// wait for at least n servers to commit,
// but don't wait forever.
func (cfg *config) wait(index int, n int, startTerm int) interface{} {
	to := 10 * time.Millisecond
	for iters := 0; iters < 30; iters++ {
		nd, _ := cfg.nCommitted(index)
		if nd >= n {
			break
		}
		time.Sleep(to)
		if to < time.Second {
			to *= 2
		}
		if startTerm > -1 {
			for _, r := range cfg.rafts {
				if t, _ := r.GetState(); t > startTerm {
					// someone has moved on;
					// we can no longer guarantee that we'll "win".
					return -1
				}
			}
		}
	}
	nd, cmd := cfg.nCommitted(index)
	if nd < n {
		cfg.t.Fatalf("only %d decided for index %d; wanted %d",
			nd, index, n)
	}
	return cmd
}

// do a complete agreement.
// it might choose the wrong leader initially,
// and have to re-submit after giving up.
// entirely gives up after about 10 seconds.
// indirectly checks that the servers agree on the
// same value, since nCommitted() checks this,
// as do the threads that read from applyCh.
// returns index.
// if retry==true, may submit the command multiple
// times, in case a leader fails just after Start().
// if retry==false, calls Start() only once, in order
// to simplify the early Lab 3B tests.
func (cfg *config) one(cmd interface{}, expectedServers int, retry bool) int {
	t0 := time.Now()
	starts := 0
	for time.Since(t0).Seconds() < 10 && cfg.checkFinished() == false {
		// try all the servers, maybe one is the leader.
		index := -1
		for si := 0; si < cfg.n; si++ {
			starts = (starts + 1) % cfg.n
			var rf *Raft
			cfg.mu.Lock()
			if cfg.connected[starts] {
				rf = cfg.rafts[starts]
			}
			cfg.mu.Unlock()
			if rf != nil {
				index1, _, ok := rf.Start(cmd)
				if ok {
					index = index1
					break
				}
			}
		}

		if index != -1 {
			// somebody claimed to be the leader and to have
			// submitted our command; wait a while for agreement.
			t1 := time.Now()
			for time.Since(t1).Seconds() < 2 {
				nd, cmd1 := cfg.nCommitted(index)
				if nd > 0 && nd >= expectedServers {
					// committed
					if cmd1 == cmd {
						// and it was the command we submitted.
						return index
					}
				}
				time.Sleep(20 * time.Millisecond)
			}
			if retry == false {
				cfg.t.Fatalf("one(%v) failed to reach agreement", cmd)
			}
		} else {
			time.Sleep(50 * time.Millisecond)
		}
	}
	if cfg.checkFinished() == false {
		cfg.t.Fatalf("one(%v) failed to reach agreement", cmd)
	}
	return -1
}
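
// Illustrative sketch, not part of the original config.go: roughly how
// a test in test_test.go combines the helpers above. The three-server
// cluster and the command value 99 are arbitrary choices for the example.
//
// func TestExampleAgreement(t *testing.T) {
// 	servers := 3
// 	cfg := make_config(t, servers, false, false)
// 	defer cfg.cleanup()
//
// 	cfg.begin("Test: example basic agreement")
// 	index := cfg.one(99, servers, true) // commit 99 on all three servers
// 	if index < 1 {
// 		t.Fatalf("got index %v, expected index >= 1", index)
// 	}
// 	cfg.end()
// }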

// start a Test.
// print the Test message.
// e.g. cfg.begin("Test (3B): RPC counts aren't too high")
func (cfg *config) begin(description string) {
	fmt.Printf("%s ...\n", description)
	cfg.t0 = time.Now()
	cfg.rpcs0 = cfg.rpcTotal()
	cfg.bytes0 = cfg.bytesTotal()
	cfg.cmds0 = 0
	cfg.maxIndex0 = cfg.maxIndex
}

// end a Test -- the fact that we got here means there
// was no failure.
// print the Passed message,
// and some performance numbers.
func (cfg *config) end() {
	cfg.checkTimeout()
	if cfg.t.Failed() == false {
		cfg.mu.Lock()
		t := time.Since(cfg.t0).Seconds()       // real time
		npeers := cfg.n                         // number of Raft peers
		nrpc := cfg.rpcTotal() - cfg.rpcs0      // number of RPC sends
		nbytes := cfg.bytesTotal() - cfg.bytes0 // number of bytes
		ncmds := cfg.maxIndex - cfg.maxIndex0   // number of Raft agreements reported
		cfg.mu.Unlock()

		fmt.Printf("  ... Passed --")
		fmt.Printf("  %4.1f  %d %4d %7d %4d\n", t, npeers, nrpc, nbytes, ncmds)
	}
}

// Maximum log size across all servers
func (cfg *config) LogSize() int {
	logsize := 0
	for i := 0; i < cfg.n; i++ {
		n := cfg.saved[i].RaftStateSize()
		if n > logsize {
			logsize = n
		}
	}
	return logsize
}
src/raft/persister.go

package raft

//
// support for Raft and kvraft to save persistent
// Raft state (log &c) and k/v server snapshots.
//
// we will use the original persister.go to test your code for grading.
// so, while you can modify this code to help you debug, please
// test with the original before submitting.
//

import "sync"

type Persister struct {
	mu        sync.Mutex
	raftstate []byte
	snapshot  []byte
}

func MakePersister() *Persister {
	return &Persister{}
}

func clone(orig []byte) []byte {
	x := make([]byte, len(orig))
	copy(x, orig)
	return x
}

func (ps *Persister) Copy() *Persister {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	np := MakePersister()
	np.raftstate = ps.raftstate
	np.snapshot = ps.snapshot
	return np
}

func (ps *Persister) ReadRaftState() []byte {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	return clone(ps.raftstate)
}

func (ps *Persister) RaftStateSize() int {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	return len(ps.raftstate)
}

// Save both Raft state and K/V snapshot as a single atomic action,
// to help avoid them getting out of sync.
func (ps *Persister) Save(raftstate []byte, snapshot []byte) {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	ps.raftstate = clone(raftstate)
	ps.snapshot = clone(snapshot)
}
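
// Illustrative sketch, not part of the original persister.go: how a
// Raft peer typically calls Save() from its persist() method.
// currentTerm, votedFor, and log are hypothetical names for the
// persistent state from Figure 2 of the Raft paper; labgob is the
// encoder the labs use, and nil is the right second argument before
// snapshots are implemented.
//
// w := new(bytes.Buffer)
// e := labgob.NewEncoder(w)
// e.Encode(rf.currentTerm)
// e.Encode(rf.votedFor)
// e.Encode(rf.log)
// rf.persister.Save(w.Bytes(), nil)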

func (ps *Persister) ReadSnapshot() []byte {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	return clone(ps.snapshot)
}

func (ps *Persister) SnapshotSize() int {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	return len(ps.snapshot)
}
src/raft/raft.go

package raft

//
// this is an outline of the API that raft must expose to
// the service (or tester). see comments below for
// each of these functions for more details.
//
// rf = Make(...)
//   create a new Raft server.
// rf.Start(command interface{}) (index, term, isleader)
//   start agreement on a new log entry
// rf.GetState() (term, isLeader)
//   ask a Raft for its current term, and whether it thinks it is leader
// ApplyMsg
//   each time a new entry is committed to the log, each Raft peer
//   should send an ApplyMsg to the service (or tester)
//   in the same server.
//

import (
	//	"bytes"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"

	//	"6.5840/labgob"
	"6.5840/labrpc"
)

// as each Raft peer becomes aware that successive log entries are
// committed, the peer should send an ApplyMsg to the service (or
// tester) on the same server, via the applyCh passed to Make(). set
// CommandValid to true to indicate that the ApplyMsg contains a newly
// committed log entry.
//
// in part 3D you'll want to send other kinds of messages (e.g.,
// snapshots) on the applyCh, but set CommandValid to false for these
// other uses.
type ApplyMsg struct {
	CommandValid bool
	Command      interface{}
	CommandIndex int

	// For 3D:
	SnapshotValid bool
	Snapshot      []byte
	SnapshotTerm  int
	SnapshotIndex int
}
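
// Illustrative sketch, not in the original skeleton: once an entry is
// known to be committed, an applier goroutine would deliver it roughly
// like this. rf.log and rf.lastApplied are hypothetical fields; the
// skeleton leaves the Raft state design up to you.
//
// rf.lastApplied++
// applyCh <- ApplyMsg{
// 	CommandValid: true,
// 	Command:      rf.log[rf.lastApplied].Command,
// 	CommandIndex: rf.lastApplied,
// }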

// A Go object implementing a single Raft peer.
type Raft struct {
	mu        sync.Mutex          // Lock to protect shared access to this peer's state
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	persister *Persister          // Object to hold this peer's persisted state
	me        int                 // this peer's index into peers[]
	dead      int32               // set by Kill()

	// Your data here (3A, 3B, 3C).
	// Look at the paper's Figure 2 for a description of what
	// state a Raft server must maintain.

}

// return currentTerm and whether this server
// believes it is the leader.
func (rf *Raft) GetState() (int, bool) {

	var term int
	var isleader bool
	// Your code here (3A).
	return term, isleader
}
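
// Illustrative sketch, not in the original skeleton: a typical
// GetState() reads shared state under rf.mu. currentTerm and state
// are hypothetical fields you would add to Raft.
//
// rf.mu.Lock()
// defer rf.mu.Unlock()
// return rf.currentTerm, rf.state == Leader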

// save Raft's persistent state to stable storage,
// where it can later be retrieved after a crash and restart.
// see paper's Figure 2 for a description of what should be persistent.
// before you've implemented snapshots, you should pass nil as the
// second argument to persister.Save().
// after you've implemented snapshots, pass the current snapshot
// (or nil if there's not yet a snapshot).
func (rf *Raft) persist() {
	// Your code here (3C).
	// Example:
	// w := new(bytes.Buffer)
	// e := labgob.NewEncoder(w)
	// e.Encode(rf.xxx)
	// e.Encode(rf.yyy)
	// raftstate := w.Bytes()
	// rf.persister.Save(raftstate, nil)
}

// restore previously persisted state.
func (rf *Raft) readPersist(data []byte) {
	if data == nil || len(data) < 1 { // bootstrap without any state?
		return
	}
	// Your code here (3C).
	// Example:
	// r := bytes.NewBuffer(data)
	// d := labgob.NewDecoder(r)
	// var xxx
	// var yyy
	// if d.Decode(&xxx) != nil ||
	//    d.Decode(&yyy) != nil {
	//   error...
	// } else {
	//   rf.xxx = xxx
	//   rf.yyy = yyy
	// }
}

// the service says it has created a snapshot that has
// all info up to and including index. this means the
// service no longer needs the log through (and including)
// that index. Raft should now trim its log as much as possible.
func (rf *Raft) Snapshot(index int, snapshot []byte) {
	// Your code here (3D).

}
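
// Illustrative caller (this mirrors the tester's applierSnap() in
// config.go above): the service encodes its state with labgob and
// hands the bytes to Raft.
//
// w := new(bytes.Buffer)
// e := labgob.NewEncoder(w)
// e.Encode(m.CommandIndex)
// e.Encode(xlog)
// rf.Snapshot(m.CommandIndex, w.Bytes())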

// example RequestVote RPC arguments structure.
// field names must start with capital letters!
type RequestVoteArgs struct {
	// Your data here (3A, 3B).
}

// example RequestVote RPC reply structure.
// field names must start with capital letters!
type RequestVoteReply struct {
	// Your data here (3A).
}

// example RequestVote RPC handler.
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	// Your code here (3A, 3B).
}

// example code to send a RequestVote RPC to a server.
// server is the index of the target server in rf.peers[].
// expects RPC arguments in args.
// fills in *reply with RPC reply, so caller should
// pass &reply.
// the types of the args and reply passed to Call() must be
// the same as the types of the arguments declared in the
// handler function (including whether they are pointers).
//
// The labrpc package simulates a lossy network, in which servers
// may be unreachable, and in which requests and replies may be lost.
// Call() sends a request and waits for a reply. If a reply arrives
// within a timeout interval, Call() returns true; otherwise
// Call() returns false. Thus Call() may not return for a while.
// A false return can be caused by a dead server, a live server that
// can't be reached, a lost request, or a lost reply.
//
// Call() is guaranteed to return (perhaps after a delay) *except* if the
// handler function on the server side does not return. Thus there
// is no need to implement your own timeouts around Call().
//
// look at the comments in ../labrpc/labrpc.go for more details.
//
// if you're having trouble getting RPC to work, check that you've
// capitalized all field names in structs passed over RPC, and
// that the caller passes the address of the reply struct with &, not
// the struct itself.
func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
	ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
	return ok
}
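
// Illustrative sketch, not in the original skeleton: since Call() can
// block for a while, a common pattern is for a candidate to issue
// sendRequestVote() in one goroutine per peer, so a slow or dead
// server can't stall the election. The reply is only meaningful when
// ok is true; on false, the usual response is simply to move on.
//
// for peer := range rf.peers {
// 	if peer == rf.me {
// 		continue
// 	}
// 	go func(server int) {
// 		args := RequestVoteArgs{} // candidate's term, id, etc.
// 		reply := RequestVoteReply{}
// 		if ok := rf.sendRequestVote(server, &args, &reply); ok {
// 			// examine reply while holding rf.mu before acting on it
// 		}
// 	}(peer)
// }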

// the service using Raft (e.g. a k/v server) wants to start
// agreement on the next command to be appended to Raft's log. if this
// server isn't the leader, returns false. otherwise start the
// agreement and return immediately. there is no guarantee that this
// command will ever be committed to the Raft log, since the leader
// may fail or lose an election. even if the Raft instance has been killed,
// this function should return gracefully.
//
// the first return value is the index that the command will appear at
// if it's ever committed. the second return value is the current
// term. the third return value is true if this server believes it is
// the leader.
func (rf *Raft) Start(command interface{}) (int, int, bool) {
	index := -1
	term := -1
	isLeader := true

	// Your code here (3B).

	return index, term, isLeader
}
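
// Illustrative caller (this mirrors how the tester's one() in
// config.go uses Start()): submit a command and try another server
// if this one isn't the leader.
//
// index, term, isLeader := rf.Start(cmd)
// if !isLeader {
// 	// try the next server; only a leader can begin agreement
// }
// // index is where cmd will land if it is ever committed;
// // term is the leader's current term.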

// the tester doesn't halt goroutines created by Raft after each test,
// but it does call the Kill() method. your code can use killed() to
// check whether Kill() has been called. the use of atomic avoids the
// need for a lock.
//
// the issue is that long-running goroutines use memory and may chew
// up CPU time, perhaps causing later tests to fail and generating
// confusing debug output. any goroutine with a long-running loop
// should call killed() to check whether it should stop.
func (rf *Raft) Kill() {
	atomic.StoreInt32(&rf.dead, 1)
	// Your code here, if desired.
}

func (rf *Raft) killed() bool {
	z := atomic.LoadInt32(&rf.dead)
	return z == 1
}

func (rf *Raft) ticker() {
	for rf.killed() == false {

		// Your code here (3A)
		// Check if a leader election should be started.

		// pause for a random amount of time between 50 and 350
		// milliseconds.
		ms := 50 + (rand.Int63() % 300)
		time.Sleep(time.Duration(ms) * time.Millisecond)
	}
}

// the service or tester wants to create a Raft server. the ports
// of all the Raft servers (including this one) are in peers[]. this
// server's port is peers[me]. all the servers' peers[] arrays
// have the same order. persister is a place for this server to
// save its persistent state, and also initially holds the most
// recent saved state, if any. applyCh is a channel on which the
// tester or service expects Raft to send ApplyMsg messages.
// Make() must return quickly, so it should start goroutines
// for any long-running work.
func Make(peers []*labrpc.ClientEnd, me int,
	persister *Persister, applyCh chan ApplyMsg) *Raft {
	rf := &Raft{}
	rf.peers = peers
	rf.persister = persister
	rf.me = me

	// Your initialization code here (3A, 3B, 3C).

	// initialize from state persisted before a crash
	rf.readPersist(persister.ReadRaftState())

	// start ticker goroutine to start elections
	go rf.ticker()

	return rf
}
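
// Illustrative caller (this mirrors the tester's start1() in
// config.go): the service builds the peer ends, creates the Raft,
// and drains applyCh in its own goroutine.
//
// applyCh := make(chan ApplyMsg)
// rf := Make(ends, me, persister, applyCh)
// go func() {
// 	for m := range applyCh {
// 		// apply m to the service's state machine
// 	}
// }()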

(the diff of one additional file was suppressed by the viewer because it is too large)
src/raft/util.go

package raft

import "log"

// Debugging
const Debug = false

func DPrintf(format string, a ...interface{}) {
	if Debug {
		log.Printf(format, a...)
	}
}
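
// Typical use (illustrative): set Debug = true above, then sprinkle
// calls like the following through your Raft code.
//
// DPrintf("S%d: became leader in term %d", rf.me, term)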