update

parent 3c969dfb05
commit 3d190166fd
@@ -118,8 +118,8 @@ check_lab4() {
 }
 
 check_lab5a() {
-    check_cmd cd src/shardctrler1
-    check_cmd go test -c
+    # check_cmd cd src/shardctrler1
+    # check_cmd go test -c
 }
 
 check_lab5b() {
@@ -1,51 +0,0 @@
package shardkv

//
// client code to talk to a sharded key/value service.
//
// the client uses the shardctrler to query for the current
// configuration and find the assignment of shards (keys) to groups,
// and then talks to the group that holds the key's shard.
//

import (
	"6.5840/kvsrv1/rpc"
	"6.5840/kvtest1"
	"6.5840/shardkv1/shardctrler"
	"6.5840/tester1"
)

type Clerk struct {
	clnt *tester.Clnt
	sck  *shardctrler.ShardCtrler
	// You will have to modify this struct.
}

// The tester calls MakeClerk and passes in a shardctrler so that
// the client can call its Query method
func MakeClerk(clnt *tester.Clnt, sck *shardctrler.ShardCtrler) kvtest.IKVClerk {
	ck := &Clerk{
		clnt: clnt,
		sck:  sck,
	}
	// You'll have to add code here.
	return ck
}

// Get a key from a shardgrp. You can use shardcfg.Key2Shard(key) to
// find the shard responsible for the key and ck.sck.Query() to read
// the current configuration and lookup the servers in the group
// responsible for key. You can make a clerk for that group by
// calling shardgrp.MakeClerk(ck.clnt, servers).
func (ck *Clerk) Get(key string) (string, rpc.Tversion, rpc.Err) {
	// You will have to modify this function.
	return "", 0, ""
}

// Put a key to a shard group.
func (ck *Clerk) Put(key string, value string, version rpc.Tversion) rpc.Err {
	// You will have to modify this function.
	return ""
}
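The Get comment above already names the whole lookup path (shardcfg.Key2Shard, ck.sck.Query, shardgrp.MakeClerk). As a minimal illustration of that path only — not the deleted file's implementation — a single lookup could look like the sketch below; it assumes the shardcfg and shardgrp packages are imported, the helper name getOnce is made up, and retrying across configuration changes is omitted.

	// Sketch: map key -> shard -> group -> shardgrp clerk, then Get.
	func (ck *Clerk) getOnce(key string) (string, rpc.Tversion, rpc.Err) {
		cfg, _ := ck.sck.Query()             // current configuration
		sh := shardcfg.Key2Shard(key)        // shard responsible for key
		_, servers, ok := cfg.GidServers(sh) // group currently serving that shard
		if !ok {
			return "", 0, rpc.ErrNoKey
		}
		grp := shardgrp.MakeClerk(ck.clnt, servers)
		return grp.Get(key, cfg.Num) // the shardgrp clerk takes the config number
	}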
@@ -1,56 +0,0 @@
package kvsrv

import (
	"6.5840/kvsrv1/rpc"
	"6.5840/kvtest1"
	"6.5840/tester1"
)

type Clerk struct {
	clnt   *tester.Clnt
	server string
}

func MakeClerk(clnt *tester.Clnt, server string) kvtest.IKVClerk {
	ck := &Clerk{clnt: clnt, server: server}
	// You may add code here.
	return ck
}

// Get fetches the current value and version for a key. It returns
// ErrNoKey if the key does not exist. It keeps trying forever in the
// face of all other errors.
//
// You can send an RPC with code like this:
// ok := ck.clnt.Call(ck.server, "KVServer.Get", &args, &reply)
//
// The types of args and reply (including whether they are pointers)
// must match the declared types of the RPC handler function's
// arguments. Additionally, reply must be passed as a pointer.
func (ck *Clerk) Get(key string) (string, rpc.Tversion, rpc.Err) {
	// You will have to modify this function.
	return "", 0, rpc.ErrNoKey
}

// Put updates key with value only if the version in the
// request matches the version of the key at the server. If the
// version numbers don't match, the server should return
// ErrVersion. If Put receives an ErrVersion on its first RPC, Put
// should return ErrVersion, since the Put was definitely not
// performed at the server. If the server returns ErrVersion on a
// resend RPC, then Put must return ErrMaybe to the application, since
// its earlier RPC might have been processed by the server successfully
// but the response was lost, and the Clerk doesn't know if
// the Put was performed or not.
//
// You can send an RPC with code like this:
// ok := ck.clnt.Call(ck.server, "KVServer.Put", &args, &reply)
//
// The types of args and reply (including whether they are pointers)
// must match the declared types of the RPC handler function's
// arguments. Additionally, reply must be passed as a pointer.
func (ck *Clerk) Put(key, value string, version rpc.Tversion) rpc.Err {
	// You will have to modify this function.
	return rpc.ErrNoKey
}
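The Put comment above pins down the ErrVersion/ErrMaybe rule precisely. The sketch below shows just that rule, under stated assumptions: the PutArgs/PutReply field names are inferred from the shardrpc structs later in this diff, the helper name putSketch is made up, and the unconditional resend loop is a simplification.

	// Sketch: resend until a reply arrives; an ErrVersion seen only on a
	// resend becomes ErrMaybe, because the first attempt may have applied.
	func (ck *Clerk) putSketch(key, value string, version rpc.Tversion) rpc.Err {
		args := rpc.PutArgs{Key: key, Value: value, Version: version}
		for first := true; ; first = false {
			reply := rpc.PutReply{}
			if !ck.clnt.Call(ck.server, "KVServer.Put", &args, &reply) {
				continue // request or reply lost; resend
			}
			if reply.Err == rpc.ErrVersion && !first {
				return rpc.ErrMaybe
			}
			return reply.Err
		}
	}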
@@ -1,162 +0,0 @@
package kvsrv

import (
	// "log"
	"runtime"
	"testing"
	"time"

	"6.5840/kvsrv1/rpc"
	"6.5840/kvtest1"
)

// Test Put with a single client and a reliable network
func TestReliablePut(t *testing.T) {
	const Val = "6.5840"
	const Ver = 0

	ts := MakeTestKV(t, true)
	defer ts.Cleanup()

	ts.Begin("One client and reliable Put")

	ck := ts.MakeClerk()
	if err := ck.Put("k", Val, Ver); err != rpc.OK {
		t.Fatalf("Put err %v", err)
	}

	if val, ver, err := ck.Get("k"); err != rpc.OK {
		t.Fatalf("Get err %v; expected OK", err)
	} else if val != Val {
		t.Fatalf("Get value err %v; expected %v", val, Val)
	} else if ver != Ver+1 {
		t.Fatalf("Get wrong version %v; expected %v", ver, Ver+1)
	}

	if err := ck.Put("k", Val, 0); err != rpc.ErrVersion {
		t.Fatalf("expected Put to fail with ErrVersion; got err=%v", err)
	}

	if err := ck.Put("y", Val, rpc.Tversion(1)); err != rpc.ErrNoKey {
		t.Fatalf("expected Put to fail with ErrNoKey; got err=%v", err)
	}

	if _, _, err := ck.Get("y"); err != rpc.ErrNoKey {
		t.Fatalf("expected Get to fail with ErrNoKey; got err=%v", err)
	}
}

// Many clients putting on same key.
func TestPutConcurrentReliable(t *testing.T) {
	const (
		PORCUPINETIME = 10 * time.Second
		NCLNT         = 10
		NSEC          = 1
	)

	ts := MakeTestKV(t, true)
	defer ts.Cleanup()

	ts.Begin("Test: many clients racing to put values to the same key")

	rs := ts.SpawnClientsAndWait(NCLNT, NSEC*time.Second, func(me int, ck kvtest.IKVClerk, done chan struct{}) kvtest.ClntRes {
		return ts.OneClientPut(me, ck, []string{"k"}, done)
	})
	ck := ts.MakeClerk()
	ts.CheckPutConcurrent(ck, "k", rs, &kvtest.ClntRes{})
	ts.CheckPorcupineT(PORCUPINETIME)
}

// Check if memory used on server is reasonable
func TestMemPutManyClientsReliable(t *testing.T) {
	const (
		NCLIENT = 100_000
		MEM     = 1000
	)

	ts := MakeTestKV(t, true)
	defer ts.Cleanup()

	v := kvtest.RandValue(MEM)

	cks := make([]kvtest.IKVClerk, NCLIENT)
	for i, _ := range cks {
		cks[i] = ts.MakeClerk()
	}

	// force allocation of ends for server in each client
	for i := 0; i < NCLIENT; i++ {
		if err := cks[i].Put("k", "", 1); err != rpc.ErrNoKey {
			t.Fatalf("Put failed %v", err)
		}
	}

	ts.Begin("Test: memory use many put clients")

	// allow threads started by labrpc to start
	time.Sleep(1 * time.Second)

	runtime.GC()
	runtime.GC()

	var st runtime.MemStats
	runtime.ReadMemStats(&st)
	m0 := st.HeapAlloc

	for i := 0; i < NCLIENT; i++ {
		if err := cks[i].Put("k", v, rpc.Tversion(i)); err != rpc.OK {
			t.Fatalf("Put failed %v", err)
		}
	}

	runtime.GC()
	time.Sleep(1 * time.Second)
	runtime.GC()

	runtime.ReadMemStats(&st)
	m1 := st.HeapAlloc
	f := (float64(m1) - float64(m0)) / NCLIENT
	if m1 > m0+(NCLIENT*200) {
		t.Fatalf("error: server using too much memory %d %d (%.2f per client)\n", m0, m1, f)
	}
}

// Test with one client and unreliable network. If Clerk.Put returns
// ErrMaybe, the Put must have happened, since the test uses only one
// client.
func TestUnreliableNet(t *testing.T) {
	const NTRY = 100

	ts := MakeTestKV(t, false)
	defer ts.Cleanup()

	ts.Begin("One client")

	ck := ts.MakeClerk()

	retried := false
	for try := 0; try < NTRY; try++ {
		for i := 0; true; i++ {
			if err := ts.PutJson(ck, "k", i, rpc.Tversion(try), 0); err != rpc.ErrMaybe {
				if i > 0 && err != rpc.ErrVersion {
					t.Fatalf("Put shouldn't have happen more than once %v", err)
				}
				break
			}
			// Try put again; it should fail with ErrVersion
			retried = true
		}
		v := 0
		if ver := ts.GetJson(ck, "k", 0, &v); ver != rpc.Tversion(try+1) {
			t.Fatalf("Wrong version %d expect %d", ver, try+1)
		}
		if v != 0 {
			t.Fatalf("Wrong value %d expect %d", v, 0)
		}
	}
	if !retried {
		t.Fatalf("Clerk.Put never returned ErrMaybe")
	}

	ts.CheckPorcupine()
}
@@ -1,31 +0,0 @@
package lock

import (
	"6.5840/kvsrv1/rpc"
	"6.5840/kvtest1"
	"6.5840/shardkv1/kvsrv1"
	"6.5840/shardkv1/shardctrler/param"
)

type Lock struct {
	ck *kvsrv.Clerk
}

// Use l as the key to store the "lock state" (you would have to decide
// precisely what the lock state is).
func MakeLock(ck kvtest.IKVClerk, l string) *Lock {
	lk := &Lock{ck: ck.(*kvsrv.Clerk)}
	// You may add code here
	return lk
}

func (lk *Lock) Acquire() {
	// You may add code here.
}

func (lk *Lock) Release() {
	// You may add code here.
}
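The comment above leaves the lock state to the implementer. One common shape is a spin on the conditional Put, sketched here under assumptions: the Lock remembers its key and a per-client id (fields the skeleton leaves out), "time" is imported, and ErrMaybe handling is omitted; this is not the deleted file's code.

	// Sketch: "" means free, a client id means held.
	func (lk *Lock) acquireSketch(key, id string) {
		for {
			val, ver, err := lk.ck.Get(key)
			if err == rpc.ErrNoKey && lk.ck.Put(key, id, 0) == rpc.OK {
				return // created the lock key and took the lock
			}
			if err == rpc.OK && val == "" && lk.ck.Put(key, id, ver) == rpc.OK {
				return // lock was free and our conditional Put won
			}
			time.Sleep(10 * time.Millisecond)
		}
	}

	func (lk *Lock) releaseSketch(key, id string) {
		if val, ver, err := lk.ck.Get(key); err == rpc.OK && val == id {
			lk.ck.Put(key, "", ver) // mark the lock free again
		}
	}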
@@ -1,89 +0,0 @@
package lock

import (
	"fmt"
	// "log"
	"strconv"
	"testing"
	"time"

	"6.5840/kvsrv1"
	"6.5840/kvsrv1/rpc"
	"6.5840/kvtest1"
)

const (
	NACQUIRE = 10
	NCLNT    = 10
	NSEC     = 2
)

func oneClient(t *testing.T, me int, ck kvtest.IKVClerk, done chan struct{}) kvtest.ClntRes {
	lk := MakeLock(ck, "l")
	ck.Put("l0", "", 0)
	for i := 1; true; i++ {
		select {
		case <-done:
			return kvtest.ClntRes{i, 0}
		default:
			lk.Acquire()

			// log.Printf("%d: acquired lock", me)

			b := strconv.Itoa(me)
			val, ver, err := ck.Get("l0")
			if err == rpc.OK {
				if val != "" {
					t.Fatalf("%d: two clients acquired lock %v", me, val)
				}
			} else {
				t.Fatalf("%d: get failed %v", me, err)
			}

			err = ck.Put("l0", string(b), ver)
			if !(err == rpc.OK || err == rpc.ErrMaybe) {
				t.Fatalf("%d: put failed %v", me, err)
			}

			time.Sleep(10 * time.Millisecond)

			err = ck.Put("l0", "", ver+1)
			if !(err == rpc.OK || err == rpc.ErrMaybe) {
				t.Fatalf("%d: put failed %v", me, err)
			}

			// log.Printf("%d: release lock", me)

			lk.Release()
		}
	}
	return kvtest.ClntRes{}
}

// Run test clients
func runClients(t *testing.T, nclnt int, reliable bool) {
	ts := kvsrv.MakeTestKV(t, reliable)
	defer ts.Cleanup()

	ts.Begin(fmt.Sprintf("Test: %d lock clients", nclnt))

	ts.SpawnClientsAndWait(nclnt, NSEC*time.Second, func(me int, myck kvtest.IKVClerk, done chan struct{}) kvtest.ClntRes {
		return oneClient(t, me, myck, done)
	})
}

func TestOneClientReliable(t *testing.T) {
	runClients(t, 1, true)
}

func TestManyClientsReliable(t *testing.T) {
	runClients(t, NCLNT, true)
}

func TestOneClientUnreliable(t *testing.T) {
	runClients(t, 1, false)
}

func TestManyClientsUnreliable(t *testing.T) {
	runClients(t, NCLNT, false)
}
@@ -1,48 +0,0 @@
package kvsrv

import (
	"sync"

	"6.5840/kvsrv1/rpc"
	"6.5840/labrpc"
	"6.5840/tester1"
)

type KVServer struct {
	mu sync.Mutex

	// Your definitions here.
}

func MakeKVServer() *KVServer {
	kv := &KVServer{}
	// Your code here.
	return kv
}

// Get returns the value and version for args.Key, if args.Key
// exists. Otherwise, Get returns ErrNoKey.
func (kv *KVServer) Get(args *rpc.GetArgs, reply *rpc.GetReply) {
	// Your code here.
}

// Update the value for a key if args.Version matches the version of
// the key on the server. If versions don't match, return ErrVersion.
// If the key doesn't exist, Put installs the value if the
// args.Version is 0, and returns ErrNoKey otherwise.
func (kv *KVServer) Put(args *rpc.PutArgs, reply *rpc.PutReply) {
	// Your code here.
}

// You can ignore Kill() for this lab
func (kv *KVServer) Kill() {
}

// You can ignore all arguments; they are for replicated KVservers
func StartKVServer(ends []*labrpc.ClientEnd, gid tester.Tgid, srv int, persister *tester.Persister) []tester.IService {
	kv := MakeKVServer()
	return []tester.IService{kv}
}
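The Get/Put comments above fully specify the version rules, so a compact sketch of the Put side may help. It assumes a kv.store map added under "Your definitions here" and the usual Key/Value/Version/Err fields on the rpc structs; it is not the deleted server's code.

	// Sketch: a map from key to (value, version), guarded by kv.mu.
	type entry struct {
		Value   string
		Version rpc.Tversion
	}

	func (kv *KVServer) putSketch(args *rpc.PutArgs, reply *rpc.PutReply) {
		kv.mu.Lock()
		defer kv.mu.Unlock()
		cur, ok := kv.store[args.Key] // kv.store map[string]entry: assumed definition
		switch {
		case !ok && args.Version == 0:
			kv.store[args.Key] = entry{args.Value, 1} // new key starts at version 1
			reply.Err = rpc.OK
		case !ok:
			reply.Err = rpc.ErrNoKey
		case cur.Version != args.Version:
			reply.Err = rpc.ErrVersion
		default:
			kv.store[args.Key] = entry{args.Value, cur.Version + 1}
			reply.Err = rpc.OK
		}
	}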
@@ -1,36 +0,0 @@
package kvsrv

import (
	// "log"
	"testing"

	"6.5840/kvtest1"
	"6.5840/tester1"
)

type TestKV struct {
	*kvtest.Test
	t        *testing.T
	reliable bool
}

func MakeTestKV(t *testing.T, reliable bool) *TestKV {
	cfg := tester.MakeConfig(t, 1, reliable, StartKVServer)
	ts := &TestKV{
		t:        t,
		reliable: reliable,
	}
	ts.Test = kvtest.MakeTest(t, cfg, false, ts)
	return ts
}

func (ts *TestKV) MakeClerk() kvtest.IKVClerk {
	clnt := ts.Config.MakeClient()
	ck := MakeClerk(clnt, tester.ServerName(tester.GRP0, 0))
	return &kvtest.TestClerk{ck, clnt}
}

func (ts *TestKV) DeleteClerk(ck kvtest.IKVClerk) {
	tck := ck.(*kvtest.TestClerk)
	ts.DeleteClient(tck.Clnt)
}
@@ -1,284 +0,0 @@
package shardcfg

import (
	"encoding/json"
	"hash/fnv"
	"log"
	"runtime/debug"
	"slices"
	"testing"

	"6.5840/tester1"
)

type Tshid int
type Tnum int

const (
	NShards  = 12 // The number of shards.
	NumFirst = Tnum(1)
)

const (
	Gid1 = tester.Tgid(1)
)

// which shard is a key in?
// please use this function,
// and please do not change it.
func Key2Shard(key string) Tshid {
	h := fnv.New32a()
	h.Write([]byte(key))
	shard := Tshid(Tshid(h.Sum32()) % NShards)
	return shard
}

// A configuration -- an assignment of shards to groups.
// Please don't change this.
type ShardConfig struct {
	Num    Tnum                     // config number
	Shards [NShards]tester.Tgid     // shard -> gid
	Groups map[tester.Tgid][]string // gid -> servers[]
}

func MakeShardConfig() *ShardConfig {
	c := &ShardConfig{
		Groups: make(map[tester.Tgid][]string),
	}
	return c
}

func (cfg *ShardConfig) String() string {
	b, err := json.Marshal(cfg)
	if err != nil {
		log.Fatalf("Unmarshall err %v", err)
	}
	return string(b)
}

func FromString(s string) *ShardConfig {
	scfg := &ShardConfig{}
	if err := json.Unmarshal([]byte(s), scfg); err != nil {
		log.Fatalf("Unmarshall err %v", err)
	}
	return scfg
}

func (cfg *ShardConfig) Copy() *ShardConfig {
	c := MakeShardConfig()
	c.Num = cfg.Num
	c.Shards = cfg.Shards
	for k, srvs := range cfg.Groups {
		s := make([]string, len(srvs))
		copy(s, srvs)
		c.Groups[k] = s
	}
	return c
}

// mostgroup, mostn, leastgroup, leastn
func analyze(c *ShardConfig) (tester.Tgid, int, tester.Tgid, int) {
	counts := map[tester.Tgid]int{}
	for _, g := range c.Shards {
		counts[g] += 1
	}

	mn := -1
	var mg tester.Tgid = -1
	ln := 257
	var lg tester.Tgid = -1
	// Enforce deterministic ordering, map iteration
	// is randomized in go
	groups := make([]tester.Tgid, len(c.Groups))
	i := 0
	for k := range c.Groups {
		groups[i] = k
		i++
	}
	slices.Sort(groups)
	for _, g := range groups {
		if counts[g] < ln {
			ln = counts[g]
			lg = g
		}
		if counts[g] > mn {
			mn = counts[g]
			mg = g
		}
	}

	return mg, mn, lg, ln
}

// return GID of group with least number of
// assigned shards.
func least(c *ShardConfig) tester.Tgid {
	_, _, lg, _ := analyze(c)
	return lg
}

// balance assignment of shards to groups.
// modifies c.
func (c *ShardConfig) Rebalance() {
	// if no groups, un-assign all shards
	if len(c.Groups) < 1 {
		for s, _ := range c.Shards {
			c.Shards[s] = 0
		}
		return
	}

	// assign all unassigned shards
	for s, g := range c.Shards {
		_, ok := c.Groups[g]
		if ok == false {
			lg := least(c)
			c.Shards[s] = lg
		}
	}

	// move shards from most to least heavily loaded
	for {
		mg, mn, lg, ln := analyze(c)
		if mn < ln+2 {
			break
		}
		// move 1 shard from mg to lg
		for s, g := range c.Shards {
			if g == mg {
				c.Shards[s] = lg
				break
			}
		}
	}
}

func (cfg *ShardConfig) Join(servers map[tester.Tgid][]string) bool {
	changed := false
	for gid, servers := range servers {
		_, ok := cfg.Groups[gid]
		if ok {
			log.Printf("re-Join %v", gid)
			return false
		}
		for xgid, xservers := range cfg.Groups {
			for _, s1 := range xservers {
				for _, s2 := range servers {
					if s1 == s2 {
						log.Fatalf("Join(%v) puts server %v in groups %v and %v", gid, s1, xgid, gid)
					}
				}
			}
		}
		// new GID
		// modify cfg to reflect the Join()
		cfg.Groups[gid] = servers
		changed = true
	}
	if changed == false {
		log.Fatalf("Join but no change")
	}
	cfg.Num += 1
	return true
}

func (cfg *ShardConfig) Leave(gids []tester.Tgid) bool {
	changed := false
	for _, gid := range gids {
		_, ok := cfg.Groups[gid]
		if ok == false {
			// already no GID!
			log.Printf("Leave(%v) but not in config", gid)
			return false
		} else {
			// modify op.Config to reflect the Leave()
			delete(cfg.Groups, gid)
			changed = true
		}
	}
	if changed == false {
		debug.PrintStack()
		log.Fatalf("Leave but no change")
	}
	cfg.Num += 1
	return true
}

func (cfg *ShardConfig) JoinBalance(servers map[tester.Tgid][]string) bool {
	if !cfg.Join(servers) {
		return false
	}
	cfg.Rebalance()
	return true
}

func (cfg *ShardConfig) LeaveBalance(gids []tester.Tgid) bool {
	if !cfg.Leave(gids) {
		return false
	}
	cfg.Rebalance()
	return true
}

func (cfg *ShardConfig) GidServers(sh Tshid) (tester.Tgid, []string, bool) {
	gid := cfg.Shards[sh]
	srvs, ok := cfg.Groups[gid]
	return gid, srvs, ok
}

func (cfg *ShardConfig) IsMember(gid tester.Tgid) bool {
	for _, g := range cfg.Shards {
		if g == gid {
			return true
		}
	}
	return false
}

func (cfg *ShardConfig) CheckConfig(t *testing.T, groups []tester.Tgid) {
	if len(cfg.Groups) != len(groups) {
		fatalf(t, "wanted %v groups, got %v", len(groups), len(cfg.Groups))
	}

	// are the groups as expected?
	for _, g := range groups {
		_, ok := cfg.Groups[g]
		if ok != true {
			fatalf(t, "missing group %v", g)
		}
	}

	// any un-allocated shards?
	if len(groups) > 0 {
		for s, g := range cfg.Shards {
			_, ok := cfg.Groups[g]
			if ok == false {
				fatalf(t, "shard %v -> invalid group %v", s, g)
			}
		}
	}

	// more or less balanced sharding?
	counts := map[tester.Tgid]int{}
	for _, g := range cfg.Shards {
		counts[g] += 1
	}
	min := 257
	max := 0
	for g, _ := range cfg.Groups {
		if counts[g] > max {
			max = counts[g]
		}
		if counts[g] < min {
			min = counts[g]
		}
	}
	if max > min+1 {
		fatalf(t, "max %v too much larger than min %v", max, min)
	}
}

func fatalf(t *testing.T, format string, args ...any) {
	debug.PrintStack()
	t.Fatalf(format, args...)
}
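As a short usage note for the helpers above (it uses only functions defined in this file; serversForKey itself is an illustrative helper, not part of the deleted file), mapping a key to the servers that should hold it is a two-step lookup:

	// Which servers own key under cfg?
	func serversForKey(cfg *ShardConfig, key string) ([]string, bool) {
		sh := Key2Shard(key)              // key -> shard index
		_, srvs, ok := cfg.GidServers(sh) // shard -> owning group's servers
		return srvs, ok
	}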
@@ -1,62 +0,0 @@
package shardcfg

import (
	"testing"

	"6.5840/tester1"
)

func check_same_config(t *testing.T, c1 ShardConfig, c2 ShardConfig) {
	if c1.Num != c2.Num {
		t.Fatalf("Num wrong")
	}
	if c1.Shards != c2.Shards {
		t.Fatalf("Shards wrong")
	}
	if len(c1.Groups) != len(c2.Groups) {
		t.Fatalf("number of Groups is wrong")
	}
	for gid, sa := range c1.Groups {
		sa1, ok := c2.Groups[gid]
		if ok == false || len(sa1) != len(sa) {
			t.Fatalf("len(Groups) wrong")
		}
		if ok && len(sa1) == len(sa) {
			for j := 0; j < len(sa); j++ {
				if sa[j] != sa1[j] {
					t.Fatalf("Groups wrong")
				}
			}
		}
	}
}

func TestBasic(t *testing.T) {
	const (
		Gid1 = 1
		Gid2 = 2
	)
	cfg := MakeShardConfig()
	cfg.CheckConfig(t, []tester.Tgid{})

	cfg.JoinBalance(map[tester.Tgid][]string{Gid1: []string{"x", "y", "z"}})
	cfg.CheckConfig(t, []tester.Tgid{Gid1})

	cfg.JoinBalance(map[tester.Tgid][]string{Gid2: []string{"a", "b", "c"}})
	cfg.CheckConfig(t, []tester.Tgid{Gid1, Gid2})

	sa1 := cfg.Groups[Gid1]
	if len(sa1) != 3 || sa1[0] != "x" || sa1[1] != "y" || sa1[2] != "z" {
		t.Fatalf("wrong servers for gid %v: %v\n", Gid1, sa1)
	}
	sa2 := cfg.Groups[Gid2]
	if len(sa2) != 3 || sa2[0] != "a" || sa2[1] != "b" || sa2[2] != "c" {
		t.Fatalf("wrong servers for gid %v: %v\n", Gid2, sa2)
	}

	cfg.LeaveBalance([]tester.Tgid{Gid1})
	cfg.CheckConfig(t, []tester.Tgid{Gid2})

	cfg.LeaveBalance([]tester.Tgid{Gid2})
	cfg.CheckConfig(t, []tester.Tgid{})
}
@@ -1,7 +0,0 @@
package param

const (
	// Length of time that a lease is valid. If a client doesn't
	// refresh the lease within this time, the lease will expire.
	LEASETIMESEC = 3
)
@@ -1,87 +0,0 @@
package shardctrler

//
// Shardctrler with InitConfig, Query, and ChangeConfigTo methods
//

import (
	"sync/atomic"

	"6.5840/kvsrv1/rpc"
	"6.5840/kvtest1"
	"6.5840/shardkv1/kvsrv1"
	"6.5840/shardkv1/shardcfg"
	"6.5840/tester1"
)

// ShardCtrler for the controller and kv clerk.
type ShardCtrler struct {
	clnt *tester.Clnt
	kvtest.IKVClerk

	killed int32 // set by Kill()
	leases bool

	// Your data here.
}

// Make a ShardCtrler, which stores its state in a kvsrv.
func MakeShardCtrler(clnt *tester.Clnt, leases bool) *ShardCtrler {
	sck := &ShardCtrler{clnt: clnt, leases: leases}
	srv := tester.ServerName(tester.GRP0, 0)
	sck.IKVClerk = kvsrv.MakeClerk(clnt, srv)
	// Your code here.
	return sck
}

// The tester calls InitController() before starting a new
// controller. In this method you can implement recovery (part B) and
// use a lock to become leader (part C). InitController may fail when
// another controller supersedes (e.g., when this controller is
// partitioned during recovery).
func (sck *ShardCtrler) InitController() rpc.Err {
	return rpc.ErrNoKey
}

// The tester calls ExitController to exit a controller. In part B and
// C, release lock.
func (sck *ShardCtrler) ExitController() {
}

// Called once by the tester to supply the first configuration. You
// can marshal ShardConfig into a string using shardcfg.String(), and
// then Put it in the kvsrv for the controller at version 0. You can
// pick the key to name the configuration.
func (sck *ShardCtrler) InitConfig(cfg *shardcfg.ShardConfig) {
	// Your code here
}

// Called by the tester to ask the controller to change the
// configuration from the current one to new. It may return an error
// if this controller is disconnected for a while and another
// controller takes over in the meantime, as in part C.
func (sck *ShardCtrler) ChangeConfigTo(new *shardcfg.ShardConfig) rpc.Err {
	return rpc.OK
}

// Tester "kills" shardctrler by calling Kill(). For your
// convenience, we also supply isKilled() method to test killed in
// loops.
func (sck *ShardCtrler) Kill() {
	atomic.StoreInt32(&sck.killed, 1)
}

func (sck *ShardCtrler) isKilled() bool {
	z := atomic.LoadInt32(&sck.killed)
	return z == 1
}

// Return the current configuration
func (sck *ShardCtrler) Query() (*shardcfg.ShardConfig, rpc.Tversion) {
	// Your code here.
	return nil, 0
}
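The InitConfig comment describes the storage scheme: marshal the configuration with shardcfg's String(), Put it at version 0, under a key the controller picks. A sketch of that scheme, with a made-up key name and no error handling (again not the deleted file's code):

	const cfgKey = "config" // key name chosen for illustration

	func (sck *ShardCtrler) initConfigSketch(cfg *shardcfg.ShardConfig) {
		sck.IKVClerk.Put(cfgKey, cfg.String(), 0) // version 0: create if absent
	}

	func (sck *ShardCtrler) querySketch() (*shardcfg.ShardConfig, rpc.Tversion) {
		val, ver, err := sck.IKVClerk.Get(cfgKey)
		if err != rpc.OK {
			return nil, 0
		}
		return shardcfg.FromString(val), ver
	}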
@@ -1,42 +0,0 @@
package shardgrp

import (
	"6.5840/kvsrv1/rpc"
	"6.5840/shardkv1/shardcfg"
	"6.5840/tester1"
)

type Clerk struct {
	clnt    *tester.Clnt
	servers []string
	leader  int // last successful leader (index into servers[])
}

func MakeClerk(clnt *tester.Clnt, servers []string) *Clerk {
	ck := &Clerk{clnt: clnt, servers: servers}
	return ck
}

func (ck *Clerk) Get(key string, n shardcfg.Tnum) (string, rpc.Tversion, rpc.Err) {
	// Your code here
	return "", 0, ""
}

func (ck *Clerk) Put(key string, value string, version rpc.Tversion, n shardcfg.Tnum) (bool, rpc.Err) {
	// Your code here
	return false, ""
}

func (ck *Clerk) Freeze(s shardcfg.Tshid, num shardcfg.Tnum) ([]byte, rpc.Err) {
	return nil, ""
}

func (ck *Clerk) InstallShard(s shardcfg.Tshid, state []byte, num shardcfg.Tnum) rpc.Err {
	return ""
}

func (ck *Clerk) Delete(s shardcfg.Tshid, num shardcfg.Tnum) rpc.Err {
	return ""
}
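The leader field above is described as the last successful leader. Below is a sketch of the usual rotate-on-failure loop around such a field; it assumes the shardrpc package is imported, that GetReply carries Value/Version/Err, and that the rpc package defines ErrWrongLeader — none of which this diff shows, so treat all of them as assumptions rather than the deleted file's API.

	// Sketch: try the remembered leader first, rotate through servers otherwise.
	func (ck *Clerk) getSketch(key string, n shardcfg.Tnum) (string, rpc.Tversion, rpc.Err) {
		args := shardrpc.GetArgs{Key: key, Num: n}
		for i := 0; i < len(ck.servers); i++ {
			srv := (ck.leader + i) % len(ck.servers)
			reply := rpc.GetReply{}
			ok := ck.clnt.Call(ck.servers[srv], "KVServer.Get", &args, &reply)
			if ok && reply.Err != rpc.ErrWrongLeader {
				ck.leader = srv // remember who answered
				return reply.Value, reply.Version, reply.Err
			}
		}
		return "", 0, rpc.ErrWrongLeader
	}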
@@ -1,102 +0,0 @@
package shardgrp

import (
	"sync/atomic"

	"6.5840/kvraft1/rsm"
	"6.5840/kvsrv1/rpc"
	"6.5840/labgob"
	"6.5840/labrpc"
	"6.5840/shardkv1/shardgrp/shardrpc"
	"6.5840/tester1"
)

type KVServer struct {
	gid    tester.Tgid
	me     int
	dead   int32 // set by Kill()
	rsm    *rsm.RSM
	frozen bool // for testing purposes
}

func (kv *KVServer) DoOp(req any) any {
	// Your code here
	return nil
}

func (kv *KVServer) Snapshot() []byte {
	// Your code here
	return nil
}

func (kv *KVServer) Restore(data []byte) {
	// Your code here
}

func (kv *KVServer) Get(args *shardrpc.GetArgs, reply *rpc.GetReply) {
	// Your code here
}

func (kv *KVServer) Put(args *shardrpc.PutArgs, reply *rpc.PutReply) {
	// Your code here
}

// Freeze the specified shard (i.e., reject future Get/Puts for this
// shard) and return the key/values stored in that shard.
func (kv *KVServer) Freeze(args *shardrpc.FreezeArgs, reply *shardrpc.FreezeReply) {
	// Your code here
}

// Install the supplied state for the specified shard.
func (kv *KVServer) InstallShard(args *shardrpc.InstallShardArgs, reply *shardrpc.InstallShardReply) {
	// Your code here
}

// Delete the specified shard.
func (kv *KVServer) Delete(args *shardrpc.DeleteShardArgs, reply *shardrpc.DeleteShardReply) {
	// Your code here
}

// the tester calls Kill() when a KVServer instance won't
// be needed again. for your convenience, we supply
// code to set rf.dead (without needing a lock),
// and a killed() method to test rf.dead in
// long-running loops. you can also add your own
// code to Kill(). you're not required to do anything
// about this, but it may be convenient (for example)
// to suppress debug output from a Kill()ed instance.
func (kv *KVServer) Kill() {
	atomic.StoreInt32(&kv.dead, 1)
	// Your code here, if desired.
}

func (kv *KVServer) killed() bool {
	z := atomic.LoadInt32(&kv.dead)
	return z == 1
}

// StartServerShardGrp starts a server for shardgrp `gid`.
//
// StartServerShardGrp() and MakeRSM() must return quickly, so they should
// start goroutines for any long-running work.
func StartServerShardGrp(servers []*labrpc.ClientEnd, gid tester.Tgid, me int, persister *tester.Persister, maxraftstate int) []tester.IService {
	// call labgob.Register on structures you want
	// Go's RPC library to marshall/unmarshall.
	labgob.Register(shardrpc.PutArgs{})
	labgob.Register(shardrpc.GetArgs{})
	labgob.Register(shardrpc.FreezeArgs{})
	labgob.Register(shardrpc.InstallShardArgs{})
	labgob.Register(shardrpc.DeleteShardArgs{})
	labgob.Register(rsm.Op{})

	kv := &KVServer{gid: gid, me: me}
	kv.rsm = rsm.MakeRSM(servers, me, persister, maxraftstate, kv)

	// Your code here
	return []tester.IService{kv, kv.rsm.Raft()}
}
@@ -1,50 +0,0 @@
package shardrpc

import (
	"6.5840/kvsrv1/rpc"
	"6.5840/shardkv1/shardcfg"
)

// Same as Put in kvsrv1/rpc, but with a configuration number
type PutArgs struct {
	Key     string
	Value   string
	Version rpc.Tversion
	Num     shardcfg.Tnum
}

// Same as Get in kvsrv1/rpc, but with a configuration number.
type GetArgs struct {
	Key string
	Num shardcfg.Tnum
}

type FreezeArgs struct {
	Shard shardcfg.Tshid
	Num   shardcfg.Tnum
}

type FreezeReply struct {
	State []byte
	Num   shardcfg.Tnum
	Err   rpc.Err
}

type InstallShardArgs struct {
	Shard shardcfg.Tshid
	State []byte
	Num   shardcfg.Tnum
}

type InstallShardReply struct {
	Err rpc.Err
}

type DeleteShardArgs struct {
	Shard shardcfg.Tshid
	Num   shardcfg.Tnum
}

type DeleteShardReply struct {
	Err rpc.Err
}
@ -1,818 +0,0 @@
|
|||||||
package shardkv
|
|
||||||
|
|
||||||
import (
|
|
||||||
"log"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"6.5840/kvsrv1/rpc"
|
|
||||||
"6.5840/kvtest1"
|
|
||||||
"6.5840/shardkv1/shardcfg"
|
|
||||||
"6.5840/shardkv1/shardctrler"
|
|
||||||
"6.5840/shardkv1/shardctrler/param"
|
|
||||||
"6.5840/tester1"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
NGRP = 8
|
|
||||||
NKEYS = 5 * shardcfg.NShards
|
|
||||||
)
|
|
||||||
|
|
||||||
// Test shard controller's Init and Query with a key/value server from
|
|
||||||
// kvsrv1 lab.
|
|
||||||
func TestInitQuery5A(t *testing.T) {
|
|
||||||
|
|
||||||
// MakeTest starts a key/value server using `kvsrv.StartKVServer`,
|
|
||||||
// which is defined in shardkv1/kvsrv1.
|
|
||||||
ts := MakeTest(t, "Test (5A): Init and Query ...", true)
|
|
||||||
defer ts.Cleanup()
|
|
||||||
|
|
||||||
// Make a shard controller
|
|
||||||
sck := shardctrler.MakeShardCtrler(ts.Config.MakeClient(), ts.leases)
|
|
||||||
|
|
||||||
// Make an empty shard configuration
|
|
||||||
scfg := shardcfg.MakeShardConfig()
|
|
||||||
|
|
||||||
// Compute a new shard configuration as if `shardcfg.Gid1` joins the cluster,
|
|
||||||
// assigning all shards to `shardcfg.Gid1`.
|
|
||||||
scfg.JoinBalance(map[tester.Tgid][]string{shardcfg.Gid1: []string{"xxx"}})
|
|
||||||
|
|
||||||
// Invoke the controller to initialize to store the first configuration
|
|
||||||
sck.InitConfig(scfg)
|
|
||||||
|
|
||||||
// Read the initial configuration and check it
|
|
||||||
cfg, v := sck.Query()
|
|
||||||
if v != 1 || cfg.Num != 1 || cfg.Shards[0] != shardcfg.Gid1 {
|
|
||||||
ts.t.Fatalf("Static wrong %v %v", cfg, v)
|
|
||||||
}
|
|
||||||
cfg.CheckConfig(t, []tester.Tgid{shardcfg.Gid1})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test shardkv clerk's Get/Put with 1 shardgrp (without reconfiguration)
|
|
||||||
func TestStaticOneShardGroup5A(t *testing.T) {
|
|
||||||
ts := MakeTest(t, "Test (5A): one shard group ...", true)
|
|
||||||
defer ts.Cleanup()
|
|
||||||
|
|
||||||
// The tester's setupKVService() sets up a kvsrv for the
|
|
||||||
// controller to store configurations and calls the controller's
|
|
||||||
// Init() method to create the first configuration with 1
|
|
||||||
// shardgrp.
|
|
||||||
ts.setupKVService()
|
|
||||||
|
|
||||||
ck := ts.MakeClerk() // make a shardkv clerk
|
|
||||||
ka, va := ts.SpreadPuts(ck, NKEYS) // do some puts
|
|
||||||
n := len(ka)
|
|
||||||
for i := 0; i < n; i++ {
|
|
||||||
ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) // check the puts
|
|
||||||
}
|
|
||||||
|
|
||||||
// disconnect raft leader of shardgrp and check that keys are
|
|
||||||
// still avaialable
|
|
||||||
ts.disconnectClntFromLeader(ck.(*kvtest.TestClerk).Clnt, shardcfg.Gid1)
|
|
||||||
|
|
||||||
for i := 0; i < n; i++ {
|
|
||||||
ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) // check the puts
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// test shardctrler's join, which adds a new group Gid2 and must move
|
|
||||||
// shards to the new group and the old group should reject Get/Puts on
|
|
||||||
// shards that moved.
|
|
||||||
func TestJoinBasic5A(t *testing.T) {
|
|
||||||
ts := MakeTest(t, "Test (5A): a group joins...", true)
|
|
||||||
defer ts.Cleanup()
|
|
||||||
|
|
||||||
gid1 := ts.setupKVService()
|
|
||||||
ck := ts.MakeClerk()
|
|
||||||
ka, va := ts.SpreadPuts(ck, NKEYS)
|
|
||||||
|
|
||||||
sck := ts.ShardCtrler()
|
|
||||||
cfg, _ := sck.Query()
|
|
||||||
|
|
||||||
gid2 := ts.newGid()
|
|
||||||
err := ts.joinGroups(sck, []tester.Tgid{gid2})
|
|
||||||
if err != rpc.OK {
|
|
||||||
ts.t.Fatalf("joinGroups: err %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
cfg1, _ := sck.Query()
|
|
||||||
if cfg.Num+1 != cfg1.Num {
|
|
||||||
ts.t.Fatalf("wrong num %d expected %d ", cfg1.Num, cfg.Num+1)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !cfg1.IsMember(gid2) {
|
|
||||||
ts.t.Fatalf("%d isn't a member of %v", gid2, cfg1)
|
|
||||||
}
|
|
||||||
|
|
||||||
// check shards at shardcfg.Gid2
|
|
||||||
ts.checkShutdownSharding(gid1, gid2, ka, va)
|
|
||||||
|
|
||||||
for i := 0; i < len(ka); i++ {
|
|
||||||
ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
|
|
||||||
}
|
|
||||||
|
|
||||||
// check shards at shardcfg.Gid1
|
|
||||||
ts.checkShutdownSharding(gid2, gid1, ka, va)
|
|
||||||
|
|
||||||
for i := 0; i < len(ka); i++ {
|
|
||||||
ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// test shardgrps delete moved shards
|
|
||||||
func TestDeleteBasic5A(t *testing.T) {
|
|
||||||
const (
|
|
||||||
MAXRAFTSTATE = 1000
|
|
||||||
VALUESIZE = 10000
|
|
||||||
)
|
|
||||||
|
|
||||||
ts := MakeTestMaxRaft(t, "Test (5A): delete ...", true, false, VALUESIZE)
|
|
||||||
defer ts.Cleanup()
|
|
||||||
|
|
||||||
gid1 := ts.setupKVService()
|
|
||||||
ck := ts.MakeClerk()
|
|
||||||
|
|
||||||
ka, va := ts.SpreadPutsSize(ck, NKEYS, MAXRAFTSTATE)
|
|
||||||
|
|
||||||
sz := ts.Group(gid1).SnapshotSize()
|
|
||||||
|
|
||||||
sck := ts.ShardCtrler()
|
|
||||||
gid2 := ts.newGid()
|
|
||||||
err := ts.joinGroups(sck, []tester.Tgid{gid2})
|
|
||||||
if err != rpc.OK {
|
|
||||||
ts.t.Fatalf("joinGroups: err %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// push more Get's through so that all peers snapshot
|
|
||||||
for j := 0; j < 5; j++ {
|
|
||||||
for i := 0; i < len(ka); i++ {
|
|
||||||
ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
sz1 := ts.Group(gid1).SnapshotSize()
|
|
||||||
sz2 := ts.Group(gid2).SnapshotSize()
|
|
||||||
if sz1+sz2 > sz+10000 {
|
|
||||||
ts.t.Fatalf("gid1 %d + gid2 %d = %d use too much space %d", sz1, sz2, sz1+sz2, sz)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// test shardctrler's leave
|
|
||||||
func TestJoinLeaveBasic5A(t *testing.T) {
|
|
||||||
ts := MakeTest(t, "Test (5A): basic groups join/leave ...", true)
|
|
||||||
defer ts.Cleanup()
|
|
||||||
|
|
||||||
gid1 := ts.setupKVService()
|
|
||||||
ck := ts.MakeClerk()
|
|
||||||
ka, va := ts.SpreadPuts(ck, NKEYS)
|
|
||||||
|
|
||||||
sck := ts.ShardCtrler()
|
|
||||||
gid2 := ts.newGid()
|
|
||||||
err := ts.joinGroups(sck, []tester.Tgid{gid2})
|
|
||||||
if err != rpc.OK {
|
|
||||||
ts.t.Fatalf("joinGroups: err %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// check shards at shardcfg.Gid2
|
|
||||||
ts.checkShutdownSharding(gid1, gid2, ka, va)
|
|
||||||
|
|
||||||
for i := 0; i < len(ka); i++ {
|
|
||||||
ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
|
|
||||||
}
|
|
||||||
|
|
||||||
err = ts.leave(sck, shardcfg.Gid1)
|
|
||||||
if err != rpc.OK {
|
|
||||||
ts.t.Fatalf("Leave: err %v", err)
|
|
||||||
}
|
|
||||||
cfg, _ := sck.Query()
|
|
||||||
if cfg.IsMember(shardcfg.Gid1) {
|
|
||||||
ts.t.Fatalf("%d is a member of %v", shardcfg.Gid1, cfg)
|
|
||||||
}
|
|
||||||
|
|
||||||
ts.Group(shardcfg.Gid1).Shutdown()
|
|
||||||
|
|
||||||
for i := 0; i < len(ka); i++ {
|
|
||||||
ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
|
|
||||||
}
|
|
||||||
|
|
||||||
// bring the crashed shard/group back to life.
|
|
||||||
ts.Group(shardcfg.Gid1).StartServers()
|
|
||||||
|
|
||||||
// Rejoin
|
|
||||||
ts.join(sck, shardcfg.Gid1, ts.Group(shardcfg.Gid1).SrvNames())
|
|
||||||
|
|
||||||
for i := 0; i < len(ka); i++ {
|
|
||||||
ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
|
|
||||||
}
|
|
||||||
|
|
||||||
// check shards at shardcfg.Gid2
|
|
||||||
ts.checkShutdownSharding(gid2, gid1, ka, va)
|
|
||||||
}
|
|
||||||
|
|
||||||
// test many groups joining and leaving, reliable or unreliable
|
|
||||||
func joinLeave5A(t *testing.T, reliable bool, part string) {
|
|
||||||
ts := MakeTest(t, "Test (5A): many groups join/leave ...", reliable)
|
|
||||||
defer ts.Cleanup()
|
|
||||||
|
|
||||||
ts.setupKVService()
|
|
||||||
ck := ts.MakeClerk()
|
|
||||||
ka, va := ts.SpreadPuts(ck, NKEYS)
|
|
||||||
|
|
||||||
sck := ts.ShardCtrler()
|
|
||||||
grps := ts.groups(NGRP)
|
|
||||||
|
|
||||||
ts.joinGroups(sck, grps)
|
|
||||||
|
|
||||||
ts.checkShutdownSharding(grps[0], grps[1], ka, va)
|
|
||||||
|
|
||||||
for i := 0; i < len(ka); i++ {
|
|
||||||
ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
|
|
||||||
}
|
|
||||||
|
|
||||||
ts.leaveGroups(sck, grps)
|
|
||||||
|
|
||||||
for i := 0; i < len(ka); i++ {
|
|
||||||
ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestManyJoinLeaveReliable5A(t *testing.T) {
|
|
||||||
joinLeave5A(t, true, "Test (5A): many groups join/leave reliable...")
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestManyJoinLeaveUnreliable5A(t *testing.T) {
|
|
||||||
joinLeave5A(t, false, "Test (5A): many groups join/leave unreliable...")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test recovery from complete shutdown
|
|
||||||
func TestShutdown5A(t *testing.T) {
|
|
||||||
const NJOIN = 2
|
|
||||||
const NGRP = 2 + NJOIN
|
|
||||||
|
|
||||||
ts := MakeTest(t, "Test (5A): shutdown ...", true)
|
|
||||||
defer ts.Cleanup()
|
|
||||||
|
|
||||||
ts.setupKVService()
|
|
||||||
|
|
||||||
ck := ts.MakeClerk()
|
|
||||||
ka, va := ts.SpreadPuts(ck, NKEYS)
|
|
||||||
|
|
||||||
sck := ts.ShardCtrler()
|
|
||||||
grps := ts.groups(NJOIN)
|
|
||||||
ts.joinGroups(sck, grps)
|
|
||||||
|
|
||||||
ts.checkShutdownSharding(grps[0], grps[1], ka, va)
|
|
||||||
|
|
||||||
for i := 0; i < len(ka); i++ {
|
|
||||||
ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := shardcfg.Gid1; i < NGRP; i++ {
|
|
||||||
ts.Group(i).Shutdown()
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := shardcfg.Gid1; i < NGRP; i++ {
|
|
||||||
ts.Group(i).StartServers()
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := 0; i < len(ka); i++ {
|
|
||||||
ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test that Gets for keys at groups that are alive
|
|
||||||
// return
|
|
||||||
func TestProgressShutdown(t *testing.T) {
|
|
||||||
const (
|
|
||||||
NJOIN = 4
|
|
||||||
NSEC = 2
|
|
||||||
)
|
|
||||||
|
|
||||||
ts := MakeTest(t, "Test (5A): progress ...", true)
|
|
||||||
defer ts.Cleanup()
|
|
||||||
|
|
||||||
ts.setupKVService()
|
|
||||||
|
|
||||||
ck := ts.MakeClerk()
|
|
||||||
ka, va := ts.SpreadPuts(ck, NKEYS)
|
|
||||||
|
|
||||||
sck := ts.ShardCtrler()
|
|
||||||
grps := ts.groups(NJOIN)
|
|
||||||
ts.joinGroups(sck, grps)
|
|
||||||
|
|
||||||
end := 2
|
|
||||||
for _, g := range grps[0:2] {
|
|
||||||
//log.Printf("shutdown %d", g)
|
|
||||||
ts.Group(g).Shutdown()
|
|
||||||
}
|
|
||||||
|
|
||||||
alive := make(map[tester.Tgid]bool)
|
|
||||||
for _, g := range grps[end:] {
|
|
||||||
alive[g] = true
|
|
||||||
}
|
|
||||||
|
|
||||||
cfg, _ := sck.Query()
|
|
||||||
|
|
||||||
ch := make(chan rpc.Err)
|
|
||||||
go func() {
|
|
||||||
for i := 0; i < len(ka); i++ {
|
|
||||||
s := shardcfg.Key2Shard(ka[i])
|
|
||||||
g := cfg.Shards[s]
|
|
||||||
if _, ok := alive[g]; ok {
|
|
||||||
//log.Printf("key lookup %v(%d) gid %d", ka[i], s, g)
|
|
||||||
ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ch <- rpc.OK
|
|
||||||
}()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ch:
|
|
||||||
case <-time.After(NSEC * time.Second):
|
|
||||||
ts.Fatalf("Gets didn't finish")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test that Gets from a non-moving shard return quickly
|
|
||||||
func TestProgressJoin(t *testing.T) {
|
|
||||||
const (
|
|
||||||
NJOIN = 4
|
|
||||||
NSEC = 4
|
|
||||||
NCNT = 100
|
|
||||||
)
|
|
||||||
|
|
||||||
ts := MakeTest(t, "Test (5A): progress ...", true)
|
|
||||||
defer ts.Cleanup()
|
|
||||||
|
|
||||||
ts.setupKVService()
|
|
||||||
|
|
||||||
ck := ts.MakeClerk()
|
|
||||||
ka, va := ts.SpreadPuts(ck, NKEYS)
|
|
||||||
|
|
||||||
sck := ts.ShardCtrler()
|
|
||||||
grps := ts.groups(NJOIN)
|
|
||||||
ts.joinGroups(sck, grps)
|
|
||||||
|
|
||||||
cfg, _ := sck.Query()
|
|
||||||
newcfg := cfg.Copy()
|
|
||||||
newgid := tester.Tgid(NJOIN + 3)
|
|
||||||
if ok := newcfg.JoinBalance(map[tester.Tgid][]string{newgid: []string{"xxx"}}); !ok {
|
|
||||||
t.Fatalf("JoinBalance failed")
|
|
||||||
}
|
|
||||||
newcfg1 := newcfg.Copy()
|
|
||||||
if ok := newcfg1.LeaveBalance([]tester.Tgid{newgid}); !ok {
|
|
||||||
t.Fatalf("JoinBalance failed")
|
|
||||||
}
|
|
||||||
|
|
||||||
// compute which shards don't move and which groups are involved
|
|
||||||
// in moving shards
|
|
||||||
stable := make(map[shardcfg.Tshid]bool)
|
|
||||||
participating := make(map[tester.Tgid]bool)
|
|
||||||
for i, g := range newcfg1.Shards {
|
|
||||||
if newcfg.Shards[i] == g {
|
|
||||||
stable[shardcfg.Tshid(i)] = true
|
|
||||||
} else {
|
|
||||||
participating[g] = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//log.Printf("groups participating %v stable %v", participating, stable)
|
|
||||||
//log.Printf("\ncfg %v\n %v\n %v", cfg.Shards, newcfg.Shards, newcfg1.Shards)
|
|
||||||
|
|
||||||
ch0 := make(chan rpc.Err)
|
|
||||||
go func() {
|
|
||||||
for true {
|
|
||||||
select {
|
|
||||||
case <-ch0:
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
//log.Printf("join/leave %v", newgid)
|
|
||||||
if err := ts.joinGroups(sck, []tester.Tgid{newgid}); err != rpc.OK {
|
|
||||||
t.Fatalf("joined err %v", err)
|
|
||||||
}
|
|
||||||
if err := ts.leaveGroups(sck, []tester.Tgid{newgid}); err != rpc.OK {
|
|
||||||
t.Fatalf("leave err %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
ch1 := make(chan int)
|
|
||||||
go func() {
|
|
||||||
// get the keys that are on groups that are involved in the
|
|
||||||
// join but not in the shards that are moving
|
|
||||||
t := time.Now().Add(NSEC * time.Second)
|
|
||||||
nget := 0
|
|
||||||
for time.Now().Before(t) {
|
|
||||||
for i := 0; i < len(ka); i++ {
|
|
||||||
s := shardcfg.Key2Shard(ka[i])
|
|
||||||
if _, ok := stable[s]; ok {
|
|
||||||
g := newcfg1.Shards[s]
|
|
||||||
if _, ok := participating[g]; ok {
|
|
||||||
// log.Printf("key lookup %v(%d) gid %d", ka[i], s, g)
|
|
||||||
ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
|
|
||||||
nget++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ch1 <- nget
|
|
||||||
}()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case cnt := <-ch1:
|
|
||||||
log.Printf("cnt %d", cnt)
|
|
||||||
if cnt < NCNT {
|
|
||||||
ts.Fatalf("Two few gets finished %d; expected more than %d", cnt, NCNT)
|
|
||||||
}
|
|
||||||
|
|
||||||
case <-time.After(2 * NSEC * time.Second):
|
|
||||||
ts.Fatalf("Gets didn't finish")
|
|
||||||
}
|
|
||||||
ch0 <- rpc.OK
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test linearizability with groups joining/leaving and `nclnt`
|
|
||||||
// concurrent clerks put/get's in `unreliable` net.
|
|
||||||
func concurrentClerk(t *testing.T, nclnt int, reliable bool, part string) {
|
|
||||||
const (
|
|
||||||
NSEC = 20
|
|
||||||
)
|
|
||||||
|
|
||||||
ts := MakeTest(t, part, reliable)
|
|
||||||
defer ts.Cleanup()
|
|
||||||
|
|
||||||
ts.setupKVService()
|
|
||||||
|
|
||||||
ka := kvtest.MakeKeys(NKEYS)
|
|
||||||
ch := make(chan []kvtest.ClntRes)
|
|
||||||
|
|
||||||
go func(ch chan []kvtest.ClntRes) {
|
|
||||||
rs := ts.SpawnClientsAndWait(nclnt, NSEC*time.Second, func(me int, ck kvtest.IKVClerk, done chan struct{}) kvtest.ClntRes {
|
|
||||||
return ts.OneClientPut(me, ck, ka, done)
|
|
||||||
})
|
|
||||||
ch <- rs
|
|
||||||
}(ch)
|
|
||||||
|
|
||||||
sck := ts.ShardCtrler()
|
|
||||||
grps := ts.groups(NGRP)
|
|
||||||
if err := ts.joinGroups(sck, grps); err != rpc.OK {
|
|
||||||
t.Fatalf("joinGroups err %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := ts.leaveGroups(sck, grps); err != rpc.OK {
|
|
||||||
t.Fatalf("leaveGroups err %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
<-ch
|
|
||||||
|
|
||||||
ts.CheckPorcupine()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test linearizability with groups joining/leaving and 1 concurrent clerks put/get's
|
|
||||||
func TestOneConcurrentClerkReliable5A(t *testing.T) {
|
|
||||||
concurrentClerk(t, 1, true, "Test (5A): one concurrent clerk reliable...")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test linearizability with groups joining/leaving and many concurrent clerks put/get's
|
|
||||||
func TestManyConcurrentClerkReliable5A(t *testing.T) {
|
|
||||||
const NCLNT = 10
|
|
||||||
concurrentClerk(t, NCLNT, true, "Test (5A): many concurrent clerks reliable...")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test linearizability with groups joining/leaving and 1 concurrent clerks put/get's
|
|
||||||
func TestOneConcurrentClerkUnreliable5A(t *testing.T) {
|
|
||||||
concurrentClerk(t, 1, false, "Test (5A): one concurrent clerk unreliable ...")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test linearizability with groups joining/leaving and many concurrent clerks put/get's
|
|
||||||
func TestManyConcurrentClerkUnreliable5A(t *testing.T) {
|
|
||||||
const NCLNT = 10
|
|
||||||
concurrentClerk(t, NCLNT, false, "Test (5A): many concurrent clerks unreliable...")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test if join/leave complete even if shardgrp is down for a while, but
|
|
||||||
// don't complete while the shardgrp is down.
|
|
||||||
func TestJoinLeave5B(t *testing.T) {
|
|
||||||
const NSEC = 2
|
|
||||||
|
|
||||||
ts := MakeTest(t, "Test (5B): Join/leave while a shardgrp is down...", true)
|
|
||||||
defer ts.Cleanup()
|
|
||||||
|
|
||||||
gid1 := ts.setupKVService()
|
|
||||||
ck := ts.MakeClerk()
|
|
||||||
ka, va := ts.SpreadPuts(ck, NKEYS)
|
|
||||||
|
|
||||||
sck := ts.ShardCtrler()
|
|
||||||
cfg, _ := sck.Query()
|
|
||||||
|
|
||||||
ts.Group(gid1).Shutdown()
|
|
||||||
|
|
||||||
gid2 := ts.newGid()
|
|
||||||
ch := make(chan rpc.Err)
|
|
||||||
go func() {
|
|
||||||
err := ts.joinGroups(sck, []tester.Tgid{gid2})
|
|
||||||
ch <- err
|
|
||||||
}()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case err := <-ch:
|
|
||||||
ts.Fatalf("Join finished %v", err)
|
|
||||||
case <-time.After(1 * NSEC):
|
|
||||||
// Give Join some time to try to join
|
|
||||||
}
|
|
||||||
|
|
||||||
// Now join should be able to finish
|
|
||||||
ts.Group(gid1).StartServers()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case err := <-ch:
|
|
||||||
if err != rpc.OK {
|
|
||||||
ts.Fatalf("Join returns err %v", err)
|
|
||||||
}
|
|
||||||
case <-time.After(time.Second * NSEC):
|
|
||||||
ts.Fatalf("Join didn't complete")
|
|
||||||
}
|
|
||||||
|
|
||||||
cfg1, _ := sck.Query()
|
|
||||||
if cfg.Num+1 != cfg1.Num {
|
|
||||||
ts.t.Fatalf("wrong num %d expected %d ", cfg1.Num, cfg.Num+1)
|
|
||||||
}
|
|
||||||
|
|
||||||
ts.Group(gid2).Shutdown()
|
|
||||||
|
|
||||||
ch = make(chan rpc.Err)
|
|
||||||
go func() {
|
|
||||||
err := ts.leave(sck, shardcfg.Gid1)
|
|
||||||
ch <- err
|
|
||||||
}()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case err := <-ch:
|
|
||||||
ts.Fatalf("Leave finished %v", err)
|
|
||||||
case <-time.After(NSEC * time.Second):
|
|
||||||
// Give give some time to try to join
|
|
||||||
}
|
|
||||||
|
|
||||||
// Now leave should be able to finish
|
|
||||||
ts.Group(gid2).StartServers()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case err := <-ch:
|
|
||||||
if err != rpc.OK {
|
|
||||||
ts.Fatalf("Leave returns err %v", err)
|
|
||||||
}
|
|
||||||
case <-time.After(time.Second * NSEC):
|
|
||||||
ts.Fatalf("Leave didn't complete")
|
|
||||||
}
|
|
||||||
for i := 0; i < len(ka); i++ {
|
|
||||||
ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// test recovery of partitioned controlers
|
|
||||||
func TestRecoverCtrler5B(t *testing.T) {
|
|
||||||
const (
|
|
||||||
NPARTITION = 5
|
|
||||||
)
|
|
||||||
|
|
||||||
ts := MakeTest(t, "Test (5B): recover controler ...", true)
|
|
||||||
defer ts.Cleanup()
|
|
||||||
|
|
||||||
gid := ts.setupKVService()
|
|
||||||
ck := ts.MakeClerk()
|
|
||||||
ka, va := ts.SpreadPuts(ck, NKEYS)
|
|
||||||
|
|
||||||
for i := 0; i < NPARTITION; i++ {
|
|
||||||
ts.killCtrler(ck, gid, ka, va)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test concurrent ctrlers fighting for leadership (reliable network)
func TestAcquireLockConcurrentReliable5C(t *testing.T) {
	ts := MakeTestLeases(t, "Test (5C): Concurrent ctrlers acquiring leadership ...", true)
	defer ts.Cleanup()
	ts.setupKVService()
	ck := ts.MakeClerk()
	ka, va := ts.SpreadPuts(ck, NKEYS)
	ts.electCtrler(ck, ka, va)
}

// Test concurrent ctrlers fighting for leadership (unreliable network)
func TestAcquireLockConcurrentUnreliable5C(t *testing.T) {
	ts := MakeTestLeases(t, "Test (5C): Concurrent ctrlers acquiring leadership ...", false)
	defer ts.Cleanup()
	ts.setupKVService()
	ck := ts.MakeClerk()
	ka, va := ts.SpreadPuts(ck, NKEYS)
	ts.electCtrler(ck, ka, va)
}

// Test that releasing the lease (ExitController) allows a new leader to start quickly
func TestLeaseBasicRelease5C(t *testing.T) {
	ts := MakeTestLeases(t, "Test (5C): release lease ...", true)
	defer ts.Cleanup()
	ts.setupKVService()

	sck0, clnt0 := ts.makeShardCtrlerClnt()
	go func() {
		if err := sck0.InitController(); err != rpc.OK {
			t.Fatalf("failed to init controller %v", err)
		}
		time.Sleep(200 * time.Millisecond)
		sck0.ExitController()
	}()

	time.Sleep(10 * time.Millisecond)

	// start new controller
	sck1, clnt1 := ts.makeShardCtrlerClnt()
	ch := make(chan struct{})
	go func() {
		if err := sck1.InitController(); err != rpc.OK {
			t.Fatalf("failed to init controller %v", err)
		}
		time.Sleep(200 * time.Millisecond)
		sck1.ExitController()
		ch <- struct{}{}
	}()

	select {
	case <-time.After(1 * time.Second):
		ts.Fatalf("Release didn't give up leadership")
	case <-ch:
	}

	ts.Config.DeleteClient(clnt0)
	ts.Config.DeleteClient(clnt1)
}

// Test lease expiring
func TestLeaseBasicExpire5C(t *testing.T) {
	ts := MakeTestLeases(t, "Test (5C): lease expiring ...", true)
	defer ts.Cleanup()
	ts.setupKVService()

	sck0, clnt0 := ts.makeShardCtrlerClnt()
	go func() {
		if err := sck0.InitController(); err != rpc.OK {
			t.Fatalf("failed to init controller %v", err)
		}
		for {
			time.Sleep(10 * time.Millisecond)
		}
	}()

	time.Sleep(100 * time.Millisecond)

	// partition sck0 forever
	clnt0.DisconnectAll()

	// start new controller
	sck1, clnt1 := ts.makeShardCtrlerClnt()
	ch := make(chan struct{})
	go func() {
		if err := sck1.InitController(); err != rpc.OK {
			t.Fatalf("failed to init controller %v", err)
		}
		time.Sleep(100 * time.Millisecond)
		sck1.ExitController()
		ch <- struct{}{}
	}()

	select {
	case <-time.After((param.LEASETIMESEC + 1) * time.Second):
		ts.Fatalf("Lease didn't expire")
	case <-ch:
	}

	ts.Config.DeleteClient(clnt0)
	ts.Config.DeleteClient(clnt1)
}

// Test lease is being extended
func TestLeaseBasicRefresh5C(t *testing.T) {
	const LEADERSEC = 3

	ts := MakeTestLeases(t, "Test (5C): lease refresh ...", true)
	defer ts.Cleanup()
	ts.setupKVService()

	sck0, clnt0 := ts.makeShardCtrlerClnt()
	go func() {
		if err := sck0.InitController(); err != rpc.OK {
			t.Fatalf("failed to init controller %v", err)
		}
		time.Sleep(LEADERSEC * param.LEASETIMESEC * time.Second)
		sck0.ExitController()
	}()

	// give sck0 time to become leader
	time.Sleep(100 * time.Millisecond)

	// start new controller
	sck1, clnt1 := ts.makeShardCtrlerClnt()
	ch := make(chan struct{})
	go func() {
		if err := sck1.InitController(); err != rpc.OK {
			t.Fatalf("failed to init controller %v", err)
		}
		time.Sleep(100 * time.Millisecond)
		sck1.ExitController()
		ch <- struct{}{}
	}()

	select {
	case <-time.After((LEADERSEC + param.LEASETIMESEC + 1) * time.Second):
	case <-ch:
		ts.Fatalf("Lease not refreshed")
	}

	ts.Config.DeleteClient(clnt0)
	ts.Config.DeleteClient(clnt1)
}

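// Illustrative sketch (not part of the original test file): one way the lease
// acquisition that these 5C tests exercise *might* be built on the versioned
// Put/Get that the controller's kvsrv already provides. The key name "lease",
// the leaseKV interface, the "a Put at version 0 creates the key" convention,
// and tryAcquireLease itself are assumptions made for illustration only; the
// real InitController/ExitController implementation is left to the lab.
type leaseKV interface {
	Get(key string) (string, rpc.Tversion, rpc.Err)
	Put(key string, value string, version rpc.Tversion) rpc.Err
}

// tryAcquireLease shows the conditional-Put idea: read the lease record and
// write it back only at the version just observed, so two controllers racing
// for the lease cannot both succeed; the loser sees rpc.ErrVersion.
func tryAcquireLease(kv leaseKV, me string) bool {
	_, ver, err := kv.Get("lease")
	if err == rpc.ErrNoKey {
		// no lease record yet; assume a Put at version 0 creates it
		return kv.Put("lease", me, 0) == rpc.OK
	}
	if err != rpc.OK {
		return false
	}
	// A real controller would decode holder and expiry from the value and
	// only take over an expired lease (or refresh its own); omitted here.
	return kv.Put("lease", me, ver) == rpc.OK
}
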
// Test if old leader is fenced off when reconnecting while it is in
// the middle of a Join.
func TestPartitionControlerJoin5C(t *testing.T) {
	const (
		NSLEEP = 2
		RAND   = 1000
	)

	ts := MakeTestLeases(t, "Test (5C): partition controller in join...", true)
	defer ts.Cleanup()
	ts.setupKVService()
	ck := ts.MakeClerk()
	ka, va := ts.SpreadPuts(ck, NKEYS)

	sck, clnt := ts.makeShardCtrlerClnt()
	if err := sck.InitController(); err != rpc.OK {
		ts.Fatalf("failed to init controller %v", err)
	}

	ch := make(chan rpc.Err)
	ngid := tester.Tgid(0)
	go func() {
		ngid = ts.newGid()
		ts.Config.MakeGroupStart(ngid, NSRV, ts.StartServerShardGrp)
		ts.Group(ngid).Shutdown()
		ch <- ts.join(sck, ngid, ts.Group(ngid).SrvNames())
	}()

	// sleep for a while to give the controller a chance to get stuck
	// in the join, because shardgrp ngid is down
	time.Sleep(1 * time.Second)

	// partition sck
	clnt.DisconnectAll()

	// wait until sck's lease expired before restarting shardgrp `ngid`
	time.Sleep((param.LEASETIMESEC + 1) * time.Second)

	ts.Group(ngid).StartServers()

	// start a new controller to supersede the partitioned one;
	// it will also be stuck
	sck0 := ts.makeShardCtrler()
	if err := sck0.InitController(); err != rpc.OK {
		t.Fatalf("failed to init controller %v", err)
	}

	sck0.ExitController()

	//log.Printf("reconnect")

	// reconnect old controller, which shouldn't be able
	// to do anything
	clnt.ConnectAll()

	err := <-ch
	if err == rpc.OK {
		t.Fatalf("Old leader succeeded %v", err)
	}

	time.Sleep(1 * time.Second)

	for i := 0; i < len(ka); i++ {
		ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
	}
}

// Make a leader controller lose its leadership during join/leave and
// test if the next controller recovers correctly.
func TestPartitionRecovery5C(t *testing.T) {
	const (
		// NPARTITION = 10
		NPARTITION = 5
	)

	ts := MakeTestLeases(t, "Test (5C): controllers with leased leadership ...", true)
	defer ts.Cleanup()
	gid := ts.setupKVService()
	ck := ts.MakeClerk()
	ka, va := ts.SpreadPuts(ck, NKEYS)

	for i := 0; i < NPARTITION; i++ {
		ts.killCtrler(ck, gid, ka, va)
	}
}
@ -1,417 +0,0 @@
package shardkv

import (
	"fmt"
	//"log"
	"math/rand"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"6.5840/kvraft1/rsm"
	"6.5840/kvsrv1/rpc"
	"6.5840/kvtest1"
	"6.5840/labrpc"
	"6.5840/shardkv1/kvsrv1"
	"6.5840/shardkv1/shardcfg"
	"6.5840/shardkv1/shardctrler"
	"6.5840/shardkv1/shardctrler/param"
	"6.5840/shardkv1/shardgrp"
	"6.5840/tester1"
)

type Test struct {
	t *testing.T
	*kvtest.Test

	sck    *shardctrler.ShardCtrler
	part   string
	leases bool

	maxraftstate int
	mu           sync.Mutex
	ngid         tester.Tgid
}

const (
	Controler     = tester.Tgid(0) // the controller uses group 0 for a kvraft group
	NSRV          = 3              // servers per group
	INTERGRPDELAY = 200            // time in ms between group changes
)

// Set up a kvserver for the shard controller and make the controller.
func MakeTestMaxRaft(t *testing.T, part string, reliable, leases bool, maxraftstate int) *Test {
	ts := &Test{
		ngid:         shardcfg.Gid1 + 1, // Gid1 is in use
		t:            t,
		leases:       leases,
		maxraftstate: maxraftstate,
	}
	cfg := tester.MakeConfig(t, 1, reliable, kvsrv.StartKVServer)
	ts.Test = kvtest.MakeTest(t, cfg, false, ts)
	ts.Begin(part)
	return ts
}

func MakeTest(t *testing.T, part string, reliable bool) *Test {
	return MakeTestMaxRaft(t, part, reliable, false, -1)
}

func MakeTestLeases(t *testing.T, part string, reliable bool) *Test {
	return MakeTestMaxRaft(t, part, reliable, true, -1)
}

func (ts *Test) MakeClerk() kvtest.IKVClerk {
	clnt := ts.Config.MakeClient()
	ck := MakeClerk(clnt, ts.makeShardCtrler())
	return &kvtest.TestClerk{ck, clnt}
}

func (ts *Test) DeleteClerk(ck kvtest.IKVClerk) {
	tck := ck.(*kvtest.TestClerk)
	ts.DeleteClient(tck.Clnt)
}

func (ts *Test) ShardCtrler() *shardctrler.ShardCtrler {
	return ts.sck
}

func (ts *Test) makeShardCtrler() *shardctrler.ShardCtrler {
	ck, _ := ts.makeShardCtrlerClnt()
	return ck
}

func (ts *Test) makeShardCtrlerClnt() (*shardctrler.ShardCtrler, *tester.Clnt) {
	clnt := ts.Config.MakeClient()
	return shardctrler.MakeShardCtrler(clnt, ts.leases), clnt
}

func (ts *Test) makeKVClerk() *kvsrv.Clerk {
	srv := tester.ServerName(tester.GRP0, 0)
	clnt := ts.Config.MakeClient()
	return kvsrv.MakeClerk(clnt, srv).(*kvsrv.Clerk)
}

func (ts *Test) newGid() tester.Tgid {
	ts.mu.Lock()
	defer ts.mu.Unlock()

	gid := ts.ngid
	ts.ngid += 1
	return gid
}

func (ts *Test) groups(n int) []tester.Tgid {
	grps := make([]tester.Tgid, n)
	for i := 0; i < n; i++ {
		grps[i] = ts.newGid()
	}
	return grps
}

// Set up the KV service with one group, Gid1. Gid1 should initialize itself to
// own all shards.
func (ts *Test) setupKVService() tester.Tgid {
	ts.sck = ts.makeShardCtrler()
	scfg := shardcfg.MakeShardConfig()
	ts.Config.MakeGroupStart(shardcfg.Gid1, NSRV, ts.StartServerShardGrp)
	scfg.JoinBalance(map[tester.Tgid][]string{shardcfg.Gid1: ts.Group(shardcfg.Gid1).SrvNames()})
	ts.sck.InitConfig(scfg)
	return shardcfg.Gid1
}

func (ts *Test) StartServerShardGrp(servers []*labrpc.ClientEnd, gid tester.Tgid, me int, persister *tester.Persister) []tester.IService {
	return shardgrp.StartServerShardGrp(servers, gid, me, persister, ts.maxraftstate)
}

// Add group gid.
func (ts *Test) join(sck *shardctrler.ShardCtrler, gid tester.Tgid, srvs []string) rpc.Err {
	cfg, _ := sck.Query()
	newcfg := cfg.Copy()
	ok := newcfg.JoinBalance(map[tester.Tgid][]string{gid: srvs})
	if !ok {
		return rpc.ErrVersion
	}
	err := sck.ChangeConfigTo(newcfg)
	return err
}

func (ts *Test) joinGroups(sck *shardctrler.ShardCtrler, gids []tester.Tgid) rpc.Err {
	for i, gid := range gids {
		ts.Config.MakeGroupStart(gid, NSRV, ts.StartServerShardGrp)
		if err := ts.join(sck, gid, ts.Group(gid).SrvNames()); err != rpc.OK {
			return err
		}
		if i < len(gids)-1 {
			time.Sleep(INTERGRPDELAY * time.Millisecond)
		}
	}
	return rpc.OK
}

// Group gid leaves.
func (ts *Test) leave(sck *shardctrler.ShardCtrler, gid tester.Tgid) rpc.Err {
	cfg, _ := sck.Query()
	newcfg := cfg.Copy()
	ok := newcfg.LeaveBalance([]tester.Tgid{gid})
	if !ok {
		return rpc.ErrVersion
	}
	return sck.ChangeConfigTo(newcfg)
}

func (ts *Test) leaveGroups(sck *shardctrler.ShardCtrler, gids []tester.Tgid) rpc.Err {
	for i, gid := range gids {
		if err := ts.leave(sck, gid); err != rpc.OK {
			return err
		}
		ts.Config.ExitGroup(gid)
		if i < len(gids)-1 {
			time.Sleep(INTERGRPDELAY * time.Millisecond)
		}
	}
	return rpc.OK
}

func (ts *Test) disconnectRaftLeader(gid tester.Tgid) (int, string) {
	_, l := rsm.Leader(ts.Config, gid)
	g := ts.Group(gid)
	ln := g.SrvName(l)
	g.DisconnectAll(l)
	return l, ln
}

func (ts *Test) reconnectOldLeader(gid tester.Tgid, l int) {
	g := ts.Group(gid)
	g.ConnectOne(l)
}

func (ts *Test) disconnectClntFromLeader(clnt *tester.Clnt, gid tester.Tgid) int {
	l, ln := ts.disconnectRaftLeader(gid)
	p := ts.Group(gid).AllowServersExcept(l)
	srvs := ts.Group(gid).SrvNamesTo(p)
	clnt.Disconnect(ln)
	clnt.ConnectTo(srvs)
	return l
}

func (ts *Test) checkLogs(gids []tester.Tgid) {
	for _, gid := range gids {
		n := ts.Group(gid).LogSize()
		s := ts.Group(gid).SnapshotSize()
		if ts.maxraftstate >= 0 && n > 8*ts.maxraftstate {
			ts.t.Fatalf("persister.RaftStateSize() %v, but maxraftstate %v",
				n, ts.maxraftstate)
		}
		if ts.maxraftstate < 0 && s > 0 {
			ts.t.Fatalf("maxraftstate is -1, but snapshot is non-empty!")
		}
	}
}

// make sure that the data really is sharded by
// shutting down one shard and checking that some
// Get()s don't succeed.
func (ts *Test) checkShutdownSharding(down, up tester.Tgid, ka []string, va []string) {
	const NSEC = 2

	ts.Group(down).Shutdown()

	ts.checkLogs([]tester.Tgid{down, up}) // forbid snapshots

	n := len(ka)
	ch := make(chan string)
	done := int32(0)
	for xi := 0; xi < n; xi++ {
		ck1 := ts.MakeClerk()
		go func(i int) {
			v, _, _ := ck1.Get(ka[i])
			if atomic.LoadInt32(&done) == 1 {
				return
			}
			if v != va[i] {
				ch <- fmt.Sprintf("Get(%v): expected:\n%v\nreceived:\n%v", ka[i], va[i], v)
			} else {
				ch <- ""
			}
		}(xi)
	}

	// wait a bit, only about half the Gets should succeed.
	ndone := 0
	for atomic.LoadInt32(&done) != 1 {
		select {
		case err := <-ch:
			if err != "" {
				ts.Fatalf(err)
			}
			ndone += 1
		case <-time.After(time.Second * NSEC):
			atomic.StoreInt32(&done, 1)
			break
		}
	}

	//log.Printf("%d completions out of %d; down %d", ndone, n, down)
	if ndone >= n {
		ts.Fatalf("expected less than %d completions with one shard dead\n", n)
	}

	// bring the crashed shard/group back to life.
	ts.Group(down).StartServers()
}

// Run one controller and then partition it after some time. Run
// another controller that must finish the first controller's unfinished
// shard moves. To ensure the first controller is in a join/leave, the test
// shuts down shardgrp `gid`. After the second controller is done,
// heal the partition to test whether Freeze, InstallShard, and Delete
// are fenced.
func (ts *Test) killCtrler(ck kvtest.IKVClerk, gid tester.Tgid, ka, va []string) {
	const (
		NSLEEP = 2

		RAND = 1000

		JOIN  = 1
		LEAVE = 2
	)

	sck, clnt := ts.makeShardCtrlerClnt()
	if err := sck.InitController(); err != rpc.OK {
		ts.Fatalf("failed to init controller %v", err)
	}

	cfg, _ := ts.ShardCtrler().Query()
	num := cfg.Num

	state := 0
	ngid := tester.Tgid(0)
	go func() {
		for {
			ngid = ts.newGid()
			state = JOIN
			err := ts.joinGroups(sck, []tester.Tgid{ngid})
			if err == rpc.OK {
				state = LEAVE
				err = ts.leaveGroups(sck, []tester.Tgid{ngid})
			} else {
				//log.Printf("deposed err %v", err)
				return
			}
		}
	}()

	r := rand.Int() % RAND
	d := time.Duration(r) * time.Millisecond
	time.Sleep(d)

	//log.Printf("shutdown gid %d after %dms", gid, r)
	ts.Group(gid).Shutdown()

	// sleep for a while to give the controller a chance to get stuck
	// in join or leave, because gid is down
	time.Sleep(NSLEEP * time.Second)

	//log.Printf("disconnect sck %v ngid %d num %d state %d", d, ngid, num, state)

	// partition controller
	clnt.DisconnectAll()

	if ts.leases {
		// wait until sck's lease expired before restarting shardgrp `gid`
		time.Sleep((param.LEASETIMESEC + 1) * time.Second)
	}

	ts.Group(gid).StartServers()

	// start a new controller to pick up where sck left off
	sck0, clnt0 := ts.makeShardCtrlerClnt()
	if err := sck0.InitController(); err != rpc.OK {
		ts.Fatalf("failed to init controller %v", err)
	}
	cfg, _ = sck0.Query()
	s := "join"
	if state == LEAVE {
		s = "leave"
	}
	//log.Printf("%v cfg %v recovered %s", s, cfg, s)

	if cfg.Num <= num {
		ts.Fatalf("didn't recover; expected cfg.Num %d > %d", cfg.Num, num)
	}

	present := cfg.IsMember(ngid)
	if (state == JOIN && !present) || (state == LEAVE && present) {
		ts.Fatalf("didn't recover %d correctly after %v", ngid, s)
	}

	if state == JOIN && present {
		// cleanup if disconnected after join but before leave
		ts.leaveGroups(sck0, []tester.Tgid{ngid})
	}

	for i := 0; i < len(ka); i++ {
		ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
	}

	sck0.ExitController()

	if ts.leases {
		// reconnect old controller, which shouldn't be able
		// to do anything
		clnt.ConnectAll()

		time.Sleep(1 * time.Second)

		for i := 0; i < len(ka); i++ {
			ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
		}
	}
	ts.Config.DeleteClient(clnt)
	ts.Config.DeleteClient(clnt0)
}

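// Illustrative sketch (not part of the original file): the fencing idea that
// killCtrler and TestPartitionControlerJoin5C exercise. A shardgrp is assumed
// to remember the highest configuration number it has acted on and to reject
// shard-movement RPCs that carry an older number, so a deposed controller
// that reconnects cannot undo a newer controller's moves. The freezeArgs,
// freezeReply, and fencedShardGrp types below are hypothetical stand-ins that
// only name the idea; they are not the lab's real shardgrp RPC definitions.
type freezeArgs struct {
	Num int // configuration number the requesting controller is acting for
}

type freezeReply struct {
	Err rpc.Err
}

type fencedShardGrp struct {
	mu      sync.Mutex
	maxSeen int // highest configuration number acted on so far
}

func (s *fencedShardGrp) Freeze(args *freezeArgs, reply *freezeReply) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if args.Num < s.maxSeen {
		// stale controller: refuse, so an in-flight join/leave from a
		// superseded configuration cannot clobber newer state.
		reply.Err = rpc.ErrVersion
		return
	}
	s.maxSeen = args.Num
	reply.Err = rpc.OK
}
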
func (ts *Test) electCtrler(ck kvtest.IKVClerk, ka, va []string) {
	const (
		NSEC = 5
		N    = 4
	)

	ch := make(chan struct{})
	f := func(ch chan struct{}, i int) {
		for {
			select {
			case <-ch:
				return
			default:
				ngid := ts.newGid()
				sck := ts.makeShardCtrler()
				if err := sck.InitController(); err != rpc.OK {
					ts.Fatalf("failed to init controller %v", err)
				}
				//log.Printf("%d(%p): join/leave %v", i, sck, ngid)
				if err := ts.joinGroups(sck, []tester.Tgid{ngid}); err == rpc.OK {
					ts.leaveGroups(sck, []tester.Tgid{ngid})
				}
				sck.ExitController()
			}
		}
	}
	for i := 0; i < N; i++ {
		go f(ch, i)
	}

	// let f()'s run for a while
	time.Sleep(NSEC * time.Second)

	for i := 0; i < N; i++ {
		ch <- struct{}{}
	}
	for i := 0; i < len(ka); i++ {
		ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
	}
}