Delete shardkv1 for now
This commit is contained in:
parent
62fb62d447
commit
2eff57b373
@ -1,51 +0,0 @@
package shardkv

//
// client code to talk to a sharded key/value service.
//
// the client uses the shardctrler's clerk to query for the current
// configuration and find the assignment of shards (keys) to groups,
// and then talks to the group that holds the key's shard.
//

import (
    "6.5840/kvsrv1/rpc"
    "6.5840/kvtest1"
    "6.5840/shardkv1/shardctrler"
    "6.5840/tester1"
)

type Clerk struct {
    clnt *tester.Clnt
    qck  *shardctrler.QueryClerk
    // You will have to modify this struct.
}

// The tester calls MakeClerk and passes in a clerk for the
// shardctrler with only the Query method.
func MakeClerk(clnt *tester.Clnt, qck *shardctrler.QueryClerk) kvtest.IKVClerk {
    ck := &Clerk{
        clnt: clnt,
        qck:  qck,
    }
    // You'll have to add code here.
    return ck
}

// Get a key from a shardgrp. You can use shardcfg.Key2Shard(key) to
// find the shard responsible for the key and ck.qck.Query() to read
// the current configuration and look up the servers in the group
// responsible for the key. You can make a clerk for that group by
// calling shardgrp.MakeClerk(ck.clnt, servers).
func (ck *Clerk) Get(key string) (string, rpc.Tversion, rpc.Err) {
    // You will have to modify this function.
    return "", 0, ""
}

// Put a key to a shard group.
func (ck *Clerk) Put(key string, value string, version rpc.Tversion) rpc.Err {
    // You will have to modify this function.
    return ""
}
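The comment above only describes the routing in prose; the following is a minimal sketch of that path, assuming the shardcfg and shardgrp APIs shown elsewhere in this commit. The error choice and the configuration numbers passed to the group clerk are guesses, not the assignment's intended solution.

package shardkv

import (
    "6.5840/kvsrv1/rpc"
    "6.5840/shardkv1/shardcfg"
    "6.5840/shardkv1/shardgrp"
)

// Sketch only (not part of the deleted file): route one Get to the group
// that currently owns the key's shard.
func (ck *Clerk) getOnce(key string) (string, rpc.Tversion, rpc.Err) {
    cfg, _ := ck.qck.Query()          // read the current configuration
    sh := shardcfg.Key2Shard(key)     // shard responsible for the key
    _, srvs, ok := cfg.GidServers(sh) // servers of the group owning that shard
    if !ok {
        return "", 0, rpc.ErrNoKey // illustrative error choice
    }
    grp := shardgrp.MakeClerk(ck.clnt, srvs)
    // Both config-number arguments are guesses at what the shardgrp clerk expects.
    return grp.Get(cfg.Num, key, cfg.Num)
}

A real Get would wrap this in a loop that re-queries the configuration and retries when the group no longer owns the shard.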
@ -1,275 +0,0 @@
package shardcfg

import (
    "encoding/json"
    "hash/fnv"
    "log"
    "runtime/debug"
    "slices"
    "testing"

    "6.5840/tester1"
)

type Tshid int
type Tnum int

const (
    NShards  = 12 // The number of shards.
    NumFirst = Tnum(1)
)

const (
    Gid1 = tester.Tgid(1)
)

// which shard is a key in?
// please use this function,
// and please do not change it.
func Key2Shard(key string) Tshid {
    h := fnv.New32a()
    h.Write([]byte(key))
    shard := Tshid(Tshid(h.Sum32()) % NShards)
    return shard
}

// A configuration -- an assignment of shards to groups.
// Please don't change this.
type ShardConfig struct {
    Num    Tnum                     // config number
    Shards [NShards]tester.Tgid     // shard -> gid
    Groups map[tester.Tgid][]string // gid -> servers[]
}

func MakeShardConfig() *ShardConfig {
    c := &ShardConfig{
        Groups: make(map[tester.Tgid][]string),
    }
    return c
}

func (cfg *ShardConfig) String() string {
    b, err := json.Marshal(cfg)
    if err != nil {
        log.Fatalf("Marshal err %v", err)
    }
    return string(b)
}

func FromString(s string) *ShardConfig {
    scfg := &ShardConfig{}
    if err := json.Unmarshal([]byte(s), scfg); err != nil {
        log.Fatalf("Unmarshal err %v", err)
    }
    return scfg
}

func (cfg *ShardConfig) Copy() *ShardConfig {
    c := MakeShardConfig()
    c.Num = cfg.Num
    c.Shards = cfg.Shards
    for k, srvs := range cfg.Groups {
        s := make([]string, len(srvs))
        copy(s, srvs)
        c.Groups[k] = s
    }
    return c
}

// mostgroup, mostn, leastgroup, leastn
func analyze(c *ShardConfig) (tester.Tgid, int, tester.Tgid, int) {
    counts := map[tester.Tgid]int{}
    for _, g := range c.Shards {
        counts[g] += 1
    }

    mn := -1
    var mg tester.Tgid = -1
    ln := 257
    var lg tester.Tgid = -1
    // Enforce deterministic ordering; map iteration
    // is randomized in Go.
    groups := make([]tester.Tgid, len(c.Groups))
    i := 0
    for k := range c.Groups {
        groups[i] = k
        i++
    }
    slices.Sort(groups)
    for _, g := range groups {
        if counts[g] < ln {
            ln = counts[g]
            lg = g
        }
        if counts[g] > mn {
            mn = counts[g]
            mg = g
        }
    }

    return mg, mn, lg, ln
}

// return the GID of the group with the least number of
// assigned shards.
func least(c *ShardConfig) tester.Tgid {
    _, _, lg, _ := analyze(c)
    return lg
}

// balance the assignment of shards to groups.
// modifies c.
func (c *ShardConfig) Rebalance() {
    // if no groups, un-assign all shards
    if len(c.Groups) < 1 {
        for s, _ := range c.Shards {
            c.Shards[s] = 0
        }
        return
    }

    // assign all unassigned shards
    for s, g := range c.Shards {
        _, ok := c.Groups[g]
        if ok == false {
            lg := least(c)
            c.Shards[s] = lg
        }
    }

    // move shards from the most to the least heavily loaded group
    for {
        mg, mn, lg, ln := analyze(c)
        if mn < ln+2 {
            break
        }
        // move 1 shard from mg to lg
        for s, g := range c.Shards {
            if g == mg {
                c.Shards[s] = lg
                break
            }
        }
    }
}

func (cfg *ShardConfig) Join(servers map[tester.Tgid][]string) {
    changed := false
    for gid, servers := range servers {
        _, ok := cfg.Groups[gid]
        if ok {
            log.Fatalf("re-Join %v", gid)
        }
        for xgid, xservers := range cfg.Groups {
            for _, s1 := range xservers {
                for _, s2 := range servers {
                    if s1 == s2 {
                        log.Fatalf("Join(%v) puts server %v in groups %v and %v", gid, s1, xgid, gid)
                    }
                }
            }
        }
        // new GID
        // modify cfg to reflect the Join()
        cfg.Groups[gid] = servers
        changed = true
    }
    if changed == false {
        log.Fatalf("Join but no change")
    }
    cfg.Num += 1
}

func (cfg *ShardConfig) Leave(gids []tester.Tgid) {
    changed := false
    for _, gid := range gids {
        _, ok := cfg.Groups[gid]
        if ok == false {
            // already no GID!
            debug.PrintStack()
            log.Fatalf("Leave(%v) but not in config", gid)
        } else {
            // modify op.Config to reflect the Leave()
            delete(cfg.Groups, gid)
            changed = true
        }
    }
    if changed == false {
        debug.PrintStack()
        log.Fatalf("Leave but no change")
    }
    cfg.Num += 1
}

func (cfg *ShardConfig) JoinBalance(servers map[tester.Tgid][]string) {
    cfg.Join(servers)
    cfg.Rebalance()
}

func (cfg *ShardConfig) LeaveBalance(gids []tester.Tgid) {
    cfg.Leave(gids)
    cfg.Rebalance()
}

func (cfg *ShardConfig) GidServers(sh Tshid) (tester.Tgid, []string, bool) {
    gid := cfg.Shards[sh]
    srvs, ok := cfg.Groups[gid]
    return gid, srvs, ok
}

func (cfg *ShardConfig) IsMember(gid tester.Tgid) bool {
    for _, g := range cfg.Shards {
        if g == gid {
            return true
        }
    }
    return false
}

func (cfg *ShardConfig) CheckConfig(t *testing.T, groups []tester.Tgid) {
    if len(cfg.Groups) != len(groups) {
        fatalf(t, "wanted %v groups, got %v", len(groups), len(cfg.Groups))
    }

    // are the groups as expected?
    for _, g := range groups {
        _, ok := cfg.Groups[g]
        if ok != true {
            fatalf(t, "missing group %v", g)
        }
    }

    // any un-allocated shards?
    if len(groups) > 0 {
        for s, g := range cfg.Shards {
            _, ok := cfg.Groups[g]
            if ok == false {
                fatalf(t, "shard %v -> invalid group %v", s, g)
            }
        }
    }

    // more or less balanced sharding?
    counts := map[tester.Tgid]int{}
    for _, g := range cfg.Shards {
        counts[g] += 1
    }
    min := 257
    max := 0
    for g, _ := range cfg.Groups {
        if counts[g] > max {
            max = counts[g]
        }
        if counts[g] < min {
            min = counts[g]
        }
    }
    if max > min+1 {
        fatalf(t, "max %v too much larger than min %v", max, min)
    }
}

func fatalf(t *testing.T, format string, args ...any) {
    debug.PrintStack()
    t.Fatalf(format, args...)
}
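For orientation, a short usage sketch (not part of the deleted package) showing how JoinBalance, Key2Shard, and GidServers compose; the server names are invented.

package shardcfg

import (
    "fmt"

    "6.5840/tester1"
)

// Sketch: after a single JoinBalance, Rebalance assigns every shard to Gid1,
// so looking up any key's shard yields that group and its servers.
func exampleUsage() {
    cfg := MakeShardConfig()
    cfg.JoinBalance(map[tester.Tgid][]string{Gid1: {"srv-a", "srv-b"}}) // hypothetical servers
    sh := Key2Shard("some-key")
    gid, srvs, ok := cfg.GidServers(sh)
    fmt.Println(sh, gid, srvs, ok) // gid == Gid1, ok == true
}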
@ -1,62 +0,0 @@
package shardcfg

import (
    "testing"

    "6.5840/tester1"
)

func check_same_config(t *testing.T, c1 ShardConfig, c2 ShardConfig) {
    if c1.Num != c2.Num {
        t.Fatalf("Num wrong")
    }
    if c1.Shards != c2.Shards {
        t.Fatalf("Shards wrong")
    }
    if len(c1.Groups) != len(c2.Groups) {
        t.Fatalf("number of Groups is wrong")
    }
    for gid, sa := range c1.Groups {
        sa1, ok := c2.Groups[gid]
        if ok == false || len(sa1) != len(sa) {
            t.Fatalf("len(Groups) wrong")
        }
        if ok && len(sa1) == len(sa) {
            for j := 0; j < len(sa); j++ {
                if sa[j] != sa1[j] {
                    t.Fatalf("Groups wrong")
                }
            }
        }
    }
}

func TestBasic(t *testing.T) {
    const (
        Gid1 = 1
        Gid2 = 2
    )
    cfg := MakeShardConfig()
    cfg.CheckConfig(t, []tester.Tgid{})

    cfg.JoinBalance(map[tester.Tgid][]string{Gid1: []string{"x", "y", "z"}})
    cfg.CheckConfig(t, []tester.Tgid{Gid1})

    cfg.JoinBalance(map[tester.Tgid][]string{Gid2: []string{"a", "b", "c"}})
    cfg.CheckConfig(t, []tester.Tgid{Gid1, Gid2})

    sa1 := cfg.Groups[Gid1]
    if len(sa1) != 3 || sa1[0] != "x" || sa1[1] != "y" || sa1[2] != "z" {
        t.Fatalf("wrong servers for gid %v: %v\n", Gid1, sa1)
    }
    sa2 := cfg.Groups[Gid2]
    if len(sa2) != 3 || sa2[0] != "a" || sa2[1] != "b" || sa2[2] != "c" {
        t.Fatalf("wrong servers for gid %v: %v\n", Gid2, sa2)
    }

    cfg.LeaveBalance([]tester.Tgid{Gid1})
    cfg.CheckConfig(t, []tester.Tgid{Gid2})

    cfg.LeaveBalance([]tester.Tgid{Gid2})
    cfg.CheckConfig(t, []tester.Tgid{})
}
@ -1,49 +0,0 @@
package shardctrler

import (
    // "log"
    "sync/atomic"

    "6.5840/kvsrv1/rpc"
    "6.5840/tester1"
)

type Clerk struct {
    clnt    *tester.Clnt
    servers []string
    deposed *int32
    // You will have to modify this struct.
}

// The shard controller can use MakeClerk to make a clerk for the kvraft
// group with the servers `servers`.
func MakeClerk(clnt *tester.Clnt, servers []string, deposed *int32) *Clerk {
    ck := &Clerk{clnt: clnt, servers: servers, deposed: deposed}
    // You may add code here.
    return ck
}

func (ck *Clerk) isDeposed() bool {
    z := atomic.LoadInt32(ck.deposed)
    return z == 1
}

// You can reuse your kvraft Get
func (ck *Clerk) Get(key string) (string, rpc.Tversion, rpc.Err) {
    args := rpc.GetArgs{}
    args.Key = key

    // You'll have to add code here.
    return "", 0, ""
}

// You can reuse your kvraft Put
func (ck *Clerk) Put(key string, value string, version rpc.Tversion) rpc.Err {
    args := rpc.PutArgs{}
    args.Key = key
    args.Value = value
    args.Version = version

    // You'll have to add code here.
    return ""
}
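The comments above say to reuse the kvraft Get/Put, which is not part of this diff; the sketch below therefore only shows the surrounding retry loop and the deposed check, with a single RPC attempt abstracted behind a hypothetical `try` callback.

package shardctrler

import (
    "time"

    "6.5840/kvsrv1/rpc"
)

// Sketch only: retry across servers until one attempt reports it is done,
// but give up once the tester has deposed this controller, so a partitioned
// controller cannot spin forever. `try` stands in for one Get/Put RPC.
func (ck *Clerk) retrySketch(try func(server string) (rpc.Err, bool)) rpc.Err {
    for {
        for _, srv := range ck.servers {
            if ck.isDeposed() {
                return ErrDeposed // constant defined in shardctrler.go later in this commit
            }
            if err, done := try(srv); done {
                return err
            }
        }
        time.Sleep(10 * time.Millisecond) // back off before another pass
    }
}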
@ -1,58 +0,0 @@
package lock

import (
    "log"
    "time"

    "6.5840/kvsrv1/rpc"
    "6.5840/kvtest1"
)

type Lock struct {
    kvtest.IKVClerk
    l   string
    id  string
    ver rpc.Tversion
}

func MakeLock(ck kvtest.IKVClerk, l string) *Lock {
    lk := &Lock{IKVClerk: ck}
    // You may add code here.
    return lk
}

func (lk *Lock) AcquireLeadership() {
    for {
        if val, ver, err := lk.Get(lk.l); err == rpc.OK {
            if val == "" { // put only when the lock is free
                if err := lk.Put(lk.l, lk.id, ver); err == rpc.OK {
                    lk.ver = ver + 1
                    return
                } else if err == rpc.ErrMaybe { // check whether the put succeeded
                    if val, ver, err := lk.Get(lk.l); err == rpc.OK {
                        if val == lk.id {
                            lk.ver = ver
                            return
                        }
                    }
                }
            }
            time.Sleep(1 * time.Millisecond)
        }
    }
}

// for two testing purposes: 1) for the ctrler that is the leader to
// give up its leadership; 2) to take back leadership from a
// partitioned/deposed ctrler using a new ctrler.
func (lk *Lock) ReleaseLeadership() rpc.Err {
    _, ver, err := lk.Get(lk.l)
    if err != rpc.OK {
        log.Printf("ReleaseLeadership: %v err %v", lk.l, err)
    }
    if err := lk.Put(lk.l, "", ver); err == rpc.OK || err == rpc.ErrMaybe {
        return rpc.OK
    } else {
        return err
    }
}
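A brief usage sketch, assuming any kvtest.IKVClerk for the controller's kvraft group; the key name and the work callback are placeholders, not part of the deleted file.

package lock

import "6.5840/kvtest1"

// Sketch: a controller becomes leader, does its work, then releases
// leadership so another controller can take over.
func leaderSketch(ck kvtest.IKVClerk, doWork func()) {
    lk := MakeLock(ck, "lease") // "lease" is an arbitrary key choice
    lk.AcquireLeadership()      // blocks until this caller holds the lock
    doWork()                    // e.g. drive a reconfiguration
    lk.ReleaseLeadership()      // give leadership back (error ignored here)
}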
@ -1,115 +0,0 @@
package shardctrler

//
// Shardctrler implemented as a clerk.
//

import (
    "sync/atomic"

    "6.5840/kvraft1"
    "6.5840/kvsrv1/rpc"
    "6.5840/kvtest1"
    "6.5840/shardkv1/shardcfg"
    "6.5840/tester1"
)

const (
    ErrDeposed = "ErrDeposed"
)

// The query clerk must support only Query(); it is intended for use
// by shardkv clerks to read the current configuration (see
// ../client.go).
type QueryClerk struct {
    kvtest.IKVClerk
    // Your data here.
}

// Make a query clerk for the controller's kvraft group to invoke just
// Query().
func MakeQueryClerk(clnt *tester.Clnt, servers []string) *QueryClerk {
    qck := &QueryClerk{
        IKVClerk: kvraft.MakeClerk(clnt, servers),
    }
    // Your code here.
    return qck
}

// Return the current configuration. You can use Get() to retrieve
// the string representing the configuration and shardcfg.ToShardCfg
// to unmarshal the string into a ShardConfig.
func (qck *QueryClerk) Query() (*shardcfg.ShardConfig, rpc.Tversion) {
    // Your code here.
    return nil, 0
}

// ShardCtrlerClerk is the clerk for the shard controller. It implements the
// methods for Init(), Join(), Leave(), etc.
type ShardCtrlerClerk struct {
    clnt    *tester.Clnt
    deposed int32 // set by Stepdown()

    // Your data here.
}

// Make a ShardCtrlerClerk for the shard controller, which stores its
// state in a kvraft group. You can call (and implement) the
// MakeClerk method in client.go to make a kvraft clerk for the kvraft
// group with the servers `servers`.
func MakeShardCtrlerClerk(clnt *tester.Clnt, servers []string) *ShardCtrlerClerk {
    sck := &ShardCtrlerClerk{clnt: clnt}
    // Your code here.
    return sck
}

// Called once by the tester to supply the first configuration. You
// can marshal ShardConfig into a string using shardcfg.String(), and
// then Put it in the kvraft group for the controller at version 0.
// You can pick the key to name the configuration.
func (sck *ShardCtrlerClerk) Init(cfg *shardcfg.ShardConfig) rpc.Err {
    // Your code here
    return rpc.OK
}

// Add group gid. Use shardcfg.JoinBalance() to compute the new
// configuration; the supplied `srvs` are the servers for the new
// group. You can find the servers for existing groups in the
// configuration (which you can retrieve using Query()) and you can
// make a clerk for a group by calling shardgrp.MakeClerk(sck.clnt,
// servers), and then invoke its Freeze/InstallShard methods.
func (sck *ShardCtrlerClerk) Join(gid tester.Tgid, srvs []string) rpc.Err {
    // Your code here
    return rpc.ErrNoKey
}

// Group gid leaves. You can use shardcfg.LeaveBalance() to compute
// the new configuration.
func (sck *ShardCtrlerClerk) Leave(gid tester.Tgid) rpc.Err {
    // Your code here
    return rpc.ErrNoKey
}

// The tester calls Stepdown() to force a ctrler to step down while it
// is perhaps in the middle of a join/move. For your convenience, we
// also supply the isDeposed() method to test sck.deposed in
// long-running loops.
func (sck *ShardCtrlerClerk) Stepdown() {
    atomic.StoreInt32(&sck.deposed, 1)
}

func (sck *ShardCtrlerClerk) isDeposed() bool {
    z := atomic.LoadInt32(&sck.deposed)
    return z == 1
}

// Return the current configuration.
func (sck *ShardCtrlerClerk) Query() (*shardcfg.ShardConfig, rpc.Tversion, rpc.Err) {
    // Your code here.
    return nil, 0, ""
}
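A hedged sketch of the Init/Query flow the comments above describe: marshal with shardcfg.String(), Put at version 0 under a key of your choosing, then Get and unmarshal on the way back. The key name is a choice made for this sketch, shardcfg.FromString is used for unmarshalling because it appears earlier in this commit, and error handling is omitted.

package shardctrler

import (
    "6.5840/kvsrv1/rpc"
    "6.5840/shardkv1/shardcfg"
)

const cfgKeySketch = "config" // hypothetical key naming the configuration

// Sketch only: store the first configuration at version 0.
func initSketch(ck *Clerk, cfg *shardcfg.ShardConfig) rpc.Err {
    return ck.Put(cfgKeySketch, cfg.String(), 0)
}

// Sketch only: read the configuration back and unmarshal it.
func querySketch(qck *QueryClerk) (*shardcfg.ShardConfig, rpc.Tversion) {
    s, ver, err := qck.Get(cfgKeySketch)
    if err != rpc.OK {
        return nil, 0
    }
    return shardcfg.FromString(s), ver
}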
@ -1,38 +0,0 @@
package shardgrp

import (
    "6.5840/kvsrv1/rpc"
    "6.5840/shardkv1/shardcfg"
    "6.5840/tester1"
)

type Clerk struct {
    clnt    *tester.Clnt
    servers []string
    leader  int // last successful leader (index into servers[])
}

func MakeClerk(clnt *tester.Clnt, servers []string) *Clerk {
    ck := &Clerk{clnt: clnt, servers: servers}
    return ck
}

func (ck *Clerk) Get(cid shardcfg.Tnum, key string, n shardcfg.Tnum) (string, rpc.Tversion, rpc.Err) {
    // Your code here
    return "", 0, ""
}

func (ck *Clerk) Put(key string, value string, version rpc.Tversion, n shardcfg.Tnum) (bool, rpc.Err) {
    // Your code here
    return false, ""
}

func (ck *Clerk) Freeze(s shardcfg.Tshid, num shardcfg.Tnum) ([]byte, rpc.Err) {
    return nil, ""
}

func (ck *Clerk) InstallShard(s shardcfg.Tshid, state []byte, num shardcfg.Tnum) rpc.Err {
    return ""
}
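A sketch of how the `leader` field could be used: start each request at the last server that answered and remember whoever responds. The actual RPC (argument structs from shardrpc, method names, reply handling) is abstracted behind a hypothetical `try` callback, since it is left to the implementer.

package shardgrp

import "6.5840/kvsrv1/rpc"

// Sketch only: rotate through the servers beginning at ck.leader; `try`
// returns (err, true) when a server gave a definitive answer.
func (ck *Clerk) withLeaderSketch(try func(server string) (rpc.Err, bool)) rpc.Err {
    for i := 0; i < len(ck.servers); i++ {
        srv := (ck.leader + i) % len(ck.servers)
        if err, done := try(ck.servers[srv]); done {
            ck.leader = srv // remember who answered for the next request
            return err
        }
    }
    return rpc.ErrMaybe // illustrative: no server answered definitively
}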
@ -1,94 +0,0 @@
package shardgrp

import (
    "sync/atomic"

    "6.5840/kvraft1/rsm"
    "6.5840/kvsrv1/rpc"
    "6.5840/labgob"
    "6.5840/labrpc"
    "6.5840/shardkv1/shardgrp/shardrpc"
    "6.5840/tester1"
)

type KVServer struct {
    gid  tester.Tgid
    me   int
    dead int32 // set by Kill()
    rsm  *rsm.RSM
}

func (kv *KVServer) DoOp(req any) any {
    // Your code here
    return nil
}

func (kv *KVServer) Snapshot() []byte {
    // Your code here
    return nil
}

func (kv *KVServer) Restore(data []byte) {
    // Your code here
}

func (kv *KVServer) Get(args *shardrpc.GetArgs, reply *rpc.GetReply) {
    // Your code here
}

func (kv *KVServer) Put(args *shardrpc.PutArgs, reply *rpc.PutReply) {
    // Your code here
}

// Freeze the specified shard (i.e., reject future Get/Puts for this
// shard) and return the key/values stored in that shard.
func (kv *KVServer) Freeze(args *shardrpc.FreezeArgs, reply *shardrpc.FreezeReply) {
    // Your code here
}

// Install the supplied state for the specified shard.
func (kv *KVServer) InstallShard(args *shardrpc.InstallShardArgs, reply *shardrpc.InstallShardReply) {
    // Your code here
}

// The tester calls Kill() when a KVServer instance won't
// be needed again. For your convenience, we supply
// code to set kv.dead (without needing a lock),
// and a killed() method to test kv.dead in
// long-running loops. You can also add your own
// code to Kill(). You're not required to do anything
// about this, but it may be convenient (for example)
// to suppress debug output from a Kill()ed instance.
func (kv *KVServer) Kill() {
    atomic.StoreInt32(&kv.dead, 1)
    // Your code here, if desired.
}

func (kv *KVServer) killed() bool {
    z := atomic.LoadInt32(&kv.dead)
    return z == 1
}

// StartKVServer() and MakeRSM() must return quickly, so they should
// start goroutines for any long-running work.
func StartKVServer(servers []*labrpc.ClientEnd, gid tester.Tgid, me int, persister *tester.Persister, maxraftstate int) []tester.IService {
    // Call labgob.Register on structures you want
    // Go's RPC library to marshal/unmarshal.
    labgob.Register(shardrpc.PutArgs{})
    labgob.Register(shardrpc.GetArgs{})
    labgob.Register(shardrpc.FreezeArgs{})
    labgob.Register(shardrpc.InstallShardArgs{})
    labgob.Register(shardrpc.DeleteShardArgs{})
    labgob.Register(rsm.Op{})

    kv := &KVServer{gid: gid, me: me}
    kv.rsm = rsm.MakeRSM(servers, me, persister, maxraftstate, kv)

    // Your code here
    return []tester.IService{kv, kv.rsm.Raft()}
}
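A sketch of how Snapshot/Restore might serialize state with labgob, which mirrors encoding/gob. The snapshot struct and the map of key/values are hypothetical, since the skeleton above does not define the server's state; real state would also need versions, shard ownership, and duplicate-detection data.

package shardgrp

import (
    "bytes"

    "6.5840/labgob"
)

type snapshotSketch struct {
    Data map[string]string // illustrative server state
}

// Sketch only: encode the state for Snapshot().
func encodeSketch(data map[string]string) []byte {
    w := new(bytes.Buffer)
    e := labgob.NewEncoder(w)
    if err := e.Encode(snapshotSketch{Data: data}); err != nil {
        return nil
    }
    return w.Bytes()
}

// Sketch only: decode a snapshot for Restore().
func decodeSketch(b []byte) map[string]string {
    var s snapshotSketch
    d := labgob.NewDecoder(bytes.NewBuffer(b))
    if err := d.Decode(&s); err != nil {
        return nil
    }
    return s.Data
}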
@ -1,50 +0,0 @@
package shardrpc

import (
    "6.5840/kvsrv1/rpc"
    "6.5840/shardkv1/shardcfg"
)

// Same as Put in kvsrv1/rpc, but with a configuration number.
type PutArgs struct {
    Key     string
    Value   string
    Version rpc.Tversion
    Num     shardcfg.Tnum
}

// Same as Get in kvsrv1/rpc, but with a configuration number.
type GetArgs struct {
    Key string
    Num shardcfg.Tnum
}

type FreezeArgs struct {
    Shard shardcfg.Tshid
    Num   shardcfg.Tnum
}

type FreezeReply struct {
    State []byte
    Num   shardcfg.Tnum
    Err   rpc.Err
}

type InstallShardArgs struct {
    Shard shardcfg.Tshid
    State []byte
    Num   shardcfg.Tnum
}

type InstallShardReply struct {
    Err rpc.Err
}

type DeleteShardArgs struct {
    Shard shardcfg.Tshid
    Num   shardcfg.Tnum
}

type DeleteShardReply struct {
    Err rpc.Err
}
@ -1,304 +0,0 @@
package shardkv

import (
    "log"
    "testing"
    "time"

    "6.5840/kvsrv1/rpc"
    "6.5840/kvtest1"
    "6.5840/shardkv1/shardcfg"
    "6.5840/tester1"
    // "6.5840/shardkv1/shardctrler"
)

const (
    NGRP = 8
)

// Set up a k/v service with one shardgrp (group 0) for the controller to
// store its state and one shardgrp (group 1) to store all shards. Tests
// the controller's Init() and Query(), and shardkv's Get/Put, without
// reconfiguration.
func TestStaticOneShardGroup5A(t *testing.T) {
    ts := MakeTest(t, "Test (5A): one shard group ...", true, false)
    defer ts.Cleanup()

    // The tester's setupKVService() sets up a kvraft group for the
    // controller to store configurations and calls the controller's
    // Init() method to create the first configuration.
    ts.setupKVService()
    sck := ts.ShardCtrler() // get the controller clerk from the tester

    // Read the initial configuration and check it.
    cfg, v, err := sck.Query()
    if err != rpc.OK {
        ts.t.Fatalf("Query failed %v", err)
    }
    if v != 1 || cfg.Num != 1 || cfg.Shards[0] != shardcfg.Gid1 {
        ts.t.Fatalf("Static wrong %v %v", cfg, v)
    }
    cfg.CheckConfig(t, []tester.Tgid{shardcfg.Gid1})

    ck := ts.MakeClerk()                          // make a shardkv clerk
    ka, va := ts.SpreadPuts(ck, shardcfg.NShards) // do some puts
    n := len(ka)
    for i := 0; i < n; i++ {
        ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) // check the puts
    }
}

// Test the shardctrler's Join, which adds a new group (gid2) and must move
// shards to the new group; the old group should reject Get/Puts on
// shards that moved.
func TestJoinBasic5A(t *testing.T) {
    ts := MakeTest(t, "Test (5A): a group joins...", true, false)
    defer ts.Cleanup()

    gid1 := ts.setupKVService()
    ck := ts.MakeClerk()
    ka, va := ts.SpreadPuts(ck, shardcfg.NShards)

    sck := ts.ShardCtrler()
    cfg, _, err := sck.Query()
    if err != rpc.OK {
        ts.t.Fatalf("Query: err %v", err)
    }

    gid2 := ts.newGid()
    err = ts.joinGroups(sck, []tester.Tgid{gid2})
    if err != rpc.OK {
        ts.t.Fatalf("joinGroups: err %v", err)
    }

    cfg1, _, err := sck.Query()
    if err != rpc.OK {
        ts.t.Fatalf("Query 1: err %v", err)
    }

    if cfg.Num+1 != cfg1.Num {
        ts.t.Fatalf("wrong num %d expected %d ", cfg1.Num, cfg.Num+1)
    }

    if !cfg1.IsMember(gid2) {
        ts.t.Fatalf("%d isn't a member of %v", gid2, cfg1)
    }

    // check that shards moved to gid2
    ts.checkShutdownSharding(gid1, gid2, ka, va)

    for i := 0; i < len(ka); i++ {
        ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
    }

    // check that shards remain at gid1
    ts.checkShutdownSharding(gid2, gid1, ka, va)

    for i := 0; i < len(ka); i++ {
        ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
    }
}

// Test the shardctrler's Leave.
func TestJoinLeaveBasic5A(t *testing.T) {
    ts := MakeTest(t, "Test (5A): basic groups join/leave ...", true, false)
    defer ts.Cleanup()

    gid1 := ts.setupKVService()
    ck := ts.MakeClerk()
    ka, va := ts.SpreadPuts(ck, shardcfg.NShards)

    sck := ts.ShardCtrler()
    gid2 := ts.newGid()
    err := ts.joinGroups(sck, []tester.Tgid{gid2})
    if err != rpc.OK {
        ts.t.Fatalf("joinGroups: err %v", err)
    }

    // check that shards moved to gid2
    ts.checkShutdownSharding(gid1, gid2, ka, va)

    for i := 0; i < len(ka); i++ {
        ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
    }

    err = sck.Leave(shardcfg.Gid1)
    if err != rpc.OK {
        ts.t.Fatalf("Leave: err %v", err)
    }
    cfg, _, err := sck.Query()
    if err != rpc.OK {
        ts.t.Fatalf("Query err %v", err)
    }
    if cfg.IsMember(shardcfg.Gid1) {
        ts.t.Fatalf("%d is a member of %v", shardcfg.Gid1, cfg)
    }

    ts.Group(shardcfg.Gid1).Shutdown()

    for i := 0; i < len(ka); i++ {
        ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
    }

    // bring the crashed shard/group back to life.
    ts.Group(shardcfg.Gid1).StartServers()

    // Rejoin
    sck.Join(shardcfg.Gid1, ts.Group(shardcfg.Gid1).SrvNames())

    for i := 0; i < len(ka); i++ {
        ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
    }

    // check that shards moved back to gid1
    ts.checkShutdownSharding(gid2, gid1, ka, va)
}

// Test many groups joining and leaving, reliable or unreliable.
func joinLeave5A(t *testing.T, reliable bool, part string) {
    ts := MakeTest(t, "Test (5A): many groups join/leave ...", reliable, false)
    defer ts.Cleanup()

    ts.setupKVService()
    ck := ts.MakeClerk()
    ka, va := ts.SpreadPuts(ck, shardcfg.NShards)

    sck := ts.ShardCtrler()
    grps := ts.groups(NGRP)

    ts.joinGroups(sck, grps)

    ts.checkShutdownSharding(grps[0], grps[1], ka, va)

    for i := 0; i < len(ka); i++ {
        ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
    }

    ts.leaveGroups(sck, grps)

    for i := 0; i < len(ka); i++ {
        ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
    }
}

func TestManyJoinLeaveReliable5A(t *testing.T) {
    joinLeave5A(t, true, "Test (5A): many groups join/leave reliable...")
}

func TestManyJoinLeaveUnreliable5A(t *testing.T) {
    joinLeave5A(t, false, "Test (5A): many groups join/leave unreliable...")
}

// Test that we can recover from a complete shutdown using snapshots.
func TestSnapshot5A(t *testing.T) {
    const NGRP = 3

    ts := MakeTest(t, "Test (5A): snapshots ...", true, false)
    defer ts.Cleanup()

    ts.setupKVService()
    ck := ts.MakeClerk()
    ka, va := ts.SpreadPuts(ck, shardcfg.NShards)

    sck := ts.ShardCtrler()
    grps := ts.groups(2)
    ts.joinGroups(sck, grps)

    // check that shards moved to the new groups
    ts.checkShutdownSharding(grps[0], grps[1], ka, va)

    for i := 0; i < len(ka); i++ {
        ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
    }

    for i := tester.Tgid(0); i < NGRP; i++ {
        ts.Group(shardcfg.Gid1).Shutdown()
    }
    for i := tester.Tgid(0); i < NGRP; i++ {
        ts.Group(shardcfg.Gid1).StartServers()
    }

    for i := 0; i < len(ka); i++ {
        ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
    }
}

// Test linearizability with groups joining/leaving and `nclnt`
// concurrent clerks doing put/gets over a possibly unreliable network.
func concurrentClerk(t *testing.T, nclnt int, reliable bool, part string) {
    const (
        NSEC = 20
    )

    ts := MakeTest(t, part, reliable, true)
    defer ts.Cleanup()

    ts.setupKVService()

    ka := kvtest.MakeKeys(shardcfg.NShards)
    ch := make(chan []kvtest.ClntRes)

    start := time.Now()

    go func(ch chan []kvtest.ClntRes) {
        rs := ts.SpawnClientsAndWait(nclnt, NSEC*time.Second, func(me int, ck kvtest.IKVClerk, done chan struct{}) kvtest.ClntRes {
            return ts.OneClientPut(me, ck, ka, done)
        })
        ch <- rs
    }(ch)

    sck := ts.ShardCtrler()
    grps := ts.groups(NGRP)
    ts.joinGroups(sck, grps)

    ts.leaveGroups(sck, grps)

    log.Printf("time joining/leaving %v", time.Since(start))

    rsa := <-ch

    log.Printf("rsa %v", rsa)

    ts.CheckPorcupine()
}

// Test linearizability with groups joining/leaving and one concurrent clerk doing put/gets.
func TestOneConcurrentClerkReliable5A(t *testing.T) {
    concurrentClerk(t, 1, true, "Test (5A): one concurrent clerk reliable...")
}

// Test linearizability with groups joining/leaving and many concurrent clerks doing put/gets.
func TestManyConcurrentClerkReliable5A(t *testing.T) {
    const NCLNT = 10
    concurrentClerk(t, NCLNT, true, "Test (5A): many concurrent clerks reliable...")
}

// Test linearizability with groups joining/leaving and one concurrent clerk doing put/gets.
func TestOneConcurrentClerkUnreliable5A(t *testing.T) {
    concurrentClerk(t, 1, false, "Test (5A): one concurrent clerk unreliable ...")
}

// Test linearizability with groups joining/leaving and many concurrent clerks doing put/gets.
func TestManyConcurrentClerkUnreliable5A(t *testing.T) {
    const NCLNT = 10
    concurrentClerk(t, NCLNT, false, "Test (5A): many concurrent clerks unreliable...")
}

// Test recovery of partitioned controllers.
func TestRecoverCtrler5B(t *testing.T) {
    const (
        NPARTITION = 10
    )

    ts := MakeTest(t, "Test (5B): recover controller ...", true, false)
    defer ts.Cleanup()

    ts.setupKVService()
    ck := ts.MakeClerk()
    ka, va := ts.SpreadPuts(ck, shardcfg.NShards)

    for i := 0; i < NPARTITION; i++ {
        ts.partitionCtrler(ck, ka, va)
    }
}
@ -1,313 +0,0 @@
package shardkv

import (
    "fmt"
    "log"
    "math/rand"
    "sync"
    "testing"
    "time"

    "6.5840/kvraft1"
    "6.5840/kvsrv1/rpc"
    "6.5840/kvtest1"
    "6.5840/labrpc"
    "6.5840/shardkv1/shardcfg"
    "6.5840/shardkv1/shardctrler"
    "6.5840/shardkv1/shardgrp"
    "6.5840/tester1"
)

type Test struct {
    t *testing.T
    *kvtest.Test

    sck  *shardctrler.ShardCtrlerClerk
    part string

    maxraftstate int
    mu           sync.Mutex
    ngid         tester.Tgid
}

const (
    Controler     = tester.Tgid(0) // the controller uses group 0 as a kvraft group
    NSRV          = 3              // servers per group
    INTERGRPDELAY = 200            // time in ms between group changes
)

// Set up a kvraft group (group 0) for the shard controller and make
// the controller clerk.
func MakeTest(t *testing.T, part string, reliable, randomkeys bool) *Test {
    ts := &Test{
        ngid:         shardcfg.Gid1 + 1, // Gid1 is in use
        t:            t,
        maxraftstate: -1,
    }
    cfg := tester.MakeConfig(t, NSRV, reliable, ts.StartKVServerControler)
    ts.Test = kvtest.MakeTest(t, cfg, randomkeys, ts)
    ts.sck = ts.makeShardCtrlerClerk()
    ts.Begin(part)
    return ts
}

func (ts *Test) StartKVServerControler(servers []*labrpc.ClientEnd, gid tester.Tgid, me int, persister *tester.Persister) []tester.IService {
    return kvraft.StartKVServer(servers, gid, me, persister, ts.maxraftstate)
}

func (ts *Test) MakeClerk() kvtest.IKVClerk {
    clnt := ts.Config.MakeClient()
    ck := MakeClerk(clnt, ts.makeQueryClerk())
    return &kvtest.TestClerk{ck, clnt}
}

func (ts *Test) DeleteClerk(ck kvtest.IKVClerk) {
    tck := ck.(*kvtest.TestClerk)
    ts.DeleteClient(tck.Clnt)
}

func (ts *Test) ShardCtrler() *shardctrler.ShardCtrlerClerk {
    return ts.sck
}

func (ts *Test) makeShardCtrlerClerk() *shardctrler.ShardCtrlerClerk {
    ck, _ := ts.makeShardCtrlerClerkClnt()
    return ck
}

func (ts *Test) makeShardCtrlerClerkClnt() (*shardctrler.ShardCtrlerClerk, *tester.Clnt) {
    srvs := ts.Group(Controler).SrvNames()
    clnt := ts.Config.MakeClient()
    return shardctrler.MakeShardCtrlerClerk(clnt, srvs), clnt
}

func (ts *Test) makeQueryClerk() *shardctrler.QueryClerk {
    srvs := ts.Group(Controler).SrvNames()
    clnt := ts.Config.MakeClient()
    return shardctrler.MakeQueryClerk(clnt, srvs)
}

func (ts *Test) newGid() tester.Tgid {
    ts.mu.Lock()
    defer ts.mu.Unlock()

    gid := ts.ngid
    ts.ngid += 1
    return gid
}

func (ts *Test) groups(n int) []tester.Tgid {
    grps := make([]tester.Tgid, n)
    for i := 0; i < n; i++ {
        grps[i] = ts.newGid()
    }
    return grps
}

// Set up the KV service with one group, Gid1. Gid1 should initialize
// itself to own all shards.
func (ts *Test) setupKVService() tester.Tgid {
    scfg := shardcfg.MakeShardConfig()
    ts.Config.MakeGroupStart(shardcfg.Gid1, NSRV, ts.StartKVServerShard)
    scfg.JoinBalance(map[tester.Tgid][]string{shardcfg.Gid1: ts.Group(shardcfg.Gid1).SrvNames()})
    if err := ts.sck.Init(scfg); err != rpc.OK {
        ts.t.Fatalf("Init err %v", err)
    }
    //ts.sck.AcquireLeadership()
    return shardcfg.Gid1
}

func (ts *Test) StartKVServerShard(servers []*labrpc.ClientEnd, gid tester.Tgid, me int, persister *tester.Persister) []tester.IService {
    return shardgrp.StartKVServer(servers, gid, me, persister, ts.maxraftstate)
}

func (ts *Test) joinGroups(sck *shardctrler.ShardCtrlerClerk, gids []tester.Tgid) rpc.Err {
    for i, gid := range gids {
        ts.Config.MakeGroupStart(gid, NSRV, ts.StartKVServerShard)
        if err := sck.Join(gid, ts.Group(gid).SrvNames()); err != rpc.OK {
            return err
        }
        if i < len(gids)-1 {
            time.Sleep(INTERGRPDELAY * time.Millisecond)
        }
    }
    return rpc.OK
}

func (ts *Test) leaveGroups(sck *shardctrler.ShardCtrlerClerk, gids []tester.Tgid) rpc.Err {
    for i, gid := range gids {
        if err := sck.Leave(gid); err != rpc.OK {
            return err
        }
        ts.Config.ExitGroup(gid)
        if i < len(gids)-1 {
            time.Sleep(INTERGRPDELAY * time.Millisecond)
        }
    }
    return rpc.OK
}

func (ts *Test) checkLogs(gids []tester.Tgid) {
    for _, gid := range gids {
        n := ts.Group(gid).LogSize()
        s := ts.Group(gid).SnapshotSize()
        if ts.maxraftstate >= 0 && n > 8*ts.maxraftstate {
            ts.t.Fatalf("persister.RaftStateSize() %v, but maxraftstate %v",
                n, ts.maxraftstate)
        }
        if ts.maxraftstate < 0 && s > 0 {
            ts.t.Fatalf("maxraftstate is -1, but snapshot is non-empty!")
        }
    }
}

// Make sure that the data really is sharded by
// shutting down one shard group and checking that some
// Get()s don't succeed.
func (ts *Test) checkShutdownSharding(down, up tester.Tgid, ka []string, va []string) {
    const NSEC = 2

    ts.Group(down).Shutdown()

    ts.checkLogs([]tester.Tgid{down, up}) // forbid snapshots

    n := len(ka)
    ch := make(chan string)
    for xi := 0; xi < n; xi++ {
        ck1 := ts.MakeClerk()
        go func(i int) {
            v, _, _ := ck1.Get(ka[i])
            if v != va[i] {
                ch <- fmt.Sprintf("Get(%v): expected:\n%v\nreceived:\n%v", ka[i], va[i], v)
            } else {
                ch <- ""
            }
        }(xi)
    }

    // wait a bit; only about half of the Gets should succeed.
    ndone := 0
    done := false
    for done == false {
        select {
        case err := <-ch:
            if err != "" {
                ts.Fatalf(err)
            }
            ndone += 1
        case <-time.After(time.Second * NSEC):
            done = true
            break
        }
    }

    // log.Printf("%d completions out of %d with %d groups", ndone, n, ngrp)
    if ndone >= n {
        ts.Fatalf("expected less than %d completions with one shard group dead\n", n)
    }

    // bring the crashed shard/group back to life.
    ts.Group(down).StartServers()
}

// Run one controller and then partition it forever after some time.
// Run another controller that must finish the first controller's
// unfinished shard moves, if there are any.
func (ts *Test) partitionCtrler(ck kvtest.IKVClerk, ka, va []string) {
    const (
        MSEC = 20
        RAND = 2000 // maybe measure?
    )

    ch := make(chan tester.Tgid)

    sck, clnt := ts.makeShardCtrlerClerkClnt()
    cfg, _, err := ts.ShardCtrler().Query()
    num := cfg.Num

    go func() {
        for true {
            ngid := ts.newGid()
            //log.Printf("join %d", ngid)
            //s := time.Now()
            ch <- ngid
            err := ts.joinGroups(sck, []tester.Tgid{ngid})
            if err == rpc.OK {
                err = ts.leaveGroups(sck, []tester.Tgid{ngid})
            }
            //log.Printf("join err %v time %v", err, time.Since(s))
            if err == shardctrler.ErrDeposed {
                log.Printf("deposed")
                return
            }
            if err != rpc.OK {
                ts.t.Fatalf("join/leave err %v", err)
            }
            time.Sleep(INTERGRPDELAY * time.Millisecond)
        }
    }()

    lastgid := <-ch

    d := time.Duration(rand.Int()%RAND) * time.Millisecond
    time.Sleep(MSEC*time.Millisecond + d)

    log.Printf("disconnect sck %v", d)

    // partition sck forever
    clnt.DisconnectAll()

    // force sck to step down
    sck.Stepdown()

    // wait until sck has no more requests in the network
    time.Sleep(labrpc.MAXDELAY)

    cfg, _, err = ts.ShardCtrler().Query()
    if err != rpc.OK {
        ts.Fatalf("Query err %v", err)
    }

    recovery := false
    present := cfg.IsMember(lastgid)
    join := num == cfg.Num
    leave := num+1 == cfg.Num
    if !present && join {
        recovery = true
    }
    if present && leave {
        recovery = true
    }

    // start a new controller to pick up where sck left off
    sck0, clnt0 := ts.makeShardCtrlerClerkClnt()
    if err != rpc.OK {
        ts.Fatalf("Query err %v", err)
    }

    cfg, _, err = sck0.Query()
    if recovery {
        s := "join"
        if leave {
            s = "leave"
        }
        //log.Printf("%v in progress", s)
        present = cfg.IsMember(lastgid)
        if (join && !present) || (leave && present) {
            ts.Fatalf("didn't recover %d correctly after %v", lastgid, s)
        }
    }

    if present {
        // clean up if disconnected after join but before leave
        ts.leaveGroups(sck0, []tester.Tgid{lastgid})
    }

    for i := 0; i < len(ka); i++ {
        ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
    }

    ts.Config.DeleteClient(clnt)
    ts.Config.DeleteClient(clnt0)
}