Yun-Sheng Chang 2025-03-15 22:11:08 -04:00
parent fe6ae451c1
commit ea04b4c78f
7 changed files with 55 additions and 70 deletions

View File

@ -68,7 +68,9 @@ func (ts *Test) onePartition(p []int, req any) any {
// try all the servers, maybe one is the leader but give up after NSEC
t0 := time.Now()
for time.Since(t0).Seconds() < NSEC {
ts.mu.Lock()
index := ts.leader
ts.mu.Unlock()
for range ts.srvs {
if ts.g.IsConnected(index) {
s := ts.srvs[index]
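The hunk above takes ts.mu around the read of ts.leader so the tester no longer races with writers of that field. A minimal stand-alone sketch of the same guarded-read pattern, with made-up names rather than the tester's actual types:

package main

import (
	"fmt"
	"sync"
)

// leaderHint caches the index of the most recently observed leader.
// The mutex protects leader against concurrent readers and writers.
type leaderHint struct {
	mu     sync.Mutex
	leader int
}

// get returns a consistent snapshot of the cached leader index.
func (h *leaderHint) get() int {
	h.mu.Lock()
	defer h.mu.Unlock()
	return h.leader
}

// set records a newly discovered leader index.
func (h *leaderHint) set(i int) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.leader = i
}

func main() {
	h := &leaderHint{}
	h.set(2)
	fmt.Println(h.get()) // prints 2
}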

View File

@ -878,10 +878,11 @@ func TestPersist33C(t *testing.T) {
ts.g.ShutdownServer((leader + 0) % servers)
ts.g.ShutdownServer((leader + 1) % servers)
tester.AnnotateShutdown([]int{(leader + 0) % servers, (leader + 1) % servers})
ts.restart((leader + 2) % servers)
ts.restart((leader + 0) % servers)
tester.AnnotateRestart([]int{(leader + 2) % servers, (leader + 0) % servers})
ts.g.ConnectOne((leader + 2) % servers)
tester.AnnotateConnection(ts.g.GetConnected())
ts.restart((leader + 0) % servers)
tester.AnnotateRestart([]int{(leader + 0) % servers})
ts.one(103, 2, true)

View File

@ -37,12 +37,14 @@ func MakeShardCtrler(clnt *tester.Clnt, leases bool) *ShardCtrler {
}
// The tester calls InitController() before starting a new
// controller. In this method you can implement recovery (part B) and
// use a lock to become leader (part C). InitController may fail when
// another controller supersedes (e.g., when this controller is
// partitioned during recovery).
// controller. In part A, this method doesn't need to do anything. In
// B and C, this method implements recovery (part B) and uses a lock
// to become leader (part C). InitController should return
// rpc.ErrVersion when another controller supersedes it (e.g., when
// this controller is partitioned during recovery); this happens only
// in Part C. Otherwise, it returns rpc.OK.
func (sck *ShardCtrler) InitController() rpc.Err {
return rpc.ErrNoKey
return rpc.ErrVersion
}
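A rough sketch of the contract spelled out in the comment above, with stand-in types instead of the lab's rpc and shardctrler packages (acquireLease and recover are hypothetical helpers, not part of the handout):

package main

import "fmt"

// Err mirrors the lab's rpc.Err; OK and ErrVersion are the only values
// InitController is documented to return.
type Err string

const (
	OK         Err = "OK"
	ErrVersion Err = "ErrVersion"
)

// ShardCtrler is a stand-in for the real controller type.
type ShardCtrler struct {
	acquireLease func() bool // part C: try to become the sole active controller
	recover      func()      // part B: finish any interrupted configuration change
}

// InitController follows the documented behavior: do nothing in part A,
// run recovery in part B, and in part C report ErrVersion if another
// controller has already superseded this one.
func (sck *ShardCtrler) InitController() Err {
	if sck.acquireLease != nil && !sck.acquireLease() {
		return ErrVersion // superseded while partitioned
	}
	if sck.recover != nil {
		sck.recover()
	}
	return OK
}

func main() {
	sck := &ShardCtrler{
		acquireLease: func() bool { return true },
		recover:      func() { fmt.Println("replaying half-finished config change") },
	}
	fmt.Println(sck.InitController()) // OK
}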
// The tester calls ExitController to exit a controller. In part B and
@ -59,9 +61,10 @@ func (sck *ShardCtrler) InitConfig(cfg *shardcfg.ShardConfig) {
}
// Called by the tester to ask the controller to change the
// configuration from the current one to new. It may return an error
// if this controller is disconnected for a while and another
// controller takes over in the mean time, as in part C.
// configuration from the current one to new. It should return
// rpc.ErrVersion if this controller is superseded by another
// controller, as in part C. In all other cases, it should return
// rpc.OK.
func (sck *ShardCtrler) ChangeConfigTo(new *shardcfg.ShardConfig) rpc.Err {
return rpc.OK
}
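On the caller's side, the sharpened comment means rpc.ErrVersion from ChangeConfigTo is a terminal "someone else took over" signal rather than a retryable failure. A hedged, self-contained illustration (the types again stand in for the lab's rpc package):

package main

import "fmt"

// Err, OK, and ErrVersion stand in for the lab's rpc package.
type Err string

const (
	OK         Err = "OK"
	ErrVersion Err = "ErrVersion"
)

// applyConfigChange shows how a caller might interpret the documented
// return values of ChangeConfigTo, passed in here as a closure.
func applyConfigChange(changeConfigTo func() Err) {
	switch err := changeConfigTo(); err {
	case OK:
		fmt.Println("new configuration installed")
	case ErrVersion:
		// Another controller superseded this one (part C); don't retry.
		fmt.Println("superseded by a newer controller; giving up")
	default:
		fmt.Printf("unexpected error: %v\n", err)
	}
}

func main() {
	applyConfigChange(func() Err { return ErrVersion })
}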
@ -79,7 +82,7 @@ func (sck *ShardCtrler) isKilled() bool {
}
// Return the current configuration
// Return the current configuration and its version number
func (sck *ShardCtrler) Query() (*shardcfg.ShardConfig, rpc.Tversion) {
// Your code here.
return nil, 0

View File

@ -104,15 +104,13 @@ func TestJoinBasic5A(t *testing.T) {
ts.t.Fatalf("%d isn't a member of %v", gid2, cfg1)
}
// check shards at shardcfg.Gid2
ts.checkShutdownSharding(gid1, gid2, ka, va)
ts.checkShutdownSharding(gid1, ka, va)
for i := 0; i < len(ka); i++ {
ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
}
// check shards at shardcfg.Gid1
ts.checkShutdownSharding(gid2, gid1, ka, va)
ts.checkShutdownSharding(gid2, ka, va)
for i := 0; i < len(ka); i++ {
ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
@ -172,8 +170,7 @@ func TestJoinLeaveBasic5A(t *testing.T) {
ts.t.Fatalf("joinGroups: err %v", err)
}
// check shards at shardcfg.Gid2
ts.checkShutdownSharding(gid1, gid2, ka, va)
ts.checkShutdownSharding(gid1, ka, va)
for i := 0; i < len(ka); i++ {
ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
@ -204,8 +201,7 @@ func TestJoinLeaveBasic5A(t *testing.T) {
ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
}
// check shards at shardcfg.Gid2
ts.checkShutdownSharding(gid2, gid1, ka, va)
ts.checkShutdownSharding(gid2, ka, va)
}
// test many groups joining and leaving, reliable or unreliable
@ -222,7 +218,7 @@ func joinLeave5A(t *testing.T, reliable bool, part string) {
ts.joinGroups(sck, grps)
ts.checkShutdownSharding(grps[0], grps[1], ka, va)
ts.checkShutdownSharding(grps[0], ka, va)
for i := 0; i < len(ka); i++ {
ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
@ -260,7 +256,7 @@ func TestShutdown5A(t *testing.T) {
grps := ts.groups(NJOIN)
ts.joinGroups(sck, grps)
ts.checkShutdownSharding(grps[0], grps[1], ka, va)
ts.checkShutdownSharding(grps[0], ka, va)
for i := 0; i < len(ka); i++ {
ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
@ -569,13 +565,13 @@ func TestJoinLeave5B(t *testing.T) {
}
}
// test recovery of partitioned controlers
// test recovery of partitioned controllers
func TestRecoverCtrler5B(t *testing.T) {
const (
NPARTITION = 5
)
ts := MakeTest(t, "Test (5B): recover controler ...", true)
ts := MakeTest(t, "Test (5B): recover controller ...", true)
defer ts.Cleanup()
gid := ts.setupKVService()
@ -733,7 +729,7 @@ func TestLeaseBasicRefresh5C(t *testing.T) {
// Test if old leader is fenced off when reconnecting while it is in
// the middle of a Join.
func TestPartitionControlerJoin5C(t *testing.T) {
func TestPartitionControllerJoin5C(t *testing.T) {
const (
NSLEEP = 2
RAND = 1000
@ -759,8 +755,8 @@ func TestPartitionControlerJoin5C(t *testing.T) {
ch <- ts.join(sck, ngid, ts.Group(ngid).SrvNames())
}()
// sleep for a while to get the chance for the controler to get stuck
// in join or leave, because gid is down
// sleep for a while to get the chance for the controller to get
// stuck in join, because gid is down
time.Sleep(1 * time.Second)
// partition sck
@ -771,19 +767,20 @@ func TestPartitionControlerJoin5C(t *testing.T) {
ts.Group(ngid).StartServers()
// start new controler to supersede partitioned one,
// it will also be stuck
// start new controller to supersede partitioned one,
sck0 := ts.makeShardCtrler()
if err := sck0.InitController(); err != rpc.OK {
t.Fatalf("failed to init controller %v", err)
}
scfg, _ := sck0.Query()
if !scfg.IsMember(ngid) {
t.Fatalf("Didn't recover gid %d", ngid)
}
sck0.ExitController()
//log.Printf("reconnect")
// reconnect old controller, which shouldn't be able
// to do anything
// reconnect old controller, which shouldn't finish ChangeConfigTo
clnt.ConnectAll()
err := <-ch
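The test above launches the join from a goroutine and hands its result back over ch, precisely so it can partition the controller while the call is still outstanding and inspect the error afterwards. The same pattern in isolation, with illustrative names only:

package main

import (
	"fmt"
	"time"
)

// slowJoin stands in for a configuration change that blocks while the
// target group's servers are down.
func slowJoin() string {
	time.Sleep(100 * time.Millisecond)
	return "OK"
}

func main() {
	ch := make(chan string)

	// Start the join in the background so the test can interfere with
	// the controller while the call is in flight.
	go func() {
		ch <- slowJoin()
	}()

	// ... partition the controller, start a superseding one, etc. ...

	// Eventually collect the result of the original join.
	fmt.Println("join returned:", <-ch)
}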

View File

@ -2,7 +2,7 @@ package shardkv
import (
"fmt"
//"log"
"log"
"math/rand"
"sync"
"sync/atomic"
@ -131,22 +131,19 @@ func (ts *Test) join(sck *shardctrler.ShardCtrler, gid tester.Tgid, srvs []strin
newcfg := cfg.Copy()
ok := newcfg.JoinBalance(map[tester.Tgid][]string{gid: srvs})
if !ok {
return rpc.ErrVersion
log.Fatalf("join: group %d is already present", gid)
}
err := sck.ChangeConfigTo(newcfg)
return err
return sck.ChangeConfigTo(newcfg)
}
func (ts *Test) joinGroups(sck *shardctrler.ShardCtrler, gids []tester.Tgid) rpc.Err {
for i, gid := range gids {
for _, gid := range gids {
ts.Config.MakeGroupStart(gid, NSRV, ts.StartServerShardGrp)
if err := ts.join(sck, gid, ts.Group(gid).SrvNames()); err != rpc.OK {
return err
}
if i < len(gids)-1 {
time.Sleep(INTERGRPDELAY * time.Millisecond)
}
}
return rpc.OK
}
@ -156,21 +153,19 @@ func (ts *Test) leave(sck *shardctrler.ShardCtrler, gid tester.Tgid) rpc.Err {
newcfg := cfg.Copy()
ok := newcfg.LeaveBalance([]tester.Tgid{gid})
if !ok {
return rpc.ErrVersion
log.Fatalf("leave: group %d is already not present", gid)
}
return sck.ChangeConfigTo(newcfg)
}
func (ts *Test) leaveGroups(sck *shardctrler.ShardCtrler, gids []tester.Tgid) rpc.Err {
for i, gid := range gids {
for _, gid := range gids {
if err := ts.leave(sck, gid); err != rpc.OK {
return err
}
ts.Config.ExitGroup(gid)
if i < len(gids)-1 {
time.Sleep(INTERGRPDELAY * time.Millisecond)
}
}
return rpc.OK
}
@ -196,31 +191,14 @@ func (ts *Test) disconnectClntFromLeader(clnt *tester.Clnt, gid tester.Tgid) int
return l
}
func (ts *Test) checkLogs(gids []tester.Tgid) {
for _, gid := range gids {
n := ts.Group(gid).LogSize()
s := ts.Group(gid).SnapshotSize()
if ts.maxraftstate >= 0 && n > 8*ts.maxraftstate {
ts.t.Fatalf("persister.RaftStateSize() %v, but maxraftstate %v",
n, ts.maxraftstate)
}
if ts.maxraftstate < 0 && s > 0 {
ts.t.Fatalf("maxraftstate is -1, but snapshot is non-empty!")
}
}
}
// make sure that the data really is sharded by
// shutting down one shard and checking that some
// Get()s don't succeed.
func (ts *Test) checkShutdownSharding(down, up tester.Tgid, ka []string, va []string) {
func (ts *Test) checkShutdownSharding(down tester.Tgid, ka []string, va []string) {
const NSEC = 2
ts.Group(down).Shutdown()
ts.checkLogs([]tester.Tgid{down, up}) // forbid snapshots
n := len(ka)
ch := make(chan string)
done := int32(0)
@ -239,7 +217,6 @@ func (ts *Test) checkShutdownSharding(down, up tester.Tgid, ka []string, va []st
}(xi)
}
// wait a bit, only about half the Gets should succeed.
ndone := 0
for atomic.LoadInt32(&done) != 1 {
select {
@ -254,9 +231,9 @@ func (ts *Test) checkShutdownSharding(down, up tester.Tgid, ka []string, va []st
}
}
//log.Printf("%d completions out of %d; down %d", ndone, n, down)
// log.Printf("%d completions out of %d; down %d", ndone, n, down)
if ndone >= n {
ts.Fatalf("expected less than %d completions with one shard dead\n", n)
ts.Fatalf("expected less than %d completions with shard %d down\n", n, down)
}
// bring the crashed shard/group back to life.
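The heart of checkShutdownSharding is the count above: with one group down, some Gets must stall, so seeing every Get complete means the data was not actually sharded. Reduced to a stand-alone sketch (timings and the even/odd key placement are illustrative):

package main

import (
	"fmt"
	"time"
)

func main() {
	const n = 10
	done := make(chan int, n)

	// Launch n Gets; pretend odd-numbered keys live on groups that are
	// still up, while even-numbered keys belong to the group that was
	// shut down and therefore never produce a completion here.
	for i := 0; i < n; i++ {
		go func(i int) {
			if i%2 == 1 {
				done <- i
			}
		}(i)
	}

	// Count completions for a short window, as the tester does.
	ndone := 0
	deadline := time.After(2 * time.Second)
loop:
	for {
		select {
		case <-done:
			ndone++
		case <-deadline:
			break loop
		}
	}

	// If every Get finished despite the dead group, the check fails.
	if ndone >= n {
		fmt.Println("FAIL: all Gets completed with one group down")
	} else {
		fmt.Printf("ok: %d of %d Gets completed\n", ndone, n)
	}
}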
@ -360,8 +337,8 @@ func (ts *Test) killCtrler(ck kvtest.IKVClerk, gid tester.Tgid, ka, va []string)
sck0.ExitController()
if ts.leases {
// reconnect old controller, which shouldn't be able
// to do anything
// reconnect old controller, which should bail out, because
// it has been superseded.
clnt.ConnectAll()
time.Sleep(1 * time.Second)

View File

@ -74,6 +74,7 @@ type ServerGrp struct {
gid Tgid
connected []bool // whether each server is on the net
mks FstartServer
mu sync.Mutex
}
func makeSrvGrp(net *labrpc.Network, gid Tgid, n int, mks FstartServer) *ServerGrp {
@ -174,7 +175,9 @@ func (sg *ServerGrp) connect(i int, to []int) {
func (sg *ServerGrp) disconnect(i int, from []int) {
// log.Printf("%p: disconnect peer %d from %v\n", sg, i, from)
sg.mu.Lock()
sg.connected[i] = false
sg.mu.Unlock()
// outgoing socket files
sg.srvs[i].disconnect(from)
@ -195,6 +198,8 @@ func (sg *ServerGrp) DisconnectAll(i int) {
}
func (sg *ServerGrp) IsConnected(i int) bool {
defer sg.mu.Unlock()
sg.mu.Lock()
return sg.connected[i]
}

View File

@ -87,7 +87,7 @@ func (s *Server) shutdownServer() {
// inform all services to stop
for _, svc := range s.svcs {
if svc != nil {
go svc.Kill()
svc.Kill()
}
}
s.svcs = nil