Frans Kaashoek 2025-04-01 18:04:28 -04:00
parent 05dc50ab27
commit 8f21c11cfc
6 changed files with 72 additions and 204 deletions

View File

@@ -6,8 +6,6 @@ package shardctrler
import (
-    "sync/atomic"
    "6.5840/kvsrv1"
    "6.5840/kvsrv1/rpc"
    "6.5840/kvtest1"
@@ -22,14 +20,13 @@ type ShardCtrler struct {
    kvtest.IKVClerk
    killed int32 // set by Kill()
-    leases bool

    // Your data here.
}

// Make a ShardCltler, which stores its state in a kvsrv.
-func MakeShardCtrler(clnt *tester.Clnt, leases bool) *ShardCtrler {
-    sck := &ShardCtrler{clnt: clnt, leases: leases}
+func MakeShardCtrler(clnt *tester.Clnt) *ShardCtrler {
+    sck := &ShardCtrler{clnt: clnt}
    srv := tester.ServerName(tester.GRP0, 0)
    sck.IKVClerk = kvsrv.MakeClerk(clnt, srv)
    // Your code here.
@@ -38,20 +35,15 @@ func MakeShardCtrler(clnt *tester.Clnt, leases bool) *ShardCtrler {
// The tester calls InitController() before starting a new
// controller. In part A, this method doesn't need to do anything. In
-// B and C, this method implements recovery (part B) and uses a lock
-// to become leader (part C).
+// B and C, this method implements recovery.
func (sck *ShardCtrler) InitController() {
}

-// The tester calls ExitController to exit a controller. In part B and
-// C, release lock.
-func (sck *ShardCtrler) ExitController() {
-}

// Called once by the tester to supply the first configuration. You
// can marshal ShardConfig into a string using shardcfg.String(), and
// then Put it in the kvsrv for the controller at version 0. You can
-// pick the key to name the configuration.
+// pick the key to name the configuration. The initial configuration
+// lists shardgrp shardcfg.Gid1 for all shards.
func (sck *ShardCtrler) InitConfig(cfg *shardcfg.ShardConfig) {
    // Your code here
}
@@ -64,18 +56,6 @@ func (sck *ShardCtrler) ChangeConfigTo(new *shardcfg.ShardConfig) {
    return
}

-// Tester "kills" shardctrler by calling Kill(). For your
-// convenience, we also supply isKilled() method to test killed in
-// loops.
-func (sck *ShardCtrler) Kill() {
-    atomic.StoreInt32(&sck.killed, 1)
-}
-
-func (sck *ShardCtrler) isKilled() bool {
-    z := atomic.LoadInt32(&sck.killed)
-    return z == 1
-}

// Return the current configuration and its version number
func (sck *ShardCtrler) Query() (*shardcfg.ShardConfig, rpc.Tversion) {
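A minimal sketch of what the Part A InitConfig/Query pair could look like, following the comments above; the key name "config" is an arbitrary illustrative choice, String() is assumed to be the marshaling method the comment refers to, and shardcfg.FromString is assumed to be its inverse (check the shardcfg package for the actual names):

    const configKey = "config" // illustrative key name; any key works

    func (sck *ShardCtrler) InitConfig(cfg *shardcfg.ShardConfig) {
        // Store the marshaled configuration at version 0 (the key must not exist yet).
        sck.Put(configKey, cfg.String(), 0)
    }

    func (sck *ShardCtrler) Query() (*shardcfg.ShardConfig, rpc.Tversion) {
        s, ver, err := sck.Get(configKey)
        if err != rpc.OK {
            return nil, 0
        }
        return shardcfg.FromString(s), ver // FromString assumed; see the shardcfg package
    }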

View File

@@ -19,14 +19,14 @@ func MakeClerk(clnt *tester.Clnt, servers []string) *Clerk {
    return ck
}

-func (ck *Clerk) Get(key string, n shardcfg.Tnum) (string, rpc.Tversion, rpc.Err) {
+func (ck *Clerk) Get(key string) (string, rpc.Tversion, rpc.Err) {
    // Your code here
    return "", 0, ""
}

-func (ck *Clerk) Put(key string, value string, version rpc.Tversion, n shardcfg.Tnum) (bool, rpc.Err) {
+func (ck *Clerk) Put(key string, value string, version rpc.Tversion) rpc.Err {
    // Your code here
-    return false, ""
+    return ""
}

func (ck *Clerk) Freeze(s shardcfg.Tshid, num shardcfg.Tnum) ([]byte, rpc.Err) {
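With the configuration number gone from the shardgrp clerk, callers use the plain Get/Put signatures shown above; a hedged usage sketch (key and value are illustrative):

    val, ver, err := ck.Get("k1")
    if err == rpc.OK {
        // Put now returns only rpc.Err instead of (bool, rpc.Err).
        err = ck.Put("k1", val+"!", ver)
    }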

View File

@@ -5,18 +5,16 @@ import (
    "6.5840/shardkv1/shardcfg"
)

-// Same as Put in kvsrv1/rpc, but with a configuration number
+// Same as Put in kvsrv1/rpc
type PutArgs struct {
    Key string
    Value string
    Version rpc.Tversion
-    Num shardcfg.Tnum
}

-// Same as Get in kvsrv1/rpc, but with a configuration number.
+// Same as Get in kvsrv1/rpc
type GetArgs struct {
    Key string
-    Num shardcfg.Tnum
}

type FreezeArgs struct {

View File

@@ -1,7 +1,7 @@
package shardkv

import (
-    "log"
+    //"log"
    "testing"
    "time"
@@ -9,7 +9,6 @@ import (
    "6.5840/kvtest1"
    "6.5840/shardkv1/shardcfg"
    "6.5840/shardkv1/shardctrler"
-    "6.5840/shardkv1/shardctrler/param"
    "6.5840/tester1"
)
@@ -28,7 +27,7 @@ func TestInitQuery5A(t *testing.T) {
    defer ts.Cleanup()

    // Make a shard controller
-    sck := shardctrler.MakeShardCtrler(ts.Config.MakeClient(), ts.leases)
+    sck := shardctrler.MakeShardCtrler(ts.Config.MakeClient())

    // Make an empty shard configuration
    scfg := shardcfg.MakeShardConfig()
@@ -409,7 +408,7 @@ func TestProgressJoin(t *testing.T) {
    select {
    case cnt := <-ch1:
-        log.Printf("cnt %d", cnt)
+        //log.Printf("cnt %d", cnt)
        if cnt < NCNT {
            ts.Fatalf("Two few gets finished %d; expected more than %d", cnt, NCNT)
        }
@@ -569,140 +568,28 @@ func TestRecoverCtrler5B(t *testing.T) {
    ka, va := ts.SpreadPuts(ck, NKEYS)

    for i := 0; i < NPARTITION; i++ {
-        ts.killCtrler(ck, gid, ka, va)
+        ts.partitionCtrler(ck, gid, ka, va)
    }
}

// Test concurrent ctrlers fighting for leadership reliable
-func TestAcquireLockConcurrentReliable5C(t *testing.T) {
-    ts := MakeTestLeases(t, "Test (5C): Concurent ctrlers acquiring leadership ...", true)
+func TestConcurrentReliable5C(t *testing.T) {
+    ts := MakeTestLeases(t, "Test (5C): Concurent ctrlers ...", true)
    defer ts.Cleanup()

    ts.setupKVService()
    ck := ts.MakeClerk()
    ka, va := ts.SpreadPuts(ck, NKEYS)
-    ts.electCtrler(ck, ka, va)
+    ts.concurCtrler(ck, ka, va)
}

// Test concurrent ctrlers fighting for leadership unreliable
func TestAcquireLockConcurrentUnreliable5C(t *testing.T) {
-    ts := MakeTestLeases(t, "Test (5C): Concurent ctrlers acquiring leadership ...", false)
+    ts := MakeTestLeases(t, "Test (5C): Concurent ctrlers ...", false)
    defer ts.Cleanup()

    ts.setupKVService()
    ck := ts.MakeClerk()
    ka, va := ts.SpreadPuts(ck, NKEYS)
-    ts.electCtrler(ck, ka, va)
+    ts.concurCtrler(ck, ka, va)
-}
-
-// Test that ReleaseLock allows a new leader to start quickly
-func TestLeaseBasicRelease5C(t *testing.T) {
-    ts := MakeTestLeases(t, "Test (5C): release lease ...", true)
-    defer ts.Cleanup()
-    ts.setupKVService()
-    sck0, clnt0 := ts.makeShardCtrlerClnt()
-    go func() {
-        sck0.InitController()
-        time.Sleep(200 * time.Millisecond)
-        sck0.ExitController()
-    }()
-    time.Sleep(10 * time.Millisecond)
-    // start new controller
-    sck1, clnt1 := ts.makeShardCtrlerClnt()
-    ch := make(chan struct{})
-    go func() {
-        sck1.InitController()
-        time.Sleep(200 * time.Millisecond)
-        sck1.ExitController()
-        ch <- struct{}{}
-    }()
-    select {
-    case <-time.After(1 * time.Second):
-        ts.Fatalf("Release didn't give up leadership")
-    case <-ch:
-    }
-    ts.Config.DeleteClient(clnt0)
-    ts.Config.DeleteClient(clnt1)
-}
-
-// Test lease expiring
-func TestLeaseBasicExpire5C(t *testing.T) {
-    ts := MakeTestLeases(t, "Test (5C): lease expiring ...", true)
-    defer ts.Cleanup()
-    ts.setupKVService()
-    sck0, clnt0 := ts.makeShardCtrlerClnt()
-    go func() {
-        sck0.InitController()
-        for {
-            time.Sleep(10 * time.Millisecond)
-        }
-    }()
-    time.Sleep(100 * time.Millisecond)
-    // partition sck0 forever
-    clnt0.DisconnectAll()
-    // start new controller
-    sck1, clnt1 := ts.makeShardCtrlerClnt()
-    ch := make(chan struct{})
-    go func() {
-        sck1.InitController()
-        time.Sleep(100 * time.Millisecond)
-        sck1.ExitController()
-        ch <- struct{}{}
-    }()
-    select {
-    case <-time.After((param.LEASETIMESEC + 1) * time.Second):
-        ts.Fatalf("Lease didn't expire")
-    case <-ch:
-    }
-    ts.Config.DeleteClient(clnt0)
-    ts.Config.DeleteClient(clnt1)
-}
-
-// Test lease is being extended
-func TestLeaseBasicRefresh5C(t *testing.T) {
-    const LEADERSEC = 3
-    ts := MakeTestLeases(t, "Test (5C): lease refresh ...", true)
-    defer ts.Cleanup()
-    ts.setupKVService()
-    sck0, clnt0 := ts.makeShardCtrlerClnt()
-    go func() {
-        sck0.InitController()
-        time.Sleep(LEADERSEC * param.LEASETIMESEC * time.Second)
-        sck0.ExitController()
-    }()
-    // give sck0 time to become leader
-    time.Sleep(100 * time.Millisecond)
-    // start new controller
-    sck1, clnt1 := ts.makeShardCtrlerClnt()
-    ch := make(chan struct{})
-    go func() {
-        sck1.InitController()
-        time.Sleep(100 * time.Millisecond)
-        sck1.ExitController()
-        ch <- struct{}{}
-    }()
-    select {
-    case <-time.After((LEADERSEC + param.LEASETIMESEC + 1) * time.Second):
-    case <-ch:
-        ts.Fatalf("Lease not refreshed")
-    }
-    ts.Config.DeleteClient(clnt0)
-    ts.Config.DeleteClient(clnt1)
}

// Test if old leader is fenced off when reconnecting while it is in
@@ -710,6 +597,7 @@ func TestLeaseBasicRefresh5C(t *testing.T) {
func TestPartitionControllerJoin5C(t *testing.T) {
    const (
        NSLEEP = 2
+        NSEC = 1
        RAND = 1000
    )
@@ -739,8 +627,8 @@ func TestPartitionControllerJoin5C(t *testing.T) {
    // partition sck
    clnt.DisconnectAll()

-    // wait until sck's lease expired before restarting shardgrp `ngid`
-    time.Sleep((param.LEASETIMESEC + 1) * time.Second)
+    // wait a while before restarting shardgrp `ngid`
+    time.Sleep(NSEC * time.Second)

    ts.Group(ngid).StartServers()
@@ -753,8 +641,6 @@ func TestPartitionControllerJoin5C(t *testing.T) {
        t.Fatalf("Didn't recover gid %d", ngid)
    }

-    sck0.ExitController()

    // reconnect old controller, which shouldn't finish ChangeConfigTo
    clnt.ConnectAll()
@@ -795,7 +681,7 @@ func partitionRecovery5C(t *testing.T, reliable bool, npart, nclnt int) {
    }

    for i := 0; i < npart; i++ {
-        ts.killCtrler(ck, gid, ka, va)
+        ts.partitionCtrler(ck, gid, ka, va)
    }

    if nclnt > 0 {
@@ -827,7 +713,7 @@ func TestPartitionRecoveryReliableClerks5C(t *testing.T) {
func TestPartitionRecoveryUnreliableClerks5C(t *testing.T) {
    const (
-        NPARTITION = 5
+        NPARTITION = 3
    )
    partitionRecovery5C(t, false, NPARTITION, 5)
}

View File

@@ -16,7 +16,6 @@ import (
    "6.5840/labrpc"
    "6.5840/shardkv1/shardcfg"
    "6.5840/shardkv1/shardctrler"
-    "6.5840/shardkv1/shardctrler/param"
    "6.5840/shardkv1/shardgrp"
    "6.5840/tester1"
)
@@ -25,9 +24,9 @@ type Test struct {
    t *testing.T
    *kvtest.Test
    sck *shardctrler.ShardCtrler
    part string
-    leases bool
+    partition bool
    maxraftstate int

    mu sync.Mutex
@@ -41,11 +40,11 @@
)

// Setup kvserver for the shard controller and make the controller
-func MakeTestMaxRaft(t *testing.T, part string, reliable, leases bool, maxraftstate int) *Test {
+func MakeTestMaxRaft(t *testing.T, part string, reliable, partition bool, maxraftstate int) *Test {
    ts := &Test{
        ngid: shardcfg.Gid1 + 1, // Gid1 is in use
        t: t,
-        leases: leases,
+        partition: partition,
        maxraftstate: maxraftstate,
    }
    cfg := tester.MakeConfig(t, 1, reliable, kvsrv.StartKVServer)
@@ -86,7 +85,7 @@ func (ts *Test) makeShardCtrler() *shardctrler.ShardCtrler {
func (ts *Test) makeShardCtrlerClnt() (*shardctrler.ShardCtrler, *tester.Clnt) {
    clnt := ts.Config.MakeClient()
-    return shardctrler.MakeShardCtrler(clnt, ts.leases), clnt
+    return shardctrler.MakeShardCtrler(clnt), clnt
}

func (ts *Test) makeKVClerk() *kvsrv.Clerk {
@@ -252,15 +251,14 @@ func (ts *Test) checkShutdownSharding(down tester.Tgid, ka []string, va []string
// Run one controler and then partition it after some time. Run
// another cntrler that must finish the first ctrler's unfinished
-// shard moves. To ensure first ctrler is in a join/leave the test
-// shuts down shardgrp `gid`. After the second controller is done,
-// heal the partition to test if Freeze,InstallShard, and Delete are
-// are fenced.
-func (ts *Test) killCtrler(ck kvtest.IKVClerk, gid tester.Tgid, ka, va []string) {
+// shard moves. To make it likely that first ctrler is in a join/leave
+// the test shuts down shardgrp `gid`. After the second controller is
+// done, heal the partition. partitionCtrler returns if recovery
+// happened.
+func (ts *Test) partitionCtrler(ck kvtest.IKVClerk, gid tester.Tgid, ka, va []string) {
    const (
-        NSLEEP = 2
-        NSEC = 1
-        RAND = 1000
+        RAND = 400

        JOIN = 1
        LEAVE = 2
@@ -283,33 +281,39 @@ func (ts *Test) killCtrler(ck kvtest.IKVClerk, gid tester.Tgid, ka, va []string) {
                state = LEAVE
                ts.leaveGroups(sck, []tester.Tgid{ngid})
            } else {
-                //log.Printf("deposed")
+                //log.Printf("%v: deposed", sck.Id())
                return
            }
        }
    }()

+    // let sck run for a little while
+    time.Sleep(1000 * time.Millisecond)

    r := rand.Int() % RAND
    d := time.Duration(r) * time.Millisecond
    time.Sleep(d)
-    //log.Printf("shutdown gid %d after %dms", gid, r)
+    //log.Printf("shutdown gid %d after %dms %v", gid, r, time.Now().Sub(t))
    ts.Group(gid).Shutdown()

-    // sleep for a while to get the chance for the controler to get stuck
-    // in join or leave, because gid is down
-    time.Sleep(NSLEEP * time.Second)
+    // sleep for a while to get sck stuck in join or leave, because
+    // gid is down
+    time.Sleep(1000 * time.Millisecond)

    //log.Printf("disconnect sck %v ngid %d num %d state %d", d, ngid, num, state)
    // partition controller
    clnt.DisconnectAll()

-    if ts.leases {
-        // wait until sck's lease expired before restarting shardgrp `gid`
-        time.Sleep((param.LEASETIMESEC + 1) * time.Second)
+    if ts.partition {
+        // wait a while before restarting shardgrp `gid`
+        time.Sleep(NSEC * time.Second)
    }
-    //log.Printf("startservers %v lease expired %t", time.Now().Sub(t), ts.leases)

    ts.Group(gid).StartServers()

    // start new controler to pick up where sck left off
@@ -321,12 +325,13 @@ func (ts *Test) killCtrler(ck kvtest.IKVClerk, gid tester.Tgid, ka, va []string) {
    if state == LEAVE {
        s = "leave"
    }
-    //log.Printf("%v cfg %v recovered %s", s, cfg, s)
    if cfg.Num <= num {
        ts.Fatalf("didn't recover; expected %d > %d", num, cfg.Num)
    }
+    //log.Printf("%v: recovered %v %v %v", sck0.Id(), time.Now().Sub(t), s, cfg)

    present := cfg.IsMember(ngid)
    if (state == JOIN && !present) || (state == LEAVE && present) {
        ts.Fatalf("didn't recover %d correctly after %v", ngid, s)
@@ -337,32 +342,30 @@ func (ts *Test) killCtrler(ck kvtest.IKVClerk, gid tester.Tgid, ka, va []string) {
        ts.leaveGroups(sck0, []tester.Tgid{ngid})
    }

-    for i := 0; i < len(ka); i++ {
-        ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
-    }
-
-    sck0.ExitController()
-
-    if ts.leases {
-        //log.Printf("reconnect old controller")
+    if ts.partition {
        // reconnect old controller, which should bail out, because
        // it has been superseded.
        clnt.ConnectAll()
-        time.Sleep(1 * time.Second)
+        time.Sleep(100 * time.Millisecond)
-        for i := 0; i < len(ka); i++ {
-            ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
-        }
    }
+    //log.Printf("reconnected %v", time.Now().Sub(t))

+    for i := 0; i < len(ka); i++ {
+        ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
+    }
+    //log.Printf("done get %v", time.Now().Sub(t))

    ts.Config.DeleteClient(clnt)
    ts.Config.DeleteClient(clnt0)
}

-func (ts *Test) electCtrler(ck kvtest.IKVClerk, ka, va []string) {
+func (ts *Test) concurCtrler(ck kvtest.IKVClerk, ka, va []string) {
    const (
-        NSEC = 5
+        NSEC = 2
        N = 4
    )
@@ -376,16 +379,16 @@ func (ts *Test) electCtrler(ck kvtest.IKVClerk, ka, va []string) {
                ngid := ts.newGid()
                sck := ts.makeShardCtrler()
                sck.InitController()
-                //log.Printf("%d(%p): join/leave %v", i, sck, ngid)
+                //log.Printf("%v: electCtrler %d join/leave %v", sck.Id(), i, ngid)
                ts.joinGroups(sck, []tester.Tgid{ngid})
                if ok := ts.checkMember(sck, ngid); ok {
+                    //log.Printf("%v: electCtrler %d leave %d", sck.Id(), i, ngid)
                    if ok := ts.leaveGroups(sck, []tester.Tgid{ngid}); !ok {
-                        log.Fatalf("electCtrler: %d(%p): leave %v failed", i, sck, ngid)
+                        //log.Printf("%v: electCtrler %d leave %v failed", sck.Id(), i, ngid)
                    }
                } else {
-                    log.Fatalf("electCtrler: %d(%p): join %v failed", i, sck, ngid)
+                    //log.Printf("%v: electCtrler %d join %v failed", sck.Id(), i, ngid)
                }
-                sck.ExitController()
            }
        }
    }
@@ -399,6 +402,7 @@ func (ts *Test) electCtrler(ck kvtest.IKVClerk, ka, va []string) {
    for i := 0; i < N; i++ {
        ch <- struct{}{}
    }

    for i := 0; i < len(ka); i++ {
        ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
    }

View File

@@ -138,7 +138,7 @@ func (cfg *Config) End() {
        ops := atomic.LoadInt32(&cfg.ops) // number of clerk get/put/append calls
        fmt.Printf(" ... Passed --")
-        fmt.Printf(" %4.1f %d %5d %4d\n", t, npeers, nrpc, ops)
+        fmt.Printf(" time %4.1fs #peers %d #RPCs %5d #Ops %4d\n", t, npeers, nrpc, ops)
    }
}
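With the labeled format string, a passing test's summary line now reads along these lines (numbers are illustrative):

     ... Passed -- time 12.3s #peers 1 #RPCs  1234 #Ops   56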