This commit is contained in:
Yun-Sheng Chang 2025-02-18 15:31:00 -05:00
parent c8e7d779c2
commit eacb273f95
8 changed files with 862 additions and 35 deletions

View File

@ -37,6 +37,7 @@ REFERENCE_FILES=(
src/tester1/group.go
src/tester1/persister.go
src/tester1/srv.go
src/tester1/annotation.go
# lab 4
src/kvraft1/rsm/rsm_test.go

View File

@ -2,8 +2,7 @@ package kvtest
import (
"fmt"
"io/ioutil"
//"log"
"os"
"sync"
"testing"
"time"
@ -46,7 +45,7 @@ func (log *OpLog) Read() []porcupine.Operation {
// absolute timestamps with `time.Now().UnixNano()` (which uses the wall
// clock), we measure time relative to `t0` using `time.Since(t0)`, which uses
// the monotonic clock
var t0 = time.Now()
var t0 = time.Unix(0, 0)
func Get(cfg *tester.Config, ck IKVClerk, key string, log *OpLog, cli int) (string, rpc.Tversion, rpc.Err) {
start := int64(time.Since(t0))
@ -84,14 +83,26 @@ func Put(cfg *tester.Config, ck IKVClerk, key string, value string, version rpc.
// Checks that the log of Clerk.Put's and Clerk.Get's is linearizable (see
// linearizability-faq.txt)
func checkPorcupine(t *testing.T, opLog *OpLog, nsec time.Duration) {
//log.Printf("oplog len %v %v", ts.oplog.Len(), ts.oplog)
func checkPorcupine(
t *testing.T, opLog *OpLog, annotations []porcupine.Annotation, nsec time.Duration,
) {
enabled := os.Getenv("VIS_ENABLE")
fpath := os.Getenv("VIS_FILE")
res, info := porcupine.CheckOperationsVerbose(models.KvModel, opLog.Read(), nsec)
if res == porcupine.Illegal {
file, err := ioutil.TempFile("", "porcupine-*.html")
if err != nil {
fmt.Printf("info: failed to create temp file for visualization")
var file *os.File
var err error
if fpath == "" {
// Save the vis file in a temporary file.
file, err = os.CreateTemp("", "porcupine-*.html")
} else {
file, err = os.OpenFile(fpath, os.O_RDWR | os.O_CREATE | os.O_TRUNC, 0644)
}
if err != nil {
fmt.Printf("info: failed to open visualization file %s (%v)\n", fpath, err)
} else if enabled != "never" {
// Don't produce visualization file if VIS_ENABLE is set to "never".
info.AddAnnotations(annotations)
err = porcupine.Visualize(models.KvModel, info, file)
if err != nil {
fmt.Printf("info: failed to write history visualization to %s\n", file.Name())
@ -103,6 +114,29 @@ func checkPorcupine(t *testing.T, opLog *OpLog, nsec time.Duration) {
} else if res == porcupine.Unknown {
fmt.Println("info: linearizability check timed out, assuming history is ok")
}
// The result is either legal or unknown.
if enabled == "always" {
var file *os.File
var err error
if fpath == "" {
// Save the vis file in a temporary file.
file, err = os.CreateTemp("", "porcupine-*.html")
} else {
file, err = os.OpenFile(fpath, os.O_RDWR | os.O_CREATE | os.O_TRUNC, 0644)
}
if err != nil {
fmt.Printf("info: failed to open visualization file %s (%v)\n", fpath, err)
return
}
info.AddAnnotations(annotations)
err = porcupine.Visualize(models.KvModel, info, file)
if err != nil {
fmt.Printf("info: failed to write history visualization to %s\n", file.Name())
} else {
fmt.Printf("info: wrote history visualization to %s\n", file.Name())
}
}
}
// Porcupine
@ -142,9 +176,13 @@ func (ts *Test) Put(ck IKVClerk, key string, value string, version rpc.Tversion,
}
func (ts *Test) CheckPorcupine() {
checkPorcupine(ts.t, ts.oplog, linearizabilityCheckTimeout)
ts.CheckPorcupineT(linearizabilityCheckTimeout)
}
func (ts *Test) CheckPorcupineT(nsec time.Duration) {
checkPorcupine(ts.t, ts.oplog, nsec)
// ts.RetrieveAnnotations() also clears the accumulated annotations so that
// the vis file containing client operations (generated here) won't be
// overridden by that without client operations (generated at cleanup time).
annotations := ts.RetrieveAnnotations()
checkPorcupine(ts.t, ts.oplog, annotations, nsec)
}

View File

@ -29,6 +29,7 @@ func TestInitialElection3A(t *testing.T) {
ts := makeTest(t, servers, true, false)
defer ts.cleanup()
tester.AnnotateTest("TestInitialElection3A", servers)
ts.Begin("Test (3A): initial election")
// is a leader elected?
@ -58,24 +59,28 @@ func TestReElection3A(t *testing.T) {
ts := makeTest(t, servers, true, false)
defer ts.cleanup()
tester.AnnotateTest("TestReElection3A", servers)
ts.Begin("Test (3A): election after network failure")
leader1 := ts.checkOneLeader()
// if the leader disconnects, a new one should be elected.
ts.g.DisconnectAll(leader1)
tester.AnnotateConnection(ts.g.GetConnected())
ts.checkOneLeader()
// if the old leader rejoins, that shouldn't
// disturb the new leader. and the old leader
// should switch to follower.
ts.g.ConnectOne(leader1)
tester.AnnotateConnection(ts.g.GetConnected())
leader2 := ts.checkOneLeader()
// if there's no quorum, no new leader should
// be elected.
ts.g.DisconnectAll(leader2)
ts.g.DisconnectAll((leader2 + 1) % servers)
tester.AnnotateConnection(ts.g.GetConnected())
time.Sleep(2 * RaftElectionTimeout)
// check that the one connected server
@ -84,10 +89,12 @@ func TestReElection3A(t *testing.T) {
// if a quorum arises, it should elect a leader.
ts.g.ConnectOne((leader2 + 1) % servers)
tester.AnnotateConnection(ts.g.GetConnected())
ts.checkOneLeader()
// re-join of last node shouldn't prevent leader from existing.
ts.g.ConnectOne(leader2)
tester.AnnotateConnection(ts.g.GetConnected())
ts.checkOneLeader()
}
@ -96,6 +103,7 @@ func TestManyElections3A(t *testing.T) {
ts := makeTest(t, servers, true, false)
defer ts.cleanup()
tester.AnnotateTest("TestManyElection3A", servers)
ts.Begin("Test (3A): multiple elections")
ts.checkOneLeader()
@ -109,6 +117,7 @@ func TestManyElections3A(t *testing.T) {
ts.g.DisconnectAll(i1)
ts.g.DisconnectAll(i2)
ts.g.DisconnectAll(i3)
tester.AnnotateConnection(ts.g.GetConnected())
// either the current leader should still be alive,
// or the remaining four should elect a new one.
@ -117,6 +126,7 @@ func TestManyElections3A(t *testing.T) {
ts.g.ConnectOne(i1)
ts.g.ConnectOne(i2)
ts.g.ConnectOne(i3)
tester.AnnotateConnection(ts.g.GetConnected())
}
ts.checkOneLeader()
}
@ -126,6 +136,7 @@ func TestBasicAgree3B(t *testing.T) {
ts := makeTest(t, servers, true, false)
defer ts.cleanup()
tester.AnnotateTest("TestBasicAgree3B", servers)
ts.Begin("Test (3B): basic agreement")
iters := 3
@ -149,6 +160,7 @@ func TestRPCBytes3B(t *testing.T) {
ts := makeTest(t, servers, true, false)
defer ts.cleanup()
tester.AnnotateTest("TestRPCBytes3B", servers)
ts.Begin("Test (3B): RPC byte count")
ts.one(99, servers, false)
@ -180,6 +192,7 @@ func TestFollowerFailure3B(t *testing.T) {
ts := makeTest(t, servers, true, false)
defer ts.cleanup()
tester.AnnotateTest("TestFollowerFailure3B", servers)
ts.Begin("Test (3B): test progressive failure of followers")
ts.one(101, servers, false)
@ -187,6 +200,7 @@ func TestFollowerFailure3B(t *testing.T) {
// disconnect one follower from the network.
leader1 := ts.checkOneLeader()
ts.g.DisconnectAll((leader1 + 1) % servers)
tester.AnnotateConnection(ts.g.GetConnected())
// the leader and remaining follower should be
// able to agree despite the disconnected follower.
@ -198,6 +212,7 @@ func TestFollowerFailure3B(t *testing.T) {
leader2 := ts.checkOneLeader()
ts.g.DisconnectAll((leader2 + 1) % servers)
ts.g.DisconnectAll((leader2 + 2) % servers)
tester.AnnotateConnection(ts.g.GetConnected())
// submit a command.
index, _, ok := ts.srvs[leader2].Raft().Start(104)
@ -211,11 +226,7 @@ func TestFollowerFailure3B(t *testing.T) {
time.Sleep(2 * RaftElectionTimeout)
// check that command 104 did not commit.
n, _ := ts.nCommitted(index)
if n > 0 {
t.Fatalf("%v committed but no majority", n)
}
ts.checkNoAgreement(index)
}
// test just failure of leaders.
@ -224,6 +235,7 @@ func TestLeaderFailure3B(t *testing.T) {
ts := makeTest(t, servers, true, false)
defer ts.cleanup()
tester.AnnotateTest("TestLeaderFailure3B", servers)
ts.Begin("Test (3B): test failure of leaders")
ts.one(101, servers, false)
@ -231,6 +243,7 @@ func TestLeaderFailure3B(t *testing.T) {
// disconnect the first leader.
leader1 := ts.checkOneLeader()
ts.g.DisconnectAll(leader1)
tester.AnnotateConnection(ts.g.GetConnected())
// the remaining followers should elect
// a new leader.
@ -241,6 +254,7 @@ func TestLeaderFailure3B(t *testing.T) {
// disconnect the new leader.
leader2 := ts.checkOneLeader()
ts.g.DisconnectAll(leader2)
tester.AnnotateConnection(ts.g.GetConnected())
// submit a command to each server.
for i := 0; i < servers; i++ {
@ -250,11 +264,7 @@ func TestLeaderFailure3B(t *testing.T) {
time.Sleep(2 * RaftElectionTimeout)
// check that command 104 did not commit.
n, _ := ts.nCommitted(4)
if n > 0 {
t.Fatalf("%v committed but no majority", n)
}
ts.checkNoAgreement(4)
}
// test that a follower participates after
@ -264,6 +274,7 @@ func TestFailAgree3B(t *testing.T) {
ts := makeTest(t, servers, true, false)
defer ts.cleanup()
tester.AnnotateTest("TestFailAgree3B", servers)
ts.Begin("Test (3B): agreement after follower reconnects")
ts.one(101, servers, false)
@ -271,6 +282,7 @@ func TestFailAgree3B(t *testing.T) {
// disconnect one follower from the network.
leader := ts.checkOneLeader()
ts.g.DisconnectAll((leader + 1) % servers)
tester.AnnotateConnection(ts.g.GetConnected())
// the leader and remaining follower should be
// able to agree despite the disconnected follower.
@ -282,6 +294,7 @@ func TestFailAgree3B(t *testing.T) {
// re-connect
ts.g.ConnectOne((leader + 1) % servers)
tester.AnnotateConnection(ts.g.GetConnected())
// the full set of servers should preserve
// previous agreements, and be able to agree
@ -289,7 +302,6 @@ func TestFailAgree3B(t *testing.T) {
ts.one(106, servers, true)
time.Sleep(RaftElectionTimeout)
ts.one(107, servers, true)
}
func TestFailNoAgree3B(t *testing.T) {
@ -297,6 +309,7 @@ func TestFailNoAgree3B(t *testing.T) {
ts := makeTest(t, servers, true, false)
defer ts.cleanup()
tester.AnnotateTest("TestFailNoAgree3B", servers)
ts.Begin("Test (3B): no agreement if too many followers disconnect")
ts.one(10, servers, false)
@ -306,6 +319,7 @@ func TestFailNoAgree3B(t *testing.T) {
ts.g.DisconnectAll((leader + 1) % servers)
ts.g.DisconnectAll((leader + 2) % servers)
ts.g.DisconnectAll((leader + 3) % servers)
tester.AnnotateConnection(ts.g.GetConnected())
index, _, ok := ts.srvs[leader].Raft().Start(20)
if ok != true {
@ -326,6 +340,7 @@ func TestFailNoAgree3B(t *testing.T) {
ts.g.ConnectOne((leader + 1) % servers)
ts.g.ConnectOne((leader + 2) % servers)
ts.g.ConnectOne((leader + 3) % servers)
tester.AnnotateConnection(ts.g.GetConnected())
// the disconnected majority may have chosen a leader from
// among their own ranks, forgetting index 2.
@ -339,7 +354,6 @@ func TestFailNoAgree3B(t *testing.T) {
}
ts.one(1000, servers, true)
}
func TestConcurrentStarts3B(t *testing.T) {
@ -347,6 +361,7 @@ func TestConcurrentStarts3B(t *testing.T) {
ts := makeTest(t, servers, true, false)
defer ts.cleanup()
tester.AnnotateTest("TestConcurrentStarts3B", servers)
ts.Begin("Test (3B): concurrent Start()s")
var success bool
@ -358,9 +373,15 @@ loop:
}
leader := ts.checkOneLeader()
textb := fmt.Sprintf("checking concurrent submission of commands (attempt %v)", try)
tester.AnnotateCheckerBegin(textb)
_, term, ok := ts.srvs[leader].Raft().Start(1)
despretry := "concurrent submission failed; retry"
if !ok {
// leader moved on really quickly
details := fmt.Sprintf("%v is no longer a leader", leader)
tester.AnnotateCheckerNeutral(despretry, details)
continue
}
@ -388,6 +409,9 @@ loop:
for j := 0; j < servers; j++ {
if t, _ := ts.srvs[j].Raft().GetState(); t != term {
// term changed -- can't expect low RPC counts
details := fmt.Sprintf("term of server %v changed from %v to %v",
j, term, t)
tester.AnnotateCheckerNeutral(despretry, details)
continue loop
}
}
@ -402,11 +426,17 @@ loop:
// so we can't expect all Start()s to
// have succeeded
failed = true
details := fmt.Sprintf(
"term changed while waiting for %v servers to commit index %v",
servers, index)
tester.AnnotateCheckerNeutral(despretry, details)
break
}
cmds = append(cmds, ix)
} else {
t.Fatalf("value %v is not an int", cmd)
details := fmt.Sprintf("value %v is not an int", cmd)
tester.AnnotateCheckerFailure("read ill-typed value", details)
t.Fatalf(details)
}
}
@ -428,7 +458,9 @@ loop:
}
}
if ok == false {
t.Fatalf("cmd %v missing in %v", x, cmds)
details := fmt.Sprintf("cmd %v missing in %v", x, cmds)
tester.AnnotateCheckerFailure("concurrent submission failed", details)
t.Fatalf(details)
}
}
@ -437,9 +469,14 @@ loop:
}
if !success {
tester.AnnotateCheckerFailure(
"agreement failed under concurrent submission",
"unable to reach agreement after 5 attempts")
t.Fatalf("term changed too often")
}
text := "agreement reached under concurrent submission"
tester.AnnotateCheckerSuccess(text, "OK")
}
func TestRejoin3B(t *testing.T) {
@ -447,6 +484,7 @@ func TestRejoin3B(t *testing.T) {
ts := makeTest(t, servers, true, false)
defer ts.cleanup()
tester.AnnotateTest("TestRejoin3B", servers)
ts.Begin("Test (3B): rejoin of partitioned leader")
ts.one(101, servers, true)
@ -454,11 +492,15 @@ func TestRejoin3B(t *testing.T) {
// leader network failure
leader1 := ts.checkOneLeader()
ts.g.DisconnectAll(leader1)
tester.AnnotateConnection(ts.g.GetConnected())
// make old leader try to agree on some entries
start := tester.GetAnnotateTimestamp()
ts.srvs[leader1].Raft().Start(102)
ts.srvs[leader1].Raft().Start(103)
ts.srvs[leader1].Raft().Start(104)
text := fmt.Sprintf("submitted commands [102 103 104] to %v", leader1)
tester.AnnotateInfoInterval(start, text, text)
// new leader commits, also for index=2
ts.one(103, 2, true)
@ -469,14 +511,15 @@ func TestRejoin3B(t *testing.T) {
// old leader connected again
ts.g.ConnectOne(leader1)
tester.AnnotateConnection(ts.g.GetConnected())
ts.one(104, 2, true)
// all together now
ts.g.ConnectOne(leader2)
tester.AnnotateConnection(ts.g.GetConnected())
ts.one(105, servers, true)
}
func TestBackup3B(t *testing.T) {
@ -484,6 +527,7 @@ func TestBackup3B(t *testing.T) {
ts := makeTest(t, servers, true, false)
defer ts.cleanup()
tester.AnnotateTest("TestBackup3B", servers)
ts.Begin("Test (3B): leader backs up quickly over incorrect follower logs")
ts.one(rand.Int(), servers, true)
@ -493,11 +537,15 @@ func TestBackup3B(t *testing.T) {
ts.g.DisconnectAll((leader1 + 2) % servers)
ts.g.DisconnectAll((leader1 + 3) % servers)
ts.g.DisconnectAll((leader1 + 4) % servers)
tester.AnnotateConnection(ts.g.GetConnected())
// submit lots of commands that won't commit
start := tester.GetAnnotateTimestamp()
for i := 0; i < 50; i++ {
ts.srvs[leader1].Raft().Start(rand.Int())
}
text := fmt.Sprintf("submitted 50 commands to %v", leader1)
tester.AnnotateInfoInterval(start, text, text)
time.Sleep(RaftElectionTimeout / 2)
@ -508,6 +556,7 @@ func TestBackup3B(t *testing.T) {
ts.g.ConnectOne((leader1 + 2) % servers)
ts.g.ConnectOne((leader1 + 3) % servers)
ts.g.ConnectOne((leader1 + 4) % servers)
tester.AnnotateConnection(ts.g.GetConnected())
// lots of successful commands to new group.
for i := 0; i < 50; i++ {
@ -521,11 +570,15 @@ func TestBackup3B(t *testing.T) {
other = (leader2 + 1) % servers
}
ts.g.DisconnectAll(other)
tester.AnnotateConnection(ts.g.GetConnected())
// lots more commands that won't commit
start = tester.GetAnnotateTimestamp()
for i := 0; i < 50; i++ {
ts.srvs[leader2].Raft().Start(rand.Int())
}
text = fmt.Sprintf("submitted 50 commands to %v", leader2)
tester.AnnotateInfoInterval(start, text, text)
time.Sleep(RaftElectionTimeout / 2)
@ -536,6 +589,7 @@ func TestBackup3B(t *testing.T) {
ts.g.ConnectOne((leader1 + 0) % servers)
ts.g.ConnectOne((leader1 + 1) % servers)
ts.g.ConnectOne(other)
tester.AnnotateConnection(ts.g.GetConnected())
// lots of successful commands to new group.
for i := 0; i < 50; i++ {
@ -546,6 +600,7 @@ func TestBackup3B(t *testing.T) {
for i := 0; i < servers; i++ {
ts.g.ConnectOne(i)
}
tester.AnnotateConnection(ts.g.GetConnected())
ts.one(rand.Int(), servers, true)
}
@ -554,6 +609,7 @@ func TestCount3B(t *testing.T) {
ts := makeTest(t, servers, true, false)
defer ts.cleanup()
tester.AnnotateTest("TestCount3B", servers)
ts.Begin("Test (3B): RPC counts aren't too high")
rpcs := func() (n int) {
@ -568,7 +624,9 @@ func TestCount3B(t *testing.T) {
total1 := rpcs()
if total1 > 30 || total1 < 1 {
t.Fatalf("too many or few RPCs (%v) to elect initial leader\n", total1)
text := fmt.Sprintf("too many or few RPCs (%v) to elect initial leader", total1)
tester.AnnotateCheckerFailure(text, text)
t.Fatalf("%s", text)
}
var total2 int
@ -581,14 +639,20 @@ loop:
}
leader = ts.checkOneLeader()
textb := fmt.Sprintf("checking reasonable RPC counts for agreement (attempt %v)", try)
tester.AnnotateCheckerBegin(textb)
total1 = rpcs()
iters := 10
starti, term, ok := ts.srvs[leader].Raft().Start(1)
despretry := "submission failed; retry"
if !ok {
// leader moved on really quickly
details := fmt.Sprintf("%v is no longer a leader", leader)
tester.AnnotateCheckerNeutral(despretry, details)
continue
}
cmds := []int{}
for i := 1; i < iters+2; i++ {
x := int(rand.Int31())
@ -596,13 +660,23 @@ loop:
index1, term1, ok := ts.srvs[leader].Raft().Start(x)
if term1 != term {
// Term changed while starting
details := fmt.Sprintf("term of the leader (%v) changed from %v to %v",
leader, term, term1)
tester.AnnotateCheckerNeutral(despretry, details)
continue loop
}
if !ok {
// No longer the leader, so term has changed
details := fmt.Sprintf("%v is no longer a leader", leader)
tester.AnnotateCheckerNeutral(despretry, details)
continue loop
}
if starti+i != index1 {
desp := fmt.Sprintf("leader %v adds the command at the wrong index", leader)
details := fmt.Sprintf(
"the command should locate at index %v, but the leader puts it at %v",
starti + i, index1)
tester.AnnotateCheckerFailure(desp, details)
t.Fatalf("Start() failed")
}
}
@ -612,8 +686,16 @@ loop:
if ix, ok := cmd.(int); ok == false || ix != cmds[i-1] {
if ix == -1 {
// term changed -- try again
details := fmt.Sprintf(
"term changed while waiting for %v servers to commit index %v",
servers, starti + i)
tester.AnnotateCheckerNeutral(despretry, details)
continue loop
}
details := fmt.Sprintf(
"the command submitted at index %v in term %v is %v, but read %v",
starti + i, term, cmds[i - 1], cmd)
tester.AnnotateCheckerFailure("incorrect command committed", details)
t.Fatalf("wrong value %v committed for index %v; expected %v\n", cmd, starti+i, cmds)
}
}
@ -624,6 +706,8 @@ loop:
if t, _ := ts.srvs[j].Raft().GetState(); t != term {
// term changed -- can't expect low RPC counts
// need to keep going to update total2
details := fmt.Sprintf("term of server %v changed from %v to %v", j, term, t)
tester.AnnotateCheckerNeutral(despretry, details)
failed = true
}
total2 += ts.g.RpcCount(j)
@ -634,17 +718,29 @@ loop:
}
if total2-total1 > (iters+1+3)*3 {
details := fmt.Sprintf("number of RPC used for %v entries = %v > %v",
iters, total2-total1, (iters+1+3)*3)
tester.AnnotateCheckerFailure("used too many RPCs for agreement", details)
t.Fatalf("too many RPCs (%v) for %v entries\n", total2-total1, iters)
}
details := fmt.Sprintf("number of RPC used for %v entries = %v <= %v",
iters, total2-total1, (iters+1+3)*3)
tester.AnnotateCheckerSuccess("used reasonable number of RPCs for agreement", details)
success = true
break
}
if !success {
tester.AnnotateCheckerFailure(
"agreement failed",
"unable to reach agreement after 5 attempts")
t.Fatalf("term changed too often")
}
tester.AnnotateCheckerBegin("checking reasonable RPC counts in idle")
time.Sleep(RaftElectionTimeout)
total3 := 0
@ -653,9 +749,15 @@ loop:
}
if total3-total2 > 3*20 {
details := fmt.Sprintf("number of RPC used for 1 second of idleness = %v > %v",
total3-total2, 3 * 20)
tester.AnnotateCheckerFailure("used too many RPCs in idle", details)
t.Fatalf("too many RPCs (%v) for 1 second of idleness\n", total3-total2)
}
details := fmt.Sprintf("number of RPC used for 1 second of idleness = %v <= %v",
total3-total2, 3 * 20)
tester.AnnotateCheckerSuccess(
"used a reasonable number of RPCs in idle", details)
}
func TestPersist13C(t *testing.T) {
@ -663,36 +765,47 @@ func TestPersist13C(t *testing.T) {
ts := makeTest(t, servers, true, false)
defer ts.cleanup()
tester.AnnotateTest("TestPersist13C", servers)
ts.Begin("Test (3C): basic persistence")
ts.one(11, servers, true)
ts.g.Shutdown()
tester.AnnotateShutdownAll()
ts.g.StartServers()
tester.AnnotateRestartAll()
ts.one(12, servers, true)
leader1 := ts.checkOneLeader()
ts.g.ShutdownServer(leader1)
tester.AnnotateShutdown([]int{leader1})
ts.restart(leader1)
tester.AnnotateRestart([]int{leader1})
ts.one(13, servers, true)
leader2 := ts.checkOneLeader()
ts.g.ShutdownServer(leader2)
tester.AnnotateShutdown([]int{leader2})
ts.one(14, servers-1, true)
ts.restart(leader2)
tester.AnnotateRestart([]int{leader2})
tester.AnnotateCheckerBegin("wait for all servers to commit until index 4")
ts.wait(4, servers, -1) // wait for leader2 to join before killing i3
tester.AnnotateCheckerSuccess("all committed until index 4", "OK")
i3 := (ts.checkOneLeader() + 1) % servers
ts.g.ShutdownServer(i3)
tester.AnnotateShutdown([]int{i3})
ts.one(15, servers-1, true)
ts.restart(i3)
tester.AnnotateRestart([]int{i3})
ts.one(16, servers, true)
}
@ -702,6 +815,7 @@ func TestPersist23C(t *testing.T) {
ts := makeTest(t, servers, true, false)
defer ts.cleanup()
tester.AnnotateTest("TestPersist23C", servers)
ts.Begin("Test (3C): more persistence")
index := 1
@ -713,6 +827,7 @@ func TestPersist23C(t *testing.T) {
ts.g.ShutdownServer((leader1 + 1) % servers)
ts.g.ShutdownServer((leader1 + 2) % servers)
tester.AnnotateShutdown([]int{(leader1 + 1) % servers, (leader1 + 2) % servers})
ts.one(10+index, servers-2, true)
index++
@ -720,19 +835,25 @@ func TestPersist23C(t *testing.T) {
ts.g.ShutdownServer((leader1 + 0) % servers)
ts.g.ShutdownServer((leader1 + 3) % servers)
ts.g.ShutdownServer((leader1 + 4) % servers)
tester.AnnotateShutdown([]int{
(leader1 + 0) % servers, (leader1 + 3) % servers, (leader1 + 4) % servers,
})
ts.restart((leader1 + 1) % servers)
ts.restart((leader1 + 2) % servers)
tester.AnnotateRestart([]int{(leader1 + 1) % servers, (leader1 + 2) % servers})
time.Sleep(RaftElectionTimeout)
ts.restart((leader1 + 3) % servers)
tester.AnnotateRestart([]int{(leader1 + 3) % servers})
ts.one(10+index, servers-2, true)
index++
ts.restart((leader1 + 4) % servers)
ts.restart((leader1 + 0) % servers)
tester.AnnotateRestart([]int{(leader1 + 4) % servers, (leader1 + 0) % servers})
}
ts.one(1000, servers, true)
@ -743,23 +864,29 @@ func TestPersist33C(t *testing.T) {
ts := makeTest(t, servers, true, false)
defer ts.cleanup()
tester.AnnotateTest("TestPersist33C", servers)
ts.Begin("Test (3C): partitioned leader and one follower crash, leader restarts")
ts.one(101, 3, true)
leader := ts.checkOneLeader()
ts.g.DisconnectAll((leader + 2) % servers)
tester.AnnotateConnection(ts.g.GetConnected())
ts.one(102, 2, true)
ts.g.ShutdownServer((leader + 0) % servers)
ts.g.ShutdownServer((leader + 1) % servers)
tester.AnnotateShutdown([]int{(leader + 0) % servers, (leader + 1) % servers})
ts.restart((leader + 2) % servers)
ts.restart((leader + 0) % servers)
tester.AnnotateRestart([]int{(leader + 2) % servers, (leader + 0) % servers})
tester.AnnotateConnection(ts.g.GetConnected())
ts.one(103, 2, true)
ts.restart((leader + 1) % servers)
tester.AnnotateRestart([]int{(leader + 1) % servers})
ts.one(104, servers, true)
}
@ -777,6 +904,7 @@ func TestFigure83C(t *testing.T) {
ts := makeTest(t, servers, true, false)
defer ts.cleanup()
tester.AnnotateTest("TestFigure83C", servers)
ts.Begin("Test (3C): Figure 8")
ts.one(rand.Int(), 1, true)
@ -786,8 +914,11 @@ func TestFigure83C(t *testing.T) {
leader := -1
for i := 0; i < servers; i++ {
if ts.srvs[i].Raft() != nil {
_, _, ok := ts.srvs[i].Raft().Start(rand.Int())
cmd := rand.Int()
_, _, ok := ts.srvs[i].Raft().Start(cmd)
if ok {
text := fmt.Sprintf("submitted command %v to server %v", cmd, i)
tester.AnnotateInfo(text, text)
leader = i
}
}
@ -803,6 +934,7 @@ func TestFigure83C(t *testing.T) {
if leader != -1 {
ts.g.ShutdownServer(leader)
tester.AnnotateShutdown([]int{leader})
nup -= 1
}
@ -810,6 +942,7 @@ func TestFigure83C(t *testing.T) {
s := rand.Int() % servers
if ts.srvs[s].Raft() == nil {
ts.restart(s)
tester.AnnotateRestart([]int{s})
nup += 1
}
}
@ -820,9 +953,9 @@ func TestFigure83C(t *testing.T) {
ts.restart(i)
}
}
tester.AnnotateRestartAll()
ts.one(rand.Int(), servers, true)
}
func TestUnreliableAgree3C(t *testing.T) {
@ -830,6 +963,7 @@ func TestUnreliableAgree3C(t *testing.T) {
ts := makeTest(t, servers, false, false)
defer ts.cleanup()
tester.AnnotateTest("TestUnreliableAgree3C", servers)
ts.Begin("Test (3C): unreliable agreement")
var wg sync.WaitGroup
@ -858,6 +992,7 @@ func TestFigure8Unreliable3C(t *testing.T) {
ts := makeTest(t, servers, false, false)
defer ts.cleanup()
tester.AnnotateTest("TestFigure8Unreliable3C", servers)
ts.Begin("Test (3C): Figure 8 (unreliable)")
ts.one(rand.Int()%10000, 1, true)
@ -869,7 +1004,12 @@ func TestFigure8Unreliable3C(t *testing.T) {
}
leader := -1
for i := 0; i < servers; i++ {
_, _, ok := ts.srvs[i].Raft().Start(rand.Int() % 10000)
cmd := rand.Int() % 10000
_, _, ok := ts.srvs[i].Raft().Start(cmd)
if ok {
text := fmt.Sprintf("submitted command %v to server %v", cmd, i)
tester.AnnotateInfo(text, text)
}
if ok && ts.g.IsConnected(i) {
leader = i
}
@ -885,6 +1025,7 @@ func TestFigure8Unreliable3C(t *testing.T) {
if leader != -1 && (rand.Int()%1000) < int(RaftElectionTimeout/time.Millisecond)/2 {
ts.g.DisconnectAll(leader)
tester.AnnotateConnection(ts.g.GetConnected())
nup -= 1
}
@ -892,6 +1033,7 @@ func TestFigure8Unreliable3C(t *testing.T) {
s := rand.Int() % servers
if !ts.g.IsConnected(s) {
ts.g.ConnectOne(s)
tester.AnnotateConnection(ts.g.GetConnected())
nup += 1
}
}
@ -902,9 +1044,9 @@ func TestFigure8Unreliable3C(t *testing.T) {
ts.g.ConnectOne(i)
}
}
tester.AnnotateConnection(ts.g.GetConnected())
ts.one(rand.Int()%10000, servers, true)
}
func internalChurn(t *testing.T, reliable bool) {
@ -914,8 +1056,10 @@ func internalChurn(t *testing.T, reliable bool) {
defer ts.cleanup()
if ts.IsReliable() {
tester.AnnotateTest("TestReliableChurn3C", servers)
ts.Begin("Test (3C): churn")
} else {
tester.AnnotateTest("TestUnreliableChurn3C", servers)
ts.Begin("Test (3C): unreliable churn")
}
@ -968,6 +1112,7 @@ func internalChurn(t *testing.T, reliable bool) {
ret = values
}
startcli := tester.GetAnnotateTimestamp()
ncli := 3
cha := []chan []int{}
for i := 0; i < ncli; i++ {
@ -979,20 +1124,24 @@ func internalChurn(t *testing.T, reliable bool) {
if (rand.Int() % 1000) < 200 {
i := rand.Int() % servers
ts.g.DisconnectAll(i)
tester.AnnotateConnection(ts.g.GetConnected())
}
if (rand.Int() % 1000) < 500 {
i := rand.Int() % servers
if ts.srvs[i].raft == nil {
ts.restart(i)
tester.AnnotateRestart([]int{i})
}
ts.g.ConnectOne(i)
tester.AnnotateConnection(ts.g.GetConnected())
}
if (rand.Int() % 1000) < 200 {
i := rand.Int() % servers
if ts.srvs[i].raft != nil {
ts.g.ShutdownServer(i)
tester.AnnotateShutdown([]int{i})
}
}
@ -1011,9 +1160,14 @@ func internalChurn(t *testing.T, reliable bool) {
}
ts.g.ConnectOne(i)
}
tester.AnnotateRestartAll()
tester.AnnotateConnection(ts.g.GetConnected())
atomic.StoreInt32(&stop, 1)
textcli := fmt.Sprintf("%v clients submitting commands concurrently", ncli)
tester.AnnotateInfoInterval(startcli, textcli, textcli)
tester.AnnotateCheckerBegin("checking if any client has failed")
values := []int{}
for i := 0; i < ncli; i++ {
vv := <-cha[i]
@ -1022,6 +1176,7 @@ func internalChurn(t *testing.T, reliable bool) {
}
values = append(values, vv...)
}
tester.AnnotateCheckerSuccess("none of the clients have failed", "OK")
time.Sleep(RaftElectionTimeout)
@ -1033,10 +1188,14 @@ func internalChurn(t *testing.T, reliable bool) {
if vi, ok := v.(int); ok {
really = append(really, vi)
} else {
text := fmt.Sprintf("committed value %v is not an integer", v)
tester.AnnotateCheckerFailure(text, text)
t.Fatalf("not an int")
}
}
tester.AnnotateCheckerBegin(
"checking if committed values observed by the clients remain in the log")
for _, v1 := range values {
ok := false
for _, v2 := range really {
@ -1048,7 +1207,7 @@ func internalChurn(t *testing.T, reliable bool) {
ts.t.Fatalf("didn't find a value")
}
}
tester.AnnotateCheckerSuccess("committed values remain in the log", "OK")
}
func TestReliableChurn3C(t *testing.T) {
@ -1069,6 +1228,8 @@ func snapcommon(t *testing.T, name string, disconnect bool, reliable bool, crash
ts := makeTest(t, servers, reliable, true)
defer ts.cleanup()
// Inconsistent with other test cases, but don't want to change API.
tester.AnnotateTest(name, servers)
ts.Begin(name)
ts.one(rand.Int(), servers, true)
@ -1084,18 +1245,23 @@ func snapcommon(t *testing.T, name string, disconnect bool, reliable bool, crash
if disconnect {
ts.g.DisconnectAll(victim)
tester.AnnotateConnection(ts.g.GetConnected())
ts.one(rand.Int(), servers-1, true)
}
if crash {
ts.g.ShutdownServer(victim)
tester.AnnotateShutdown([]int{victim})
ts.one(rand.Int(), servers-1, true)
}
// perhaps send enough to get a snapshot
start := tester.GetAnnotateTimestamp()
nn := (SnapShotInterval / 2) + (rand.Int() % SnapShotInterval)
for i := 0; i < nn; i++ {
ts.srvs[sender].Raft().Start(rand.Int())
}
text := fmt.Sprintf("submitting %v commands to %v", nn, sender)
tester.AnnotateInfoInterval(start, text, text)
// let applier threads catch up with the Start()'s
if disconnect == false && crash == false {
@ -1114,11 +1280,13 @@ func snapcommon(t *testing.T, name string, disconnect bool, reliable bool, crash
// reconnect a follower, who maybe behind and
// needs to rceive a snapshot to catch up.
ts.g.ConnectOne(victim)
tester.AnnotateConnection(ts.g.GetConnected())
ts.one(rand.Int(), servers, true)
leader1 = ts.checkOneLeader()
}
if crash {
ts.restart(victim)
tester.AnnotateRestart([]int{victim})
ts.one(rand.Int(), servers, true)
leader1 = ts.checkOneLeader()
}
@ -1155,6 +1323,7 @@ func TestSnapshotAllCrash3D(t *testing.T) {
ts := makeTest(t, servers, false, true)
defer ts.cleanup()
tester.AnnotateTest("TestSnapshotAllCrash3D", servers)
ts.Begin("Test (3D): crash and restart all servers")
ts.one(rand.Int(), servers, true)
@ -1170,11 +1339,15 @@ func TestSnapshotAllCrash3D(t *testing.T) {
// crash all
ts.g.Shutdown()
tester.AnnotateShutdownAll()
ts.g.StartServers()
tester.AnnotateRestartAll()
index2 := ts.one(rand.Int(), servers, true)
if index2 < index1+1 {
t.Fatalf("index decreased from %v to %v", index1, index2)
msg := fmt.Sprintf("index decreased from %v to %v", index1, index2)
tester.AnnotateCheckerFailure("incorrect behavior: index decreased", msg)
t.Fatalf(msg)
}
}
}
@ -1186,6 +1359,7 @@ func TestSnapshotInit3D(t *testing.T) {
ts := makeTest(t, servers, false, true)
defer ts.cleanup()
tester.AnnotateTest("TestSnapshotInit3D", servers)
ts.Begin("Test (3D): snapshot initialization after crash")
ts.one(rand.Int(), servers, true)
@ -1196,13 +1370,17 @@ func TestSnapshotInit3D(t *testing.T) {
}
ts.g.Shutdown()
tester.AnnotateShutdownAll()
ts.g.StartServers()
tester.AnnotateRestartAll()
// a single op, to get something to be written back to persistent storage.
ts.one(rand.Int(), servers, true)
ts.g.Shutdown()
tester.AnnotateShutdownAll()
ts.g.StartServers()
tester.AnnotateRestartAll()
// do another op to trigger potential bug
ts.one(rand.Int(), servers, true)

View File

@ -51,6 +51,7 @@ func newRfsrv(ts *Test, srv int, ends []*labrpc.ClientEnd, persister *tester.Per
// ideally Raft should send it up on applyCh...
err := s.ingestSnap(snapshot, -1)
if err != "" {
tester.AnnotateCheckerFailureBeforeExit("failed to ingest snapshot", err)
ts.t.Fatal(err)
}
}
@ -106,6 +107,7 @@ func (rs *rfsrv) applier(applyCh chan raftapi.ApplyMsg) {
err_msg = fmt.Sprintf("server %v apply out of order %v", rs.me, m.CommandIndex)
}
if err_msg != "" {
tester.AnnotateCheckerFailureBeforeExit("apply error", err_msg)
log.Fatalf("apply error: %v", err_msg)
rs.applyErr = err_msg
// keep reading after error so that Raft doesn't block
@ -149,12 +151,18 @@ func (rs *rfsrv) applierSnap(applyCh chan raftapi.ApplyMsg) {
xlog = append(xlog, rs.logs[j])
}
e.Encode(xlog)
start := tester.GetAnnotateTimestamp()
rs.raft.Snapshot(m.CommandIndex, w.Bytes())
details := fmt.Sprintf(
"snapshot created after applying the command at index %v",
m.CommandIndex)
tester.AnnotateInfoInterval(start, "snapshot created", details)
}
} else {
// Ignore other types of ApplyMsg.
}
if err_msg != "" {
tester.AnnotateCheckerFailureBeforeExit("apply error", err_msg)
log.Fatalf("apply error: %v", err_msg)
rs.applyErr = err_msg
// keep reading after error so that Raft doesn't block
@ -169,6 +177,7 @@ func (rs *rfsrv) ingestSnap(snapshot []byte, index int) string {
defer rs.mu.Unlock()
if snapshot == nil {
tester.AnnotateCheckerFailureBeforeExit("failed to ingest snapshot", "nil snapshot")
log.Fatalf("nil snapshot")
return "nil snapshot"
}
@ -178,6 +187,8 @@ func (rs *rfsrv) ingestSnap(snapshot []byte, index int) string {
var xlog []any
if d.Decode(&lastIncludedIndex) != nil ||
d.Decode(&xlog) != nil {
text := "failed to decode snapshot"
tester.AnnotateCheckerFailureBeforeExit(text, text)
log.Fatalf("snapshot decode error")
return "snapshot Decode() error"
}

View File

@ -62,6 +62,7 @@ func (ts *Test) restart(i int) {
}
func (ts *Test) checkOneLeader() int {
tester.AnnotateCheckerBegin("checking for a single leader")
for iters := 0; iters < 10; iters++ {
ms := 450 + (rand.Int63() % 100)
time.Sleep(time.Duration(ms) * time.Millisecond)
@ -78,6 +79,8 @@ func (ts *Test) checkOneLeader() int {
lastTermWithLeader := -1
for term, leaders := range leaders {
if len(leaders) > 1 {
details := fmt.Sprintf("multiple leaders in term %v = %v", term, leaders)
tester.AnnotateCheckerFailure("multiple leaders", details)
ts.Fatalf("term %d has %d (>1) leaders", term, len(leaders))
}
if term > lastTermWithLeader {
@ -86,14 +89,20 @@ func (ts *Test) checkOneLeader() int {
}
if len(leaders) != 0 {
details := fmt.Sprintf("leader in term %v = %v",
lastTermWithLeader, leaders[lastTermWithLeader][0])
tester.AnnotateCheckerSuccess(details, details)
return leaders[lastTermWithLeader][0]
}
}
details := fmt.Sprintf("unable to find a leader")
tester.AnnotateCheckerFailure("no leader", details)
ts.Fatalf("expected one leader, got none")
return -1
}
func (ts *Test) checkTerms() int {
tester.AnnotateCheckerBegin("checking term agreement")
term := -1
for i := 0; i < ts.n; i++ {
if ts.g.IsConnected(i) {
@ -101,10 +110,15 @@ func (ts *Test) checkTerms() int {
if term == -1 {
term = xterm
} else if term != xterm {
details := fmt.Sprintf("node ids -> terms = { %v -> %v; %v -> %v }",
i - 1, term, i, xterm)
tester.AnnotateCheckerFailure("term disagreed", details)
ts.Fatalf("servers disagree on term")
}
}
}
details := fmt.Sprintf("term = %v", term)
tester.AnnotateCheckerSuccess("term agreed", details)
return term
}
@ -134,14 +148,32 @@ func (ts *Test) checkLogs(i int, m raftapi.ApplyMsg) (string, bool) {
// check that none of the connected servers
// thinks it is the leader.
func (ts *Test) checkNoLeader() {
tester.AnnotateCheckerBegin("checking no unexpected leader among connected servers")
for i := 0; i < ts.n; i++ {
if ts.g.IsConnected(i) {
_, is_leader := ts.srvs[i].GetState()
if is_leader {
ts.Fatalf("expected no leader among connected servers, but %v claims to be leader", i)
details := fmt.Sprintf("leader = %v", i)
tester.AnnotateCheckerFailure("unexpected leader found", details)
ts.Fatalf(details)
}
}
}
tester.AnnotateCheckerSuccess("no unexpected leader", "no unexpected leader")
}
func (ts *Test) checkNoAgreement(index int) {
text := fmt.Sprintf("checking no unexpected agreement at index %v", index)
tester.AnnotateCheckerBegin(text)
n, _ := ts.nCommitted(index)
if n > 0 {
desp := fmt.Sprintf("unexpected agreement at index %v", index)
details := fmt.Sprintf("%v server(s) commit incorrectly index", n)
tester.AnnotateCheckerFailure(desp, details)
ts.Fatalf("%v committed but no majority", n)
}
desp := fmt.Sprintf("no unexpected agreement at index %v", index)
tester.AnnotateCheckerSuccess(desp, "OK")
}
// how many servers think a log entry is committed?
@ -153,6 +185,7 @@ func (ts *Test) nCommitted(index int) (int, any) {
var cmd any = nil
for _, rs := range ts.srvs {
if rs.applyErr != "" {
tester.AnnotateCheckerFailure("apply error", rs.applyErr)
ts.t.Fatal(rs.applyErr)
}
@ -160,8 +193,10 @@ func (ts *Test) nCommitted(index int) (int, any) {
if ok {
if count > 0 && cmd != cmd1 {
ts.Fatalf("committed values do not match: index %v, %v, %v",
text := fmt.Sprintf("committed values at index %v do not match (%v != %v)",
index, cmd, cmd1)
tester.AnnotateCheckerFailure("unmatched committed values", text)
ts.Fatalf(text)
}
count += 1
cmd = cmd1
@ -183,6 +218,16 @@ func (ts *Test) nCommitted(index int) (int, any) {
// if retry==false, calls Start() only once, in order
// to simplify the early Lab 3B tests.
func (ts *Test) one(cmd any, expectedServers int, retry bool) int {
var textretry string
if retry {
textretry = "with"
} else {
textretry = "without"
}
textcmd := fmt.Sprintf("%v", cmd)
textb := fmt.Sprintf("checking agreement of %.8s by at least %v servers %v retry",
textcmd, expectedServers, textretry)
tester.AnnotateCheckerBegin(textb)
t0 := time.Now()
starts := 0
for time.Since(t0).Seconds() < 10 && ts.checkFinished() == false {
@ -214,12 +259,16 @@ func (ts *Test) one(cmd any, expectedServers int, retry bool) int {
// committed
if cmd1 == cmd {
// and it was the command we submitted.
desp := fmt.Sprintf("agreement of %.8s reached", textcmd)
tester.AnnotateCheckerSuccess(desp, "OK")
return index
}
}
time.Sleep(20 * time.Millisecond)
}
if retry == false {
desp := fmt.Sprintf("agreement of %.8s failed", textcmd)
tester.AnnotateCheckerFailure(desp, "failed after submitting command")
ts.Fatalf("one(%v) failed to reach agreement", cmd)
}
} else {
@ -227,6 +276,8 @@ func (ts *Test) one(cmd any, expectedServers int, retry bool) int {
}
}
if ts.checkFinished() == false {
desp := fmt.Sprintf("agreement of %.8s failed", textcmd)
tester.AnnotateCheckerFailure(desp, "failed after 10-second timeout")
ts.Fatalf("one(%v) failed to reach agreement", cmd)
}
return -1
@ -262,6 +313,10 @@ func (ts *Test) wait(index int, n int, startTerm int) any {
}
nd, cmd := ts.nCommitted(index)
if nd < n {
desp := fmt.Sprintf("less than %v servers commit index %v", n, index)
details := fmt.Sprintf(
"only %v (< %v) servers commit index %v at term %v", nd, n, index, startTerm)
tester.AnnotateCheckerFailure(desp, details)
ts.Fatalf("only %d decided for index %d; wanted %d",
nd, index, n)
}

535
src/tester1/annotation.go Normal file
View File

@ -0,0 +1,535 @@
package tester
import (
"sync"
"os"
"os/signal"
"fmt"
"time"
"strings"
"slices"
"github.com/anishathalye/porcupine"
"6.5840/models1"
)
///
/// Public interface.
///
type Annotation struct {
mu *sync.Mutex
annotations []porcupine.Annotation
continuous map[string]Continuous
}
type Continuous struct {
start int64
desp string
details string
bgcolor string
}
type FrameworkInfo struct {
mu *sync.Mutex
nservers int
connected []bool
crashed []bool
ckbegin CheckerBegin
}
type CheckerBegin struct {
ts int64
details string
}
// Using global variable feels disturbing, but also can't figure out a better
// way to support user-level annotations. An alternative would be passing an
// Annotation object to the start-up function of servers and clients, but that
// doesn't feel better.
//
// One potential problem with using a global Annotation object is that when
// running multiple test cases, some zombie threads in previous test cases could
// interfere the current one. An ad-hoc fix at the user level would be adding
// annotations only if the killed flag on the server is not set.
var annotation *Annotation = mkAnnotation()
var unit struct{} = captureSignal()
var finfo *FrameworkInfo
const (
COLOR_INFO string = "#FAFAFA"
COLOR_NEUTRAL string = "#FFECB3"
COLOR_SUCCESS string = "#C8E6C9"
COLOR_FAILURE string = "#FFCDD2"
COLOR_FAULT string = "#B3E5FC"
COLOR_USER string = "#FFF176"
)
const (
TAG_CHECKER string = "$ Checker"
TAG_PARTITION string = "$ Failure"
TAG_INFO string = "$ Test Info"
)
func (cfg *Config) RetrieveAnnotations() []porcupine.Annotation{
annotations := annotation.retrieve()
return annotations
}
func AnnotatePointColor(
tag, desp, details, bgcolor string,
) {
annotation.annotatePointColor(tag, desp, details, bgcolor)
}
func GetAnnotateTimestamp() int64 {
return timestamp()
}
func AnnotateIntervalColor(
tag string, start int64, desp, details, bgcolor string,
) {
annotation.annotateIntervalColor(tag, start, desp, details, bgcolor)
}
func AnnotateContinuousColor(tag, desp, details, bgcolor string) {
annotation.annotateContinuousColor(tag, desp, details, bgcolor)
}
func AnnotateContinuousEnd(tag string) {
annotation.annotateContinuousEnd(tag)
}
// Used by users.
func AnnotatePoint(tag, desp, details string) {
annotation.annotatePointColor(tag, desp, details, COLOR_USER)
}
func AnnotateInterval(tag string, start int64, desp, details string) {
annotation.annotateIntervalColor(tag, start, desp, details, COLOR_USER)
}
func AnnotateContinuous(tag, desp, details string) {
annotation.annotateContinuousColor(tag, desp, details, COLOR_USER)
}
// Used by test framework.
func AnnotateInfo(desp, details string) {
AnnotatePointColor(TAG_INFO, desp, details, COLOR_INFO)
}
func AnnotateInfoInterval(start int64, desp, details string) {
AnnotateIntervalColor(TAG_INFO, start, desp, details, COLOR_INFO)
}
func AnnotateTest(desp string, nservers int) {
details := fmt.Sprintf("%s (%d servers)", desp, nservers)
finfo = mkFrameworkInfo(nservers)
annotation.clear()
AnnotateInfo(details, details)
}
func AnnotateCheckerBegin(details string) {
finfo.mu.Lock()
defer finfo.mu.Unlock()
finfo.ckbegin = CheckerBegin{
ts: timestamp(),
details: details,
}
}
func AnnotateCheckerEnd(desp, details, color string) {
finfo.mu.Lock()
defer finfo.mu.Unlock()
ckbegin := finfo.ckbegin
if ckbegin.ts == 0 {
// Annotate as a point-in-time if the begin timestamp is not set.
AnnotatePointColor(TAG_CHECKER, desp, details, color)
return
}
// Annotate as an interval if the begin timestamp is set.
d := fmt.Sprintf("%s: %s", ckbegin.details, details)
AnnotateIntervalColor(TAG_CHECKER, ckbegin.ts, desp, d, color)
// Reset the checker begin timestamp.
ckbegin.ts = 0
}
func AnnotateCheckerSuccess(desp, details string) {
AnnotateCheckerEnd(desp, details, COLOR_SUCCESS)
}
func AnnotateCheckerFailure(desp, details string) {
AnnotateCheckerEnd(desp, details, COLOR_FAILURE)
}
func AnnotateCheckerNeutral(desp, details string) {
AnnotateCheckerEnd(desp, details, COLOR_NEUTRAL)
}
// Used before log.Fatalf
func AnnotateCheckerFailureBeforeExit(desp, details string) {
AnnotateCheckerFailure(desp, details)
annotation.cleanup(true, "test failed")
}
// Two functions to annotate partitions: AnnotateConnection and
// AnnotateTwoPartitions. The connected field of ServerGrp (in group.go) is
// precise if and only if the ServerGrp.Partition is not used. Thus, we use the
// latter when ServerGrp.Partition is involved, and the former otherwise.
func AnnotateConnection(connection []bool) {
finfo.mu.Lock()
defer finfo.mu.Unlock()
if slices.Equal(finfo.connected, connection) {
// Nothing to do if the connection is unchanged.
return
}
copy(finfo.connected, connection)
annotateFault()
}
func annotateFault() {
trues := make([]bool, finfo.nservers)
for id := range(trues) {
trues[id] = true
}
falses := make([]bool, finfo.nservers)
if slices.Equal(trues, finfo.connected) && slices.Equal(falses, finfo.crashed) {
// No annotation when no partitions and no crashes.
AnnotateContinuousEnd(TAG_PARTITION)
return
}
// Now, each disconnected server sits in its own partition, connected
// servers in one partition; crahsed servers indicated at the end.
conn := make([]int, 0)
crashes := make([]int, 0)
var builder strings.Builder
builder.WriteString("partition = ")
for id, connected := range(finfo.connected) {
if finfo.crashed[id] {
crashes = append(crashes, id)
continue
}
if connected {
conn = append(conn, id)
} else {
builder.WriteString(fmt.Sprintf("[%v] ", id))
}
}
if len(conn) > 0 {
builder.WriteString(fmt.Sprintf("%v", conn))
}
if len(crashes) > 0 {
builder.WriteString(fmt.Sprintf(" / crash = %v", crashes))
}
text := builder.String()
AnnotateContinuousColor(TAG_PARTITION, text, text, COLOR_FAULT)
}
func AnnotateTwoPartitions(p1 []int, p2 []int) {
// A bit hard to check whether the partition actually changes, so just
// annotate on every invocation.
// TODO
text := fmt.Sprintf("%v %v", p1, p2)
AnnotateContinuousColor(TAG_PARTITION, text, text, COLOR_FAULT)
}
func AnnotateShutdown(servers []int) {
finfo.mu.Lock()
defer finfo.mu.Unlock()
changed := false
for _, id := range(servers) {
if !finfo.crashed[id] {
changed = true
}
finfo.crashed[id] = true
}
if !changed {
// Nothing to do if the set of crashed servers is unchanged.
return
}
annotateFault()
}
func AnnotateShutdownAll() {
finfo.mu.Lock()
n := finfo.nservers
finfo.mu.Unlock()
servers := make([]int, n)
for i := range(servers) {
servers[i] = i
}
AnnotateShutdown(servers)
}
func AnnotateRestart(servers []int) {
finfo.mu.Lock()
defer finfo.mu.Unlock()
changed := false
for _, id := range(servers) {
if finfo.crashed[id] {
changed = true
}
finfo.crashed[id] = false
}
if !changed {
// Nothing to do if the set of crashed servers is unchanged.
return
}
annotateFault()
}
func AnnotateRestartAll() {
finfo.mu.Lock()
n := finfo.nservers
finfo.mu.Unlock()
servers := make([]int, n)
for i := range(servers) {
servers[i] = i
}
AnnotateRestart(servers)
}
///
/// Internal.
///
func timestamp() int64 {
return int64(time.Since(time.Unix(0, 0)))
}
func (an *Annotation) retrieve() []porcupine.Annotation {
an.mu.Lock()
x := an.annotations
t := timestamp()
for tag, cont := range(an.continuous) {
a := porcupine.Annotation{
Tag: tag,
Start: cont.start,
End: t,
Description: cont.desp,
Details: cont.details,
BackgroundColor: cont.bgcolor,
}
x = append(x, a)
}
an.annotations = make([]porcupine.Annotation, 0)
an.continuous = make(map[string]Continuous)
an.mu.Unlock()
return x
}
func (an *Annotation) clear() {
an.mu.Lock()
an.annotations = make([]porcupine.Annotation, 0)
an.continuous = make(map[string]Continuous)
an.mu.Unlock()
}
func (an *Annotation) annotatePointColor(
tag, desp, details, bgcolor string,
) {
an.mu.Lock()
t := timestamp()
a := porcupine.Annotation{
Tag: tag,
Start: t,
Description: desp,
Details: details,
BackgroundColor: bgcolor,
}
an.annotations = append(an.annotations, a)
an.mu.Unlock()
}
func (an *Annotation) annotateIntervalColor(
tag string, start int64, desp, details, bgcolor string,
) {
an.mu.Lock()
a := porcupine.Annotation{
Tag: tag,
Start: start,
End: timestamp(),
Description: desp,
Details: details,
BackgroundColor: bgcolor,
}
an.annotations = append(an.annotations, a)
an.mu.Unlock()
}
func (an *Annotation) annotateContinuousColor(
tag, desp, details, bgcolor string,
) {
an.mu.Lock()
defer an.mu.Unlock()
cont, ok := an.continuous[tag]
if !ok {
// The first continuous annotation for tag. Simply add it to the
// continuous map.
an.continuous[tag] = Continuous{
start: timestamp(),
desp: desp,
details: details,
bgcolor: bgcolor,
}
return
}
// Subsequent continuous annotation for tag. Concretize the previous
// annotation and add this one to the continuous map.
t := timestamp()
aprev := porcupine.Annotation{
Tag: tag,
Start: cont.start,
End: t,
Description: cont.desp,
Details: cont.details,
BackgroundColor: cont.bgcolor,
}
an.annotations = append(an.annotations, aprev)
an.continuous[tag] = Continuous{
// XXX: If the start timestamp of an event is too closer to the end
// timestamp of another event, Porcupine seems to overlap the two
// events. We add a delta (1000) as a workaround, but remove this once
// this issue is resolved.
start: t + 1000,
desp: desp,
details: details,
bgcolor: bgcolor,
}
}
func (an *Annotation) annotateContinuousEnd(tag string) {
an.mu.Lock()
defer an.mu.Unlock()
cont, ok := an.continuous[tag]
if !ok {
// Nothing to end since there's no on-going continuous annotation for
// tag.
}
// End the on-going continuous annotation for tag.
t := timestamp()
aprev := porcupine.Annotation{
Tag: tag,
Start: cont.start,
End: t,
Description: cont.desp,
Details: cont.details,
BackgroundColor: cont.bgcolor,
}
an.annotations = append(an.annotations, aprev)
delete(an.continuous, tag)
}
func (an *Annotation) cleanup(failed bool, end string) {
enabled := os.Getenv("VIS_ENABLE")
if enabled == "never" || (!failed && enabled != "always") {
// Simply clean up the annotations without producing the vis file if
// VIS_ENABLE is set to "never", or if the test passes and VIS_ENABLE is
// not set to "always".
an.clear()
return
}
annotations := an.retrieve()
if len(annotations) == 0 {
// Skip empty annotations.
return
}
// XXX: Make the last annotation a interval one to work around Porcupine's
// issue. Consider removing this once the issue is fixed.
t := timestamp()
aend := porcupine.Annotation{
Tag: TAG_INFO,
Start: t,
End: t + 1000,
Description: end,
Details: end,
BackgroundColor: COLOR_INFO,
}
annotations = append(annotations, aend)
fpath := os.Getenv("VIS_FILE")
var file *os.File
var err error
if fpath == "" {
// Save the vis file in a temporary file.
file, err = os.CreateTemp("", "porcupine-*.html")
} else {
file, err = os.OpenFile(fpath, os.O_RDWR | os.O_CREATE | os.O_TRUNC, 0644)
}
if err != nil {
fmt.Printf("info: failed to open visualization file %s (%v)\n", fpath, err)
return
}
// Create a fresh linearization info without any client operations and use
// models.KvModel simply as a placeholder.
info := porcupine.LinearizationInfo{}
info.AddAnnotations(annotations)
porcupine.Visualize(models.KvModel, info, file)
fmt.Printf("info: wrote visualization to %s\n", file.Name())
}
func mkAnnotation() *Annotation {
an := Annotation{
mu: new(sync.Mutex),
annotations: make([]porcupine.Annotation, 0),
continuous: make(map[string]Continuous),
}
return &an
}
func mkFrameworkInfo(nservers int) *FrameworkInfo {
conn := make([]bool, nservers)
for id := range(conn) {
conn[id] = true
}
finfo := FrameworkInfo{
mu: new(sync.Mutex),
nservers: nservers,
connected: conn,
crashed: make([]bool, nservers),
}
return &finfo
}
func captureSignal() struct{} {
// Capture SIGINT to visualize on interruption.
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func(){
for range c {
annotation.cleanup(true, "interrupted")
os.Exit(1)
}
}()
return struct{}{}
}

View File

@ -79,6 +79,11 @@ func (cfg *Config) Cleanup() {
cfg.Clnts.cleanup()
cfg.Groups.cleanup()
cfg.net.Cleanup()
if cfg.t.Failed() {
annotation.cleanup(true, "test failed")
} else {
annotation.cleanup(false, "test passed")
}
cfg.CheckTimeout()
}

View File

@ -194,6 +194,10 @@ func (sg *ServerGrp) IsConnected(i int) bool {
return sg.connected[i]
}
func (sg *ServerGrp) GetConnected() []bool {
return sg.connected
}
// Maximum log size across all servers
func (sg *ServerGrp) LogSize() int {
logsize := 0