From eacb273f958ee26f2801d9554f149fc6d076aa45 Mon Sep 17 00:00:00 2001 From: Yun-Sheng Chang Date: Tue, 18 Feb 2025 15:31:00 -0500 Subject: [PATCH] update --- .check-build | 1 + src/kvtest1/porcupine.go | 58 ++++- src/raft1/raft_test.go | 224 ++++++++++++++-- src/raft1/server.go | 11 + src/raft1/test.go | 59 ++++- src/tester1/annotation.go | 535 ++++++++++++++++++++++++++++++++++++++ src/tester1/config.go | 5 + src/tester1/group.go | 4 + 8 files changed, 862 insertions(+), 35 deletions(-) create mode 100644 src/tester1/annotation.go diff --git a/.check-build b/.check-build index ec589b9..e41ee3b 100755 --- a/.check-build +++ b/.check-build @@ -37,6 +37,7 @@ REFERENCE_FILES=( src/tester1/group.go src/tester1/persister.go src/tester1/srv.go + src/tester1/annotation.go # lab 4 src/kvraft1/rsm/rsm_test.go diff --git a/src/kvtest1/porcupine.go b/src/kvtest1/porcupine.go index bcab1d3..75a96b2 100644 --- a/src/kvtest1/porcupine.go +++ b/src/kvtest1/porcupine.go @@ -2,8 +2,7 @@ package kvtest import ( "fmt" - "io/ioutil" - //"log" + "os" "sync" "testing" "time" @@ -46,7 +45,7 @@ func (log *OpLog) Read() []porcupine.Operation { // absolute timestamps with `time.Now().UnixNano()` (which uses the wall // clock), we measure time relative to `t0` using `time.Since(t0)`, which uses // the monotonic clock -var t0 = time.Now() +var t0 = time.Unix(0, 0) func Get(cfg *tester.Config, ck IKVClerk, key string, log *OpLog, cli int) (string, rpc.Tversion, rpc.Err) { start := int64(time.Since(t0)) @@ -84,14 +83,26 @@ func Put(cfg *tester.Config, ck IKVClerk, key string, value string, version rpc. // Checks that the log of Clerk.Put's and Clerk.Get's is linearizable (see // linearizability-faq.txt) -func checkPorcupine(t *testing.T, opLog *OpLog, nsec time.Duration) { - //log.Printf("oplog len %v %v", ts.oplog.Len(), ts.oplog) +func checkPorcupine( + t *testing.T, opLog *OpLog, annotations []porcupine.Annotation, nsec time.Duration, +) { + enabled := os.Getenv("VIS_ENABLE") + fpath := os.Getenv("VIS_FILE") res, info := porcupine.CheckOperationsVerbose(models.KvModel, opLog.Read(), nsec) if res == porcupine.Illegal { - file, err := ioutil.TempFile("", "porcupine-*.html") - if err != nil { - fmt.Printf("info: failed to create temp file for visualization") + var file *os.File + var err error + if fpath == "" { + // Save the vis file in a temporary file. + file, err = os.CreateTemp("", "porcupine-*.html") } else { + file, err = os.OpenFile(fpath, os.O_RDWR | os.O_CREATE | os.O_TRUNC, 0644) + } + if err != nil { + fmt.Printf("info: failed to open visualization file %s (%v)\n", fpath, err) + } else if enabled != "never" { + // Don't produce visualization file if VIS_ENABLE is set to "never". + info.AddAnnotations(annotations) err = porcupine.Visualize(models.KvModel, info, file) if err != nil { fmt.Printf("info: failed to write history visualization to %s\n", file.Name()) @@ -103,6 +114,29 @@ func checkPorcupine(t *testing.T, opLog *OpLog, nsec time.Duration) { } else if res == porcupine.Unknown { fmt.Println("info: linearizability check timed out, assuming history is ok") } + + // The result is either legal or unknown. + if enabled == "always" { + var file *os.File + var err error + if fpath == "" { + // Save the vis file in a temporary file. 
+ file, err = os.CreateTemp("", "porcupine-*.html") + } else { + file, err = os.OpenFile(fpath, os.O_RDWR | os.O_CREATE | os.O_TRUNC, 0644) + } + if err != nil { + fmt.Printf("info: failed to open visualization file %s (%v)\n", fpath, err) + return + } + info.AddAnnotations(annotations) + err = porcupine.Visualize(models.KvModel, info, file) + if err != nil { + fmt.Printf("info: failed to write history visualization to %s\n", file.Name()) + } else { + fmt.Printf("info: wrote history visualization to %s\n", file.Name()) + } + } } // Porcupine @@ -142,9 +176,13 @@ func (ts *Test) Put(ck IKVClerk, key string, value string, version rpc.Tversion, } func (ts *Test) CheckPorcupine() { - checkPorcupine(ts.t, ts.oplog, linearizabilityCheckTimeout) + ts.CheckPorcupineT(linearizabilityCheckTimeout) } func (ts *Test) CheckPorcupineT(nsec time.Duration) { - checkPorcupine(ts.t, ts.oplog, nsec) + // ts.RetrieveAnnotations() also clears the accumulated annotations so that + // the vis file containing client operations (generated here) won't be + // overwritten by the one without client operations (generated at cleanup + // time). + annotations := ts.RetrieveAnnotations() + checkPorcupine(ts.t, ts.oplog, annotations, nsec) } diff --git a/src/raft1/raft_test.go b/src/raft1/raft_test.go index df573ec..d11b9d1 100644 --- a/src/raft1/raft_test.go +++ b/src/raft1/raft_test.go @@ -29,6 +29,7 @@ func TestInitialElection3A(t *testing.T) { ts := makeTest(t, servers, true, false) defer ts.cleanup() + tester.AnnotateTest("TestInitialElection3A", servers) ts.Begin("Test (3A): initial election") // is a leader elected? @@ -58,24 +59,28 @@ func TestReElection3A(t *testing.T) { ts := makeTest(t, servers, true, false) defer ts.cleanup() + tester.AnnotateTest("TestReElection3A", servers) ts.Begin("Test (3A): election after network failure") leader1 := ts.checkOneLeader() // if the leader disconnects, a new one should be elected. ts.g.DisconnectAll(leader1) + tester.AnnotateConnection(ts.g.GetConnected()) ts.checkOneLeader() // if the old leader rejoins, that shouldn't // disturb the new leader. and the old leader // should switch to follower. ts.g.ConnectOne(leader1) + tester.AnnotateConnection(ts.g.GetConnected()) leader2 := ts.checkOneLeader() // if there's no quorum, no new leader should // be elected. ts.g.DisconnectAll(leader2) ts.g.DisconnectAll((leader2 + 1) % servers) + tester.AnnotateConnection(ts.g.GetConnected()) time.Sleep(2 * RaftElectionTimeout) // check that the one connected server @@ -84,10 +89,12 @@ // if a quorum arises, it should elect a leader. ts.g.ConnectOne((leader2 + 1) % servers) + tester.AnnotateConnection(ts.g.GetConnected()) ts.checkOneLeader() // re-join of last node shouldn't prevent leader from existing. ts.g.ConnectOne(leader2) + tester.AnnotateConnection(ts.g.GetConnected()) ts.checkOneLeader() } @@ -96,6 +103,7 @@ func TestManyElections3A(t *testing.T) { ts := makeTest(t, servers, true, false) defer ts.cleanup() + tester.AnnotateTest("TestManyElections3A", servers) ts.Begin("Test (3A): multiple elections") ts.checkOneLeader() @@ -109,6 +117,7 @@ ts.g.DisconnectAll(i1) ts.g.DisconnectAll(i2) ts.g.DisconnectAll(i3) + tester.AnnotateConnection(ts.g.GetConnected()) // either the current leader should still be alive, // or the remaining four should elect a new one.
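The hunks above establish the pattern used throughout raft_test.go: immediately after every connectivity change, the test snapshots the connection vector with tester.AnnotateConnection(ts.g.GetConnected()) so the visualization can draw a continuous fault bar. A minimal, self-contained sketch of that pattern against the new tester API; the test name, server count, and vectors here are invented for illustration:

package main

import tester "6.5840/tester1"

func main() {
    // Register a hypothetical 3-server test; this resets annotation state
    // and marks all servers as connected.
    tester.AnnotateTest("ExampleElection", 3)

    // Server 0 drops out: recording the new vector opens a "$ Failure"
    // bar showing server 0 in its own partition.
    tester.AnnotateConnection([]bool{false, true, true})

    // Server 0 rejoins: an all-true vector with no crashed servers ends
    // the fault bar.
    tester.AnnotateConnection([]bool{true, true, true})
}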
@@ -117,6 +126,7 @@ func TestManyElections3A(t *testing.T) { ts.g.ConnectOne(i1) ts.g.ConnectOne(i2) ts.g.ConnectOne(i3) + tester.AnnotateConnection(ts.g.GetConnected()) } ts.checkOneLeader() } @@ -126,6 +136,7 @@ func TestBasicAgree3B(t *testing.T) { ts := makeTest(t, servers, true, false) defer ts.cleanup() + tester.AnnotateTest("TestBasicAgree3B", servers) ts.Begin("Test (3B): basic agreement") iters := 3 @@ -149,6 +160,7 @@ func TestRPCBytes3B(t *testing.T) { ts := makeTest(t, servers, true, false) defer ts.cleanup() + tester.AnnotateTest("TestRPCBytes3B", servers) ts.Begin("Test (3B): RPC byte count") ts.one(99, servers, false) @@ -180,6 +192,7 @@ func TestFollowerFailure3B(t *testing.T) { ts := makeTest(t, servers, true, false) defer ts.cleanup() + tester.AnnotateTest("TestFollowerFailure3B", servers) ts.Begin("Test (3B): test progressive failure of followers") ts.one(101, servers, false) @@ -187,6 +200,7 @@ func TestFollowerFailure3B(t *testing.T) { // disconnect one follower from the network. leader1 := ts.checkOneLeader() ts.g.DisconnectAll((leader1 + 1) % servers) + tester.AnnotateConnection(ts.g.GetConnected()) // the leader and remaining follower should be // able to agree despite the disconnected follower. @@ -198,6 +212,7 @@ func TestFollowerFailure3B(t *testing.T) { leader2 := ts.checkOneLeader() ts.g.DisconnectAll((leader2 + 1) % servers) ts.g.DisconnectAll((leader2 + 2) % servers) + tester.AnnotateConnection(ts.g.GetConnected()) // submit a command. index, _, ok := ts.srvs[leader2].Raft().Start(104) @@ -211,11 +226,7 @@ func TestFollowerFailure3B(t *testing.T) { time.Sleep(2 * RaftElectionTimeout) // check that command 104 did not commit. - n, _ := ts.nCommitted(index) - if n > 0 { - t.Fatalf("%v committed but no majority", n) - } - + ts.checkNoAgreement(index) } // test just failure of leaders. @@ -224,6 +235,7 @@ func TestLeaderFailure3B(t *testing.T) { ts := makeTest(t, servers, true, false) defer ts.cleanup() + tester.AnnotateTest("TestLeaderFailure3B", servers) ts.Begin("Test (3B): test failure of leaders") ts.one(101, servers, false) @@ -231,6 +243,7 @@ func TestLeaderFailure3B(t *testing.T) { // disconnect the first leader. leader1 := ts.checkOneLeader() ts.g.DisconnectAll(leader1) + tester.AnnotateConnection(ts.g.GetConnected()) // the remaining followers should elect // a new leader. @@ -241,6 +254,7 @@ func TestLeaderFailure3B(t *testing.T) { // disconnect the new leader. leader2 := ts.checkOneLeader() ts.g.DisconnectAll(leader2) + tester.AnnotateConnection(ts.g.GetConnected()) // submit a command to each server. for i := 0; i < servers; i++ { @@ -250,11 +264,7 @@ func TestLeaderFailure3B(t *testing.T) { time.Sleep(2 * RaftElectionTimeout) // check that command 104 did not commit. - n, _ := ts.nCommitted(4) - if n > 0 { - t.Fatalf("%v committed but no majority", n) - } - + ts.checkNoAgreement(4) } // test that a follower participates after @@ -264,6 +274,7 @@ func TestFailAgree3B(t *testing.T) { ts := makeTest(t, servers, true, false) defer ts.cleanup() + tester.AnnotateTest("TestFailAgree3B", servers) ts.Begin("Test (3B): agreement after follower reconnects") ts.one(101, servers, false) @@ -271,6 +282,7 @@ func TestFailAgree3B(t *testing.T) { // disconnect one follower from the network. leader := ts.checkOneLeader() ts.g.DisconnectAll((leader + 1) % servers) + tester.AnnotateConnection(ts.g.GetConnected()) // the leader and remaining follower should be // able to agree despite the disconnected follower. 
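The agreement tests that follow use a begin/end protocol for checks: tester.AnnotateCheckerBegin records a timestamp and description, and exactly one of AnnotateCheckerSuccess, AnnotateCheckerFailure, or AnnotateCheckerNeutral closes it as a timeline interval (Neutral marks an attempt that is retried rather than failed). A hedged sketch of the protocol; the checker below is invented and assumes AnnotateTest has already run:

package main

import tester "6.5840/tester1"

// checkQuorum is a hypothetical checker illustrating the begin/end pairing.
func checkQuorum(connected, needed int) {
    tester.AnnotateCheckerBegin("checking for a quorum")
    if connected >= needed {
        tester.AnnotateCheckerSuccess("quorum present", "OK")
    } else {
        tester.AnnotateCheckerFailure("no quorum", "connected servers below majority")
    }
}

func main() {
    tester.AnnotateTest("ExampleQuorum", 3) // resets annotation state
    checkQuorum(2, 2)
}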
@@ -282,6 +294,7 @@ func TestFailAgree3B(t *testing.T) { // re-connect ts.g.ConnectOne((leader + 1) % servers) + tester.AnnotateConnection(ts.g.GetConnected()) // the full set of servers should preserve // previous agreements, and be able to agree @@ -289,7 +302,6 @@ ts.one(106, servers, true) time.Sleep(RaftElectionTimeout) ts.one(107, servers, true) - } func TestFailNoAgree3B(t *testing.T) { @@ -297,6 +309,7 @@ func TestFailNoAgree3B(t *testing.T) { ts := makeTest(t, servers, true, false) defer ts.cleanup() + tester.AnnotateTest("TestFailNoAgree3B", servers) ts.Begin("Test (3B): no agreement if too many followers disconnect") ts.one(10, servers, false) @@ -306,6 +319,7 @@ func TestFailNoAgree3B(t *testing.T) { ts.g.DisconnectAll((leader + 1) % servers) ts.g.DisconnectAll((leader + 2) % servers) ts.g.DisconnectAll((leader + 3) % servers) + tester.AnnotateConnection(ts.g.GetConnected()) index, _, ok := ts.srvs[leader].Raft().Start(20) if ok != true { @@ -326,6 +340,7 @@ func TestFailNoAgree3B(t *testing.T) { ts.g.ConnectOne((leader + 1) % servers) ts.g.ConnectOne((leader + 2) % servers) ts.g.ConnectOne((leader + 3) % servers) + tester.AnnotateConnection(ts.g.GetConnected()) // the disconnected majority may have chosen a leader from // among their own ranks, forgetting index 2. @@ -339,7 +354,6 @@ func TestFailNoAgree3B(t *testing.T) { } ts.one(1000, servers, true) - } func TestConcurrentStarts3B(t *testing.T) { @@ -347,6 +361,7 @@ func TestConcurrentStarts3B(t *testing.T) { ts := makeTest(t, servers, true, false) defer ts.cleanup() + tester.AnnotateTest("TestConcurrentStarts3B", servers) ts.Begin("Test (3B): concurrent Start()s") var success bool @@ -358,9 +373,15 @@ loop: } leader := ts.checkOneLeader() + textb := fmt.Sprintf("checking concurrent submission of commands (attempt %v)", try) + tester.AnnotateCheckerBegin(textb) _, term, ok := ts.srvs[leader].Raft().Start(1) + + despretry := "concurrent submission failed; retry" if !ok { // leader moved on really quickly + details := fmt.Sprintf("%v is no longer a leader", leader) + tester.AnnotateCheckerNeutral(despretry, details) continue } @@ -388,6 +409,9 @@ loop: for j := 0; j < servers; j++ { if t, _ := ts.srvs[j].Raft().GetState(); t != term { // term changed -- can't expect low RPC counts + details := fmt.Sprintf("term of server %v changed from %v to %v", + j, term, t) + tester.AnnotateCheckerNeutral(despretry, details) continue loop } } @@ -402,11 +426,17 @@ loop: // so we can't expect all Start()s to // have succeeded failed = true + details := fmt.Sprintf( + "term changed while waiting for %v servers to commit index %v", + servers, index) + tester.AnnotateCheckerNeutral(despretry, details) break } cmds = append(cmds, ix) } else { - t.Fatalf("value %v is not an int", cmd) + details := fmt.Sprintf("value %v is not an int", cmd) + tester.AnnotateCheckerFailure("read ill-typed value", details) + t.Fatalf("%s", details) } } @@ -428,7 +458,9 @@ loop: } } if ok == false { - t.Fatalf("cmd %v missing in %v", x, cmds) + details := fmt.Sprintf("cmd %v missing in %v", x, cmds) + tester.AnnotateCheckerFailure("concurrent submission failed", details) + t.Fatalf("%s", details) } } @@ -437,9 +469,14 @@ loop: } if !success { + tester.AnnotateCheckerFailure( + "agreement failed under concurrent submission", + "unable to reach agreement after 5 attempts") t.Fatalf("term changed too often") } + text := "agreement reached under concurrent submission" + tester.AnnotateCheckerSuccess(text, "OK") } func TestRejoin3B(t *testing.T)
{ @@ -447,6 +484,7 @@ func TestRejoin3B(t *testing.T) { ts := makeTest(t, servers, true, false) defer ts.cleanup() + tester.AnnotateTest("TestRejoin3B", servers) ts.Begin("Test (3B): rejoin of partitioned leader") ts.one(101, servers, true) @@ -454,11 +492,15 @@ func TestRejoin3B(t *testing.T) { // leader network failure leader1 := ts.checkOneLeader() ts.g.DisconnectAll(leader1) + tester.AnnotateConnection(ts.g.GetConnected()) // make old leader try to agree on some entries + start := tester.GetAnnotateTimestamp() ts.srvs[leader1].Raft().Start(102) ts.srvs[leader1].Raft().Start(103) ts.srvs[leader1].Raft().Start(104) + text := fmt.Sprintf("submitted commands [102 103 104] to %v", leader1) + tester.AnnotateInfoInterval(start, text, text) // new leader commits, also for index=2 ts.one(103, 2, true) @@ -469,14 +511,15 @@ func TestRejoin3B(t *testing.T) { // old leader connected again ts.g.ConnectOne(leader1) + tester.AnnotateConnection(ts.g.GetConnected()) ts.one(104, 2, true) // all together now ts.g.ConnectOne(leader2) + tester.AnnotateConnection(ts.g.GetConnected()) ts.one(105, servers, true) - } func TestBackup3B(t *testing.T) { @@ -484,6 +527,7 @@ func TestBackup3B(t *testing.T) { ts := makeTest(t, servers, true, false) defer ts.cleanup() + tester.AnnotateTest("TestBackup3B", servers) ts.Begin("Test (3B): leader backs up quickly over incorrect follower logs") ts.one(rand.Int(), servers, true) @@ -493,11 +537,15 @@ func TestBackup3B(t *testing.T) { ts.g.DisconnectAll((leader1 + 2) % servers) ts.g.DisconnectAll((leader1 + 3) % servers) ts.g.DisconnectAll((leader1 + 4) % servers) + tester.AnnotateConnection(ts.g.GetConnected()) // submit lots of commands that won't commit + start := tester.GetAnnotateTimestamp() for i := 0; i < 50; i++ { ts.srvs[leader1].Raft().Start(rand.Int()) } + text := fmt.Sprintf("submitted 50 commands to %v", leader1) + tester.AnnotateInfoInterval(start, text, text) time.Sleep(RaftElectionTimeout / 2) @@ -508,6 +556,7 @@ func TestBackup3B(t *testing.T) { ts.g.ConnectOne((leader1 + 2) % servers) ts.g.ConnectOne((leader1 + 3) % servers) ts.g.ConnectOne((leader1 + 4) % servers) + tester.AnnotateConnection(ts.g.GetConnected()) // lots of successful commands to new group. for i := 0; i < 50; i++ { @@ -521,11 +570,15 @@ func TestBackup3B(t *testing.T) { other = (leader2 + 1) % servers } ts.g.DisconnectAll(other) + tester.AnnotateConnection(ts.g.GetConnected()) // lots more commands that won't commit + start = tester.GetAnnotateTimestamp() for i := 0; i < 50; i++ { ts.srvs[leader2].Raft().Start(rand.Int()) } + text = fmt.Sprintf("submitted 50 commands to %v", leader2) + tester.AnnotateInfoInterval(start, text, text) time.Sleep(RaftElectionTimeout / 2) @@ -536,6 +589,7 @@ func TestBackup3B(t *testing.T) { ts.g.ConnectOne((leader1 + 0) % servers) ts.g.ConnectOne((leader1 + 1) % servers) ts.g.ConnectOne(other) + tester.AnnotateConnection(ts.g.GetConnected()) // lots of successful commands to new group. 
for i := 0; i < 50; i++ { @@ -546,6 +600,7 @@ func TestBackup3B(t *testing.T) { for i := 0; i < servers; i++ { ts.g.ConnectOne(i) } + tester.AnnotateConnection(ts.g.GetConnected()) ts.one(rand.Int(), servers, true) } @@ -554,6 +609,7 @@ func TestCount3B(t *testing.T) { ts := makeTest(t, servers, true, false) defer ts.cleanup() + tester.AnnotateTest("TestCount3B", servers) ts.Begin("Test (3B): RPC counts aren't too high") rpcs := func() (n int) { @@ -568,7 +624,9 @@ func TestCount3B(t *testing.T) { total1 := rpcs() if total1 > 30 || total1 < 1 { - t.Fatalf("too many or few RPCs (%v) to elect initial leader\n", total1) + text := fmt.Sprintf("too many or few RPCs (%v) to elect initial leader", total1) + tester.AnnotateCheckerFailure(text, text) + t.Fatalf("%s", text) } var total2 int @@ -581,14 +639,20 @@ loop: } leader = ts.checkOneLeader() + textb := fmt.Sprintf("checking reasonable RPC counts for agreement (attempt %v)", try) + tester.AnnotateCheckerBegin(textb) total1 = rpcs() iters := 10 starti, term, ok := ts.srvs[leader].Raft().Start(1) + despretry := "submission failed; retry" if !ok { // leader moved on really quickly + details := fmt.Sprintf("%v is no longer a leader", leader) + tester.AnnotateCheckerNeutral(despretry, details) continue } + cmds := []int{} for i := 1; i < iters+2; i++ { x := int(rand.Int31()) @@ -596,13 +660,23 @@ loop: index1, term1, ok := ts.srvs[leader].Raft().Start(x) if term1 != term { // Term changed while starting + details := fmt.Sprintf("term of the leader (%v) changed from %v to %v", + leader, term, term1) + tester.AnnotateCheckerNeutral(despretry, details) continue loop } if !ok { // No longer the leader, so term has changed + details := fmt.Sprintf("%v is no longer a leader", leader) + tester.AnnotateCheckerNeutral(despretry, details) continue loop } if starti+i != index1 { + desp := fmt.Sprintf("leader %v adds the command at the wrong index", leader) + details := fmt.Sprintf( + "the command should be at index %v, but the leader put it at %v", + starti+i, index1) + tester.AnnotateCheckerFailure(desp, details) t.Fatalf("Start() failed") } } @@ -612,8 +686,16 @@ loop: if ix, ok := cmd.(int); ok == false || ix != cmds[i-1] { if ix == -1 { // term changed -- try again + details := fmt.Sprintf( + "term changed while waiting for %v servers to commit index %v", + servers, starti+i) + tester.AnnotateCheckerNeutral(despretry, details) continue loop } + details := fmt.Sprintf( + "the command submitted at index %v in term %v is %v, but read %v", + starti+i, term, cmds[i-1], cmd) + tester.AnnotateCheckerFailure("incorrect command committed", details) t.Fatalf("wrong value %v committed for index %v; expected %v\n", cmd, starti+i, cmds) } } @@ -624,6 +706,8 @@ loop: if t, _ := ts.srvs[j].Raft().GetState(); t != term { // term changed -- can't expect low RPC counts // need to keep going to update total2 + details := fmt.Sprintf("term of server %v changed from %v to %v", j, term, t) + tester.AnnotateCheckerNeutral(despretry, details) failed = true } total2 += ts.g.RpcCount(j) @@ -634,17 +718,29 @@ loop: } if total2-total1 > (iters+1+3)*3 { + details := fmt.Sprintf("number of RPCs used for %v entries = %v > %v", + iters, total2-total1, (iters+1+3)*3) + tester.AnnotateCheckerFailure("used too many RPCs for agreement", details) t.Fatalf("too many RPCs (%v) for %v entries\n", total2-total1, iters) } + details := fmt.Sprintf("number of RPCs used for %v entries = %v <= %v", + iters, total2-total1, (iters+1+3)*3) + tester.AnnotateCheckerSuccess("used a reasonable number of RPCs for agreement", details) + success = true break } if !success { + tester.AnnotateCheckerFailure( + "agreement failed", + "unable to reach agreement after 5 attempts") t.Fatalf("term changed too often") } + tester.AnnotateCheckerBegin("checking reasonable RPC counts in idle") + time.Sleep(RaftElectionTimeout) total3 := 0 @@ -653,9 +749,15 @@ loop: } if total3-total2 > 3*20 { + details := fmt.Sprintf("number of RPCs used for 1 second of idleness = %v > %v", + total3-total2, 3*20) + tester.AnnotateCheckerFailure("used too many RPCs in idle", details) t.Fatalf("too many RPCs (%v) for 1 second of idleness\n", total3-total2) } - + details := fmt.Sprintf("number of RPCs used for 1 second of idleness = %v <= %v", + total3-total2, 3*20) + tester.AnnotateCheckerSuccess( + "used a reasonable number of RPCs in idle", details) } func TestPersist13C(t *testing.T) { @@ -663,36 +765,47 @@ func TestPersist13C(t *testing.T) { ts := makeTest(t, servers, true, false) defer ts.cleanup() + tester.AnnotateTest("TestPersist13C", servers) ts.Begin("Test (3C): basic persistence") ts.one(11, servers, true) ts.g.Shutdown() + tester.AnnotateShutdownAll() ts.g.StartServers() + tester.AnnotateRestartAll() ts.one(12, servers, true) leader1 := ts.checkOneLeader() ts.g.ShutdownServer(leader1) + tester.AnnotateShutdown([]int{leader1}) ts.restart(leader1) + tester.AnnotateRestart([]int{leader1}) ts.one(13, servers, true) leader2 := ts.checkOneLeader() ts.g.ShutdownServer(leader2) + tester.AnnotateShutdown([]int{leader2}) ts.one(14, servers-1, true) ts.restart(leader2) + tester.AnnotateRestart([]int{leader2}) + tester.AnnotateCheckerBegin("wait for all servers to commit through index 4") ts.wait(4, servers, -1) // wait for leader2 to join before killing i3 + tester.AnnotateCheckerSuccess("all committed through index 4", "OK") i3 := (ts.checkOneLeader() + 1) % servers ts.g.ShutdownServer(i3) + tester.AnnotateShutdown([]int{i3}) ts.one(15, servers-1, true) ts.restart(i3) + tester.AnnotateRestart([]int{i3}) ts.one(16, servers, true) } @@ -702,6 +815,7 @@ func TestPersist23C(t *testing.T) { ts := makeTest(t, servers, true, false) defer ts.cleanup() + tester.AnnotateTest("TestPersist23C", servers) ts.Begin("Test (3C): more persistence") index := 1 @@ -713,6 +827,7 @@ func TestPersist23C(t *testing.T) { ts.g.ShutdownServer((leader1 + 1) % servers) ts.g.ShutdownServer((leader1 + 2) % servers) + tester.AnnotateShutdown([]int{(leader1 + 1) % servers, (leader1 + 2) % servers}) ts.one(10+index, servers-2, true) index++ @@ -720,19 +835,25 @@ func TestPersist23C(t *testing.T) { ts.g.ShutdownServer((leader1 + 0) % servers) ts.g.ShutdownServer((leader1 + 3) % servers) ts.g.ShutdownServer((leader1 + 4) % servers) + tester.AnnotateShutdown([]int{ + (leader1 + 0) % servers, (leader1 + 3) % servers, (leader1 + 4) % servers, + }) ts.restart((leader1 + 1) % servers) ts.restart((leader1 + 2) % servers) + tester.AnnotateRestart([]int{(leader1 + 1) % servers, (leader1 + 2) % servers}) time.Sleep(RaftElectionTimeout) ts.restart((leader1 + 3) % servers) + tester.AnnotateRestart([]int{(leader1 + 3) % servers}) ts.one(10+index, servers-2, true) index++ ts.restart((leader1 + 4) % servers) ts.restart((leader1 + 0) % servers) + tester.AnnotateRestart([]int{(leader1 + 4) % servers, (leader1 + 0) % servers}) } ts.one(1000, servers, true) @@ -743,23 +864,29 @@ func TestPersist33C(t *testing.T) { ts := makeTest(t, servers, true, false) defer ts.cleanup() + tester.AnnotateTest("TestPersist33C", servers) ts.Begin("Test (3C): partitioned leader and
one follower crash, leader restarts") ts.one(101, 3, true) leader := ts.checkOneLeader() ts.g.DisconnectAll((leader + 2) % servers) + tester.AnnotateConnection(ts.g.GetConnected()) ts.one(102, 2, true) ts.g.ShutdownServer((leader + 0) % servers) ts.g.ShutdownServer((leader + 1) % servers) + tester.AnnotateShutdown([]int{(leader + 0) % servers, (leader + 1) % servers}) ts.restart((leader + 2) % servers) ts.restart((leader + 0) % servers) + tester.AnnotateRestart([]int{(leader + 2) % servers, (leader + 0) % servers}) + tester.AnnotateConnection(ts.g.GetConnected()) ts.one(103, 2, true) ts.restart((leader + 1) % servers) + tester.AnnotateRestart([]int{(leader + 1) % servers}) ts.one(104, servers, true) } @@ -777,6 +904,7 @@ func TestFigure83C(t *testing.T) { ts := makeTest(t, servers, true, false) defer ts.cleanup() + tester.AnnotateTest("TestFigure83C", servers) ts.Begin("Test (3C): Figure 8") ts.one(rand.Int(), 1, true) @@ -786,8 +914,11 @@ func TestFigure83C(t *testing.T) { leader := -1 for i := 0; i < servers; i++ { if ts.srvs[i].Raft() != nil { - _, _, ok := ts.srvs[i].Raft().Start(rand.Int()) + cmd := rand.Int() + _, _, ok := ts.srvs[i].Raft().Start(cmd) if ok { + text := fmt.Sprintf("submitted command %v to server %v", cmd, i) + tester.AnnotateInfo(text, text) leader = i } } @@ -803,6 +934,7 @@ func TestFigure83C(t *testing.T) { if leader != -1 { ts.g.ShutdownServer(leader) + tester.AnnotateShutdown([]int{leader}) nup -= 1 } @@ -810,6 +942,7 @@ func TestFigure83C(t *testing.T) { s := rand.Int() % servers if ts.srvs[s].Raft() == nil { ts.restart(s) + tester.AnnotateRestart([]int{s}) nup += 1 } } @@ -820,9 +953,9 @@ func TestFigure83C(t *testing.T) { ts.restart(i) } } + tester.AnnotateRestartAll() ts.one(rand.Int(), servers, true) - } func TestUnreliableAgree3C(t *testing.T) { @@ -830,6 +963,7 @@ func TestUnreliableAgree3C(t *testing.T) { ts := makeTest(t, servers, false, false) defer ts.cleanup() + tester.AnnotateTest("TestUnreliableAgree3C", servers) ts.Begin("Test (3C): unreliable agreement") var wg sync.WaitGroup @@ -858,6 +992,7 @@ func TestFigure8Unreliable3C(t *testing.T) { ts := makeTest(t, servers, false, false) defer ts.cleanup() + tester.AnnotateTest("TestFigure8Unreliable3C", servers) ts.Begin("Test (3C): Figure 8 (unreliable)") ts.one(rand.Int()%10000, 1, true) @@ -869,7 +1004,12 @@ func TestFigure8Unreliable3C(t *testing.T) { } leader := -1 for i := 0; i < servers; i++ { - _, _, ok := ts.srvs[i].Raft().Start(rand.Int() % 10000) + cmd := rand.Int() % 10000 + _, _, ok := ts.srvs[i].Raft().Start(cmd) + if ok { + text := fmt.Sprintf("submitted command %v to server %v", cmd, i) + tester.AnnotateInfo(text, text) + } if ok && ts.g.IsConnected(i) { leader = i } @@ -885,6 +1025,7 @@ func TestFigure8Unreliable3C(t *testing.T) { if leader != -1 && (rand.Int()%1000) < int(RaftElectionTimeout/time.Millisecond)/2 { ts.g.DisconnectAll(leader) + tester.AnnotateConnection(ts.g.GetConnected()) nup -= 1 } @@ -892,6 +1033,7 @@ func TestFigure8Unreliable3C(t *testing.T) { s := rand.Int() % servers if !ts.g.IsConnected(s) { ts.g.ConnectOne(s) + tester.AnnotateConnection(ts.g.GetConnected()) nup += 1 } } @@ -902,9 +1044,9 @@ func TestFigure8Unreliable3C(t *testing.T) { ts.g.ConnectOne(i) } } + tester.AnnotateConnection(ts.g.GetConnected()) ts.one(rand.Int()%10000, servers, true) - } func internalChurn(t *testing.T, reliable bool) { @@ -914,8 +1056,10 @@ func internalChurn(t *testing.T, reliable bool) { defer ts.cleanup() if ts.IsReliable() { + tester.AnnotateTest("TestReliableChurn3C", servers) 
ts.Begin("Test (3C): churn") } else { + tester.AnnotateTest("TestUnreliableChurn3C", servers) ts.Begin("Test (3C): unreliable churn") } @@ -968,6 +1112,7 @@ func internalChurn(t *testing.T, reliable bool) { ret = values } + startcli := tester.GetAnnotateTimestamp() ncli := 3 cha := []chan []int{} for i := 0; i < ncli; i++ { @@ -979,20 +1124,24 @@ func internalChurn(t *testing.T, reliable bool) { if (rand.Int() % 1000) < 200 { i := rand.Int() % servers ts.g.DisconnectAll(i) + tester.AnnotateConnection(ts.g.GetConnected()) } if (rand.Int() % 1000) < 500 { i := rand.Int() % servers if ts.srvs[i].raft == nil { ts.restart(i) + tester.AnnotateRestart([]int{i}) } ts.g.ConnectOne(i) + tester.AnnotateConnection(ts.g.GetConnected()) } if (rand.Int() % 1000) < 200 { i := rand.Int() % servers if ts.srvs[i].raft != nil { ts.g.ShutdownServer(i) + tester.AnnotateShutdown([]int{i}) } } @@ -1011,9 +1160,14 @@ func internalChurn(t *testing.T, reliable bool) { } ts.g.ConnectOne(i) } + tester.AnnotateRestartAll() + tester.AnnotateConnection(ts.g.GetConnected()) atomic.StoreInt32(&stop, 1) + textcli := fmt.Sprintf("%v clients submitting commands concurrently", ncli) + tester.AnnotateInfoInterval(startcli, textcli, textcli) + tester.AnnotateCheckerBegin("checking if any client has failed") values := []int{} for i := 0; i < ncli; i++ { vv := <-cha[i] @@ -1022,6 +1176,7 @@ func internalChurn(t *testing.T, reliable bool) { } values = append(values, vv...) } + tester.AnnotateCheckerSuccess("none of the clients have failed", "OK") time.Sleep(RaftElectionTimeout) @@ -1033,10 +1188,14 @@ func internalChurn(t *testing.T, reliable bool) { if vi, ok := v.(int); ok { really = append(really, vi) } else { + text := fmt.Sprintf("committed value %v is not an integer", v) + tester.AnnotateCheckerFailure(text, text) t.Fatalf("not an int") } } + tester.AnnotateCheckerBegin( + "checking if committed values observed by the clients remain in the log") for _, v1 := range values { ok := false for _, v2 := range really { @@ -1048,7 +1207,7 @@ func internalChurn(t *testing.T, reliable bool) { ts.t.Fatalf("didn't find a value") } } - + tester.AnnotateCheckerSuccess("committed values remain in the log", "OK") } func TestReliableChurn3C(t *testing.T) { @@ -1069,6 +1228,8 @@ func snapcommon(t *testing.T, name string, disconnect bool, reliable bool, crash ts := makeTest(t, servers, reliable, true) defer ts.cleanup() + // Inconsistent with other test cases, but don't want to change API. + tester.AnnotateTest(name, servers) ts.Begin(name) ts.one(rand.Int(), servers, true) @@ -1084,18 +1245,23 @@ func snapcommon(t *testing.T, name string, disconnect bool, reliable bool, crash if disconnect { ts.g.DisconnectAll(victim) + tester.AnnotateConnection(ts.g.GetConnected()) ts.one(rand.Int(), servers-1, true) } if crash { ts.g.ShutdownServer(victim) + tester.AnnotateShutdown([]int{victim}) ts.one(rand.Int(), servers-1, true) } // perhaps send enough to get a snapshot + start := tester.GetAnnotateTimestamp() nn := (SnapShotInterval / 2) + (rand.Int() % SnapShotInterval) for i := 0; i < nn; i++ { ts.srvs[sender].Raft().Start(rand.Int()) } + text := fmt.Sprintf("submitting %v commands to %v", nn, sender) + tester.AnnotateInfoInterval(start, text, text) // let applier threads catch up with the Start()'s if disconnect == false && crash == false { @@ -1114,11 +1280,13 @@ func snapcommon(t *testing.T, name string, disconnect bool, reliable bool, crash // reconnect a follower, who maybe behind and // needs to rceive a snapshot to catch up. 
ts.g.ConnectOne(victim) + tester.AnnotateConnection(ts.g.GetConnected()) ts.one(rand.Int(), servers, true) leader1 = ts.checkOneLeader() } if crash { ts.restart(victim) + tester.AnnotateRestart([]int{victim}) ts.one(rand.Int(), servers, true) leader1 = ts.checkOneLeader() } @@ -1155,6 +1323,7 @@ func TestSnapshotAllCrash3D(t *testing.T) { ts := makeTest(t, servers, false, true) defer ts.cleanup() + tester.AnnotateTest("TestSnapshotAllCrash3D", servers) ts.Begin("Test (3D): crash and restart all servers") ts.one(rand.Int(), servers, true) @@ -1170,11 +1339,15 @@ func TestSnapshotAllCrash3D(t *testing.T) { // crash all ts.g.Shutdown() + tester.AnnotateShutdownAll() ts.g.StartServers() + tester.AnnotateRestartAll() index2 := ts.one(rand.Int(), servers, true) if index2 < index1+1 { - t.Fatalf("index decreased from %v to %v", index1, index2) + msg := fmt.Sprintf("index decreased from %v to %v", index1, index2) + tester.AnnotateCheckerFailure("incorrect behavior: index decreased", msg) + t.Fatalf("%s", msg) } } } @@ -1186,6 +1359,7 @@ func TestSnapshotInit3D(t *testing.T) { ts := makeTest(t, servers, false, true) defer ts.cleanup() + tester.AnnotateTest("TestSnapshotInit3D", servers) ts.Begin("Test (3D): snapshot initialization after crash") ts.one(rand.Int(), servers, true) @@ -1196,13 +1370,17 @@ func TestSnapshotInit3D(t *testing.T) { } ts.g.Shutdown() + tester.AnnotateShutdownAll() ts.g.StartServers() + tester.AnnotateRestartAll() // a single op, to get something to be written back to persistent storage. ts.one(rand.Int(), servers, true) ts.g.Shutdown() + tester.AnnotateShutdownAll() ts.g.StartServers() + tester.AnnotateRestartAll() // do another op to trigger potential bug ts.one(rand.Int(), servers, true) diff --git a/src/raft1/server.go b/src/raft1/server.go index 2cac4a9..3f1967f 100644 --- a/src/raft1/server.go +++ b/src/raft1/server.go @@ -51,6 +51,7 @@ func newRfsrv(ts *Test, srv int, ends []*labrpc.ClientEnd, persister *tester.Per // ideally Raft should send it up on applyCh... err := s.ingestSnap(snapshot, -1) if err != "" { + tester.AnnotateCheckerFailureBeforeExit("failed to ingest snapshot", err) ts.t.Fatal(err) } } @@ -106,6 +107,7 @@ func (rs *rfsrv) applier(applyCh chan raftapi.ApplyMsg) { err_msg = fmt.Sprintf("server %v apply out of order %v", rs.me, m.CommandIndex) } if err_msg != "" { + tester.AnnotateCheckerFailureBeforeExit("apply error", err_msg) log.Fatalf("apply error: %v", err_msg) rs.applyErr = err_msg // keep reading after error so that Raft doesn't block @@ -149,12 +151,18 @@ func (rs *rfsrv) applierSnap(applyCh chan raftapi.ApplyMsg) { xlog = append(xlog, rs.logs[j]) } e.Encode(xlog) + start := tester.GetAnnotateTimestamp() rs.raft.Snapshot(m.CommandIndex, w.Bytes()) + details := fmt.Sprintf( + "snapshot created after applying the command at index %v", + m.CommandIndex) + tester.AnnotateInfoInterval(start, "snapshot created", details) } } else { // Ignore other types of ApplyMsg. } if err_msg != "" { + tester.AnnotateCheckerFailureBeforeExit("apply error", err_msg) log.Fatalf("apply error: %v", err_msg) rs.applyErr = err_msg // keep reading after error so that Raft doesn't block @@ -169,6 +177,7 @@ func (rs *rfsrv) ingestSnap(snapshot []byte, index int) string { defer rs.mu.Unlock() if snapshot == nil { + tester.AnnotateCheckerFailureBeforeExit("failed to ingest snapshot", "nil snapshot") log.Fatalf("nil snapshot") return "nil snapshot" } @@ -178,6 +187,8 @@ func (rs *rfsrv) ingestSnap(snapshot []byte, index int) string { var xlog []any if d.Decode(&lastIncludedIndex) != nil || d.Decode(&xlog) != nil { + text := "failed to decode snapshot" + tester.AnnotateCheckerFailureBeforeExit(text, text) log.Fatalf("snapshot decode error") return "snapshot Decode() error" }
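Both the tests and server.go above time multi-step work the same way: capture a start timestamp, do the work, then record the whole span. A self-contained sketch; the slept-for "work" below stands in for calls such as rs.raft.Snapshot or a batch of Start()s:

package main

import (
    "fmt"
    "time"

    tester "6.5840/tester1"
)

func main() {
    tester.AnnotateTest("ExampleInterval", 3)

    // Capture the start of the span, do the work, then annotate the whole
    // interval on the "$ Test Info" row.
    start := tester.GetAnnotateTimestamp()
    time.Sleep(50 * time.Millisecond) // stand-in for the timed work
    text := fmt.Sprintf("did %v of work", 50*time.Millisecond)
    tester.AnnotateInfoInterval(start, text, text)
}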
diff --git a/src/raft1/test.go b/src/raft1/test.go index 0eb1aac..8e2207f 100644 --- a/src/raft1/test.go +++ b/src/raft1/test.go @@ -62,6 +62,7 @@ func (ts *Test) restart(i int) { } func (ts *Test) checkOneLeader() int { + tester.AnnotateCheckerBegin("checking for a single leader") for iters := 0; iters < 10; iters++ { ms := 450 + (rand.Int63() % 100) time.Sleep(time.Duration(ms) * time.Millisecond) @@ -78,6 +79,8 @@ func (ts *Test) checkOneLeader() int { lastTermWithLeader := -1 for term, leaders := range leaders { if len(leaders) > 1 { + details := fmt.Sprintf("multiple leaders in term %v = %v", term, leaders) + tester.AnnotateCheckerFailure("multiple leaders", details) ts.Fatalf("term %d has %d (>1) leaders", term, len(leaders)) } if term > lastTermWithLeader { @@ -86,14 +89,20 @@ func (ts *Test) checkOneLeader() int { } if len(leaders) != 0 { + details := fmt.Sprintf("leader in term %v = %v", + lastTermWithLeader, leaders[lastTermWithLeader][0]) + tester.AnnotateCheckerSuccess(details, details) return leaders[lastTermWithLeader][0] } } + details := "unable to find a leader" + tester.AnnotateCheckerFailure("no leader", details) ts.Fatalf("expected one leader, got none") return -1 } func (ts *Test) checkTerms() int { + tester.AnnotateCheckerBegin("checking term agreement") term := -1 for i := 0; i < ts.n; i++ { if ts.g.IsConnected(i) { @@ -101,10 +110,15 @@ func (ts *Test) checkTerms() int { if term == -1 { term = xterm } else if term != xterm { + details := fmt.Sprintf("server %v has term %v, but an earlier connected server has term %v", + i, xterm, term) + tester.AnnotateCheckerFailure("terms disagree", details) ts.Fatalf("servers disagree on term") } } } + details := fmt.Sprintf("term = %v", term) + tester.AnnotateCheckerSuccess("terms agree", details) return term } @@ -134,14 +148,32 @@ func (ts *Test) checkLogs(i int, m raftapi.ApplyMsg) (string, bool) { // check that none of the connected servers // thinks it is the leader.
func (ts *Test) checkNoLeader() { + tester.AnnotateCheckerBegin("checking no unexpected leader among connected servers") for i := 0; i < ts.n; i++ { if ts.g.IsConnected(i) { _, is_leader := ts.srvs[i].GetState() if is_leader { - ts.Fatalf("expected no leader among connected servers, but %v claims to be leader", i) + details := fmt.Sprintf("leader = %v", i) + tester.AnnotateCheckerFailure("unexpected leader found", details) + ts.Fatalf("%s", details) } } } + tester.AnnotateCheckerSuccess("no unexpected leader", "no unexpected leader") +} + +func (ts *Test) checkNoAgreement(index int) { + text := fmt.Sprintf("checking no unexpected agreement at index %v", index) + tester.AnnotateCheckerBegin(text) + n, _ := ts.nCommitted(index) + if n > 0 { + desp := fmt.Sprintf("unexpected agreement at index %v", index) + details := fmt.Sprintf("%v server(s) unexpectedly committed index %v", n, index) + tester.AnnotateCheckerFailure(desp, details) + ts.Fatalf("%v committed but no majority", n) + } + desp := fmt.Sprintf("no unexpected agreement at index %v", index) + tester.AnnotateCheckerSuccess(desp, "OK") } // how many servers think a log entry is committed? @@ -153,6 +185,7 @@ func (ts *Test) nCommitted(index int) (int, any) { var cmd any = nil for _, rs := range ts.srvs { if rs.applyErr != "" { + tester.AnnotateCheckerFailure("apply error", rs.applyErr) ts.t.Fatal(rs.applyErr) } @@ -160,8 +193,10 @@ if ok { if count > 0 && cmd != cmd1 { - ts.Fatalf("committed values do not match: index %v, %v, %v", + text := fmt.Sprintf("committed values at index %v do not match (%v != %v)", index, cmd, cmd1) + tester.AnnotateCheckerFailure("unmatched committed values", text) + ts.Fatalf("%s", text) } count += 1 cmd = cmd1 } }
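one() below labels its checker annotations with at most the first eight characters of the command, using the precision form of the %s verb, so large random commands stay readable on the timeline. For instance:

package main

import "fmt"

func main() {
    textcmd := fmt.Sprintf("%v", 6129484611666145821) // a long random command
    // %.8s keeps at most the first 8 bytes of the string.
    fmt.Printf("agreement of %.8s reached\n", textcmd)
    // prints: agreement of 61294846 reached
}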
@@ -183,6 +218,16 @@ // if retry==false, calls Start() only once, in order // to simplify the early Lab 3B tests. func (ts *Test) one(cmd any, expectedServers int, retry bool) int { + var textretry string + if retry { + textretry = "with" + } else { + textretry = "without" + } + textcmd := fmt.Sprintf("%v", cmd) + textb := fmt.Sprintf("checking agreement of %.8s by at least %v servers %v retry", + textcmd, expectedServers, textretry) + tester.AnnotateCheckerBegin(textb) t0 := time.Now() starts := 0 for time.Since(t0).Seconds() < 10 && ts.checkFinished() == false { @@ -214,12 +259,16 @@ // committed if cmd1 == cmd { // and it was the command we submitted. + desp := fmt.Sprintf("agreement of %.8s reached", textcmd) + tester.AnnotateCheckerSuccess(desp, "OK") return index } } time.Sleep(20 * time.Millisecond) } if retry == false { + desp := fmt.Sprintf("agreement of %.8s failed", textcmd) + tester.AnnotateCheckerFailure(desp, "failed after submitting command") ts.Fatalf("one(%v) failed to reach agreement", cmd) } } else { @@ -227,6 +276,8 @@ } } if ts.checkFinished() == false { + desp := fmt.Sprintf("agreement of %.8s failed", textcmd) + tester.AnnotateCheckerFailure(desp, "failed after 10-second timeout") ts.Fatalf("one(%v) failed to reach agreement", cmd) } return -1 @@ -262,6 +313,10 @@ func (ts *Test) wait(index int, n int, startTerm int) any { } nd, cmd := ts.nCommitted(index) if nd < n { + desp := fmt.Sprintf("fewer than %v servers committed index %v", n, index) + details := fmt.Sprintf( + "only %v (< %v) servers committed index %v at term %v", nd, n, index, startTerm) + tester.AnnotateCheckerFailure(desp, details) ts.Fatalf("only %d decided for index %d; wanted %d", nd, index, n) } diff --git a/src/tester1/annotation.go b/src/tester1/annotation.go new file mode 100644 index 0000000..102cadf --- /dev/null +++ b/src/tester1/annotation.go @@ -0,0 +1,535 @@ +package tester + +import ( + "sync" + "os" + "os/signal" + "fmt" + "time" + "strings" + "slices" + "github.com/anishathalye/porcupine" + "6.5840/models1" +) + +/// +/// Public interface. +/// + +type Annotation struct { + mu *sync.Mutex + annotations []porcupine.Annotation + continuous map[string]Continuous +} + +type Continuous struct { + start int64 + desp string + details string + bgcolor string +} + +type FrameworkInfo struct { + mu *sync.Mutex + nservers int + connected []bool + crashed []bool + ckbegin CheckerBegin +} + +type CheckerBegin struct { + ts int64 + details string +} + +// Using a global variable feels disturbing, but we can't figure out a better +// way to support user-level annotations. An alternative would be passing an +// Annotation object to the start-up function of servers and clients, but that +// doesn't feel better. +// +// One potential problem with using a global Annotation object is that when +// running multiple test cases, some zombie threads from previous test cases +// could interfere with the current one. An ad-hoc fix at the user level would +// be adding annotations only if the killed flag on the server is not set.
+var annotation *Annotation = mkAnnotation() +var unit struct{} = captureSignal() +var finfo *FrameworkInfo + +const ( + COLOR_INFO string = "#FAFAFA" + COLOR_NEUTRAL string = "#FFECB3" + COLOR_SUCCESS string = "#C8E6C9" + COLOR_FAILURE string = "#FFCDD2" + COLOR_FAULT string = "#B3E5FC" + COLOR_USER string = "#FFF176" +) + +const ( + TAG_CHECKER string = "$ Checker" + TAG_PARTITION string = "$ Failure" + TAG_INFO string = "$ Test Info" +) + +func (cfg *Config) RetrieveAnnotations() []porcupine.Annotation { + annotations := annotation.retrieve() + return annotations +} + +func AnnotatePointColor( + tag, desp, details, bgcolor string, +) { + annotation.annotatePointColor(tag, desp, details, bgcolor) +} + +func GetAnnotateTimestamp() int64 { + return timestamp() +} + +func AnnotateIntervalColor( + tag string, start int64, desp, details, bgcolor string, +) { + annotation.annotateIntervalColor(tag, start, desp, details, bgcolor) +} + +func AnnotateContinuousColor(tag, desp, details, bgcolor string) { + annotation.annotateContinuousColor(tag, desp, details, bgcolor) +} + +func AnnotateContinuousEnd(tag string) { + annotation.annotateContinuousEnd(tag) +} + +// Used by users. + +func AnnotatePoint(tag, desp, details string) { + annotation.annotatePointColor(tag, desp, details, COLOR_USER) +} + +func AnnotateInterval(tag string, start int64, desp, details string) { + annotation.annotateIntervalColor(tag, start, desp, details, COLOR_USER) +} + +func AnnotateContinuous(tag, desp, details string) { + annotation.annotateContinuousColor(tag, desp, details, COLOR_USER) +} + +// Used by test framework. + +func AnnotateInfo(desp, details string) { + AnnotatePointColor(TAG_INFO, desp, details, COLOR_INFO) +} + +func AnnotateInfoInterval(start int64, desp, details string) { + AnnotateIntervalColor(TAG_INFO, start, desp, details, COLOR_INFO) +} + +func AnnotateTest(desp string, nservers int) { + details := fmt.Sprintf("%s (%d servers)", desp, nservers) + finfo = mkFrameworkInfo(nservers) + annotation.clear() + + AnnotateInfo(details, details) +} + +func AnnotateCheckerBegin(details string) { + finfo.mu.Lock() + defer finfo.mu.Unlock() + + finfo.ckbegin = CheckerBegin{ + ts: timestamp(), + details: details, + } +} + +func AnnotateCheckerEnd(desp, details, color string) { + finfo.mu.Lock() + defer finfo.mu.Unlock() + + ckbegin := finfo.ckbegin + + if ckbegin.ts == 0 { + // Annotate as a point-in-time event if the begin timestamp is not set. + AnnotatePointColor(TAG_CHECKER, desp, details, color) + return + } + + // Annotate as an interval if the begin timestamp is set. + d := fmt.Sprintf("%s: %s", ckbegin.details, details) + AnnotateIntervalColor(TAG_CHECKER, ckbegin.ts, desp, d, color) + + // Reset the checker begin timestamp (on finfo itself; ckbegin is a copy). + finfo.ckbegin.ts = 0 +} + +func AnnotateCheckerSuccess(desp, details string) { + AnnotateCheckerEnd(desp, details, COLOR_SUCCESS) +} + +func AnnotateCheckerFailure(desp, details string) { + AnnotateCheckerEnd(desp, details, COLOR_FAILURE) +} + +func AnnotateCheckerNeutral(desp, details string) { + AnnotateCheckerEnd(desp, details, COLOR_NEUTRAL) +} + +// Used before log.Fatalf +func AnnotateCheckerFailureBeforeExit(desp, details string) { + AnnotateCheckerFailure(desp, details) + annotation.cleanup(true, "test failed") +}
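The user-facing wrappers above come in three shapes: point, interval, and continuous. A self-contained sketch of all three; the tags and descriptions are invented, and a continuous annotation stays open until ended (or until retrieval concretizes it):

package main

import (
    "time"

    tester "6.5840/tester1"
)

func main() {
    tester.AnnotateTest("ExampleUser", 3)

    // Point: a single moment on a user-chosen tag.
    tester.AnnotatePoint("$ Client", "sent Put", "Put(k, v) to server 0")

    // Interval: capture a start timestamp first, then close the span.
    start := tester.GetAnnotateTimestamp()
    time.Sleep(10 * time.Millisecond)
    tester.AnnotateInterval("$ Client", start, "Put in flight", "no reply yet")

    // Continuous: open-ended until explicitly ended.
    tester.AnnotateContinuous("$ Mode", "degraded", "one server down")
    tester.AnnotateContinuousEnd("$ Mode")
}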
+// Two functions to annotate partitions: AnnotateConnection and +// AnnotateTwoPartitions. The connected field of ServerGrp (in group.go) is +// precise if and only if ServerGrp.Partition is not used. Thus, we use the +// latter when ServerGrp.Partition is involved, and the former otherwise. +func AnnotateConnection(connection []bool) { + finfo.mu.Lock() + defer finfo.mu.Unlock() + + if slices.Equal(finfo.connected, connection) { + // Nothing to do if the connection is unchanged. + return + } + + copy(finfo.connected, connection) + + annotateFault() +} + +func annotateFault() { + trues := make([]bool, finfo.nservers) + for id := range(trues) { + trues[id] = true + } + falses := make([]bool, finfo.nservers) + if slices.Equal(trues, finfo.connected) && slices.Equal(falses, finfo.crashed) { + // No annotation when no partitions and no crashes. + AnnotateContinuousEnd(TAG_PARTITION) + return + } + + // Now, each disconnected server sits in its own partition, connected + // servers in one partition; crashed servers are indicated at the end. + conn := make([]int, 0) + crashes := make([]int, 0) + var builder strings.Builder + builder.WriteString("partition = ") + for id, connected := range(finfo.connected) { + if finfo.crashed[id] { + crashes = append(crashes, id) + continue + } + if connected { + conn = append(conn, id) + } else { + builder.WriteString(fmt.Sprintf("[%v] ", id)) + } + } + if len(conn) > 0 { + builder.WriteString(fmt.Sprintf("%v", conn)) + } + if len(crashes) > 0 { + builder.WriteString(fmt.Sprintf(" / crash = %v", crashes)) + } + text := builder.String() + AnnotateContinuousColor(TAG_PARTITION, text, text, COLOR_FAULT) +} + +func AnnotateTwoPartitions(p1 []int, p2 []int) { + // A bit hard to check whether the partition actually changes, so just + // annotate on every invocation. + // TODO + text := fmt.Sprintf("%v %v", p1, p2) + AnnotateContinuousColor(TAG_PARTITION, text, text, COLOR_FAULT) +} + +func AnnotateShutdown(servers []int) { + finfo.mu.Lock() + defer finfo.mu.Unlock() + + changed := false + for _, id := range(servers) { + if !finfo.crashed[id] { + changed = true + } + finfo.crashed[id] = true + } + + if !changed { + // Nothing to do if the set of crashed servers is unchanged. + return + } + + annotateFault() +} + +func AnnotateShutdownAll() { + finfo.mu.Lock() + n := finfo.nservers + finfo.mu.Unlock() + + servers := make([]int, n) + for i := range(servers) { + servers[i] = i + } + AnnotateShutdown(servers) +} + +func AnnotateRestart(servers []int) { + finfo.mu.Lock() + defer finfo.mu.Unlock() + + changed := false + for _, id := range(servers) { + if finfo.crashed[id] { + changed = true + } + finfo.crashed[id] = false + } + + if !changed { + // Nothing to do if the set of crashed servers is unchanged. + return + } + + annotateFault() +} + +func AnnotateRestartAll() { + finfo.mu.Lock() + n := finfo.nservers + finfo.mu.Unlock() + + servers := make([]int, n) + for i := range(servers) { + servers[i] = i + } + AnnotateRestart(servers) +}
+/// +/// Internal. +/// + +func timestamp() int64 { + return int64(time.Since(time.Unix(0, 0))) +} + +func (an *Annotation) retrieve() []porcupine.Annotation { + an.mu.Lock() + x := an.annotations + t := timestamp() + for tag, cont := range(an.continuous) { + a := porcupine.Annotation{ + Tag: tag, + Start: cont.start, + End: t, + Description: cont.desp, + Details: cont.details, + BackgroundColor: cont.bgcolor, + } + x = append(x, a) + } + an.annotations = make([]porcupine.Annotation, 0) + an.continuous = make(map[string]Continuous) + an.mu.Unlock() + return x +} + +func (an *Annotation) clear() { + an.mu.Lock() + an.annotations = make([]porcupine.Annotation, 0) + an.continuous = make(map[string]Continuous) + an.mu.Unlock() +} + +func (an *Annotation) annotatePointColor( + tag, desp, details, bgcolor string, +) { + an.mu.Lock() + t := timestamp() + a := porcupine.Annotation{ + Tag: tag, + Start: t, + Description: desp, + Details: details, + BackgroundColor: bgcolor, + } + an.annotations = append(an.annotations, a) + an.mu.Unlock() +} + +func (an *Annotation) annotateIntervalColor( + tag string, start int64, desp, details, bgcolor string, +) { + an.mu.Lock() + a := porcupine.Annotation{ + Tag: tag, + Start: start, + End: timestamp(), + Description: desp, + Details: details, + BackgroundColor: bgcolor, + } + an.annotations = append(an.annotations, a) + an.mu.Unlock() +} + +func (an *Annotation) annotateContinuousColor( + tag, desp, details, bgcolor string, +) { + an.mu.Lock() + defer an.mu.Unlock() + + cont, ok := an.continuous[tag] + if !ok { + // The first continuous annotation for tag. Simply add it to the + // continuous map. + an.continuous[tag] = Continuous{ + start: timestamp(), + desp: desp, + details: details, + bgcolor: bgcolor, + } + return + } + + // Subsequent continuous annotation for tag. Concretize the previous + // annotation and add this one to the continuous map. + t := timestamp() + aprev := porcupine.Annotation{ + Tag: tag, + Start: cont.start, + End: t, + Description: cont.desp, + Details: cont.details, + BackgroundColor: cont.bgcolor, + } + an.annotations = append(an.annotations, aprev) + an.continuous[tag] = Continuous{ + // XXX: If the start timestamp of an event is too close to the end + // timestamp of another event, Porcupine seems to overlap the two + // events. We add a delta (1000) as a workaround; remove it once this + // issue is resolved. + start: t + 1000, + desp: desp, + details: details, + bgcolor: bgcolor, + } +} + +func (an *Annotation) annotateContinuousEnd(tag string) { + an.mu.Lock() + defer an.mu.Unlock() + + cont, ok := an.continuous[tag] + if !ok { + // Nothing to end since there's no on-going continuous annotation for + // tag. + return + } + + // End the on-going continuous annotation for tag. + t := timestamp() + aprev := porcupine.Annotation{ + Tag: tag, + Start: cont.start, + End: t, + Description: cont.desp, + Details: cont.details, + BackgroundColor: cont.bgcolor, + } + an.annotations = append(an.annotations, aprev) + delete(an.continuous, tag) +}
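cleanup() below gates visualization on VIS_ENABLE: "never" suppresses it, "always" forces it, and by default a vis file is written only when the test fails (or is interrupted). A hypothetical helper, not part of the patch, that mirrors that gate:

package main

import "fmt"

// shouldVisualize reports whether cleanup() would produce a vis file for a
// given VIS_ENABLE value and test outcome.
func shouldVisualize(enabled string, failed bool) bool {
    if enabled == "never" {
        return false
    }
    return failed || enabled == "always"
}

func main() {
    fmt.Println(shouldVisualize("", true))        // true: failures visualize
    fmt.Println(shouldVisualize("", false))       // false: quiet on success
    fmt.Println(shouldVisualize("always", false)) // true: forced on
    fmt.Println(shouldVisualize("never", true))   // false: forced off
}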
+func (an *Annotation) cleanup(failed bool, end string) { + enabled := os.Getenv("VIS_ENABLE") + if enabled == "never" || (!failed && enabled != "always") { + // Simply clean up the annotations without producing the vis file if + // VIS_ENABLE is set to "never", or if the test passes and VIS_ENABLE is + // not set to "always". + an.clear() + return + } + + annotations := an.retrieve() + if len(annotations) == 0 { + // Skip empty annotations. + return + } + + // XXX: Make the last annotation an interval one to work around Porcupine's + // issue. Consider removing this once the issue is fixed. + t := timestamp() + aend := porcupine.Annotation{ + Tag: TAG_INFO, + Start: t, + End: t + 1000, + Description: end, + Details: end, + BackgroundColor: COLOR_INFO, + } + annotations = append(annotations, aend) + + fpath := os.Getenv("VIS_FILE") + var file *os.File + var err error + if fpath == "" { + // Save the vis file in a temporary file. + file, err = os.CreateTemp("", "porcupine-*.html") + } else { + file, err = os.OpenFile(fpath, os.O_RDWR | os.O_CREATE | os.O_TRUNC, 0644) + } + if err != nil { + fmt.Printf("info: failed to open visualization file %s (%v)\n", fpath, err) + return + } + + // Create a fresh linearization info without any client operations and use + // models.KvModel simply as a placeholder. + info := porcupine.LinearizationInfo{} + info.AddAnnotations(annotations) + porcupine.Visualize(models.KvModel, info, file) + fmt.Printf("info: wrote visualization to %s\n", file.Name()) +} + +func mkAnnotation() *Annotation { + an := Annotation{ + mu: new(sync.Mutex), + annotations: make([]porcupine.Annotation, 0), + continuous: make(map[string]Continuous), + } + + return &an +} + +func mkFrameworkInfo(nservers int) *FrameworkInfo { + conn := make([]bool, nservers) + for id := range(conn) { + conn[id] = true + } + + finfo := FrameworkInfo{ + mu: new(sync.Mutex), + nservers: nservers, + connected: conn, + crashed: make([]bool, nservers), + } + + return &finfo +} + +func captureSignal() struct{} { + // Capture SIGINT to visualize on interruption. + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + go func() { + for range c { + annotation.cleanup(true, "interrupted") + os.Exit(1) + } + }() + + return struct{}{} +} diff --git a/src/tester1/config.go b/src/tester1/config.go index 939df47..1a37782 100644 --- a/src/tester1/config.go +++ b/src/tester1/config.go @@ -79,6 +79,11 @@ func (cfg *Config) Cleanup() { cfg.Clnts.cleanup() cfg.Groups.cleanup() cfg.net.Cleanup() + if cfg.t.Failed() { + annotation.cleanup(true, "test failed") + } else { + annotation.cleanup(false, "test passed") + } cfg.CheckTimeout() } diff --git a/src/tester1/group.go b/src/tester1/group.go index 5c4fa28..bbc2828 100644 --- a/src/tester1/group.go +++ b/src/tester1/group.go @@ -194,6 +194,10 @@ func (sg *ServerGrp) IsConnected(i int) bool { return sg.connected[i] } +func (sg *ServerGrp) GetConnected() []bool { + return sg.connected +} + // Maximum log size across all servers func (sg *ServerGrp) LogSize() int { logsize := 0
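Taken together, the pieces added by this patch compose as follows; the skeleton is illustrative (names invented), and the API calls are the ones defined in src/tester1/annotation.go. In either output path the result is an HTML file, written to VIS_FILE when that variable is set and to a temporary porcupine-*.html file otherwise.

package main

import tester "6.5840/tester1"

func main() {
    // 1. Setup registers the test and server count, clearing annotations
    //    left over from an earlier test in the same process.
    tester.AnnotateTest("ExampleRun", 3)

    // 2. Fault injection and checkers annotate as the test runs.
    tester.AnnotateConnection([]bool{false, true, true})
    tester.AnnotateCheckerBegin("checking agreement without server 0")
    tester.AnnotateCheckerSuccess("agreement reached", "OK")
    tester.AnnotateConnection([]bool{true, true, true})

    // 3. Teardown: Config.Cleanup() calls annotation.cleanup(), which
    //    writes the Porcupine vis file according to VIS_ENABLE/VIS_FILE;
    //    kvtest's CheckPorcupineT instead retrieves the annotations and
    //    attaches them to the linearizability visualization.
}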