MIT-6.824 Lab4B Summary and Notes
Approach
Lab4B implements a sharded KeyValue service.
The system fixes the number of shards at NShards (10). We build several ShardKV server clusters, i.e. several Groups, each responsible for a few shards, and each Group has its own Leader.
The overall framework closely resembles Lab3, but since the data lives on different shards, implement a Shard struct and keep last_request_id inside it:

```go
type Shard struct {
    Shard_id        int
    Kv_data         map[string]string
    Last_request_id map[int64]int // per shard, the last request id seen from each client
}
```

Since ShardKV has to talk to the ShardCtrler to fetch Configs, the ShardKV struct needs a ShardCtrler clerk member:
```go
type ShardKV struct {
    mu           sync.Mutex
    me           int
    rf           *raft.Raft
    dead         int32
    applyCh      chan raft.ApplyMsg
    make_end     func(string) *labrpc.ClientEnd
    gid          int // the group this server belongs to
    ctrlers      []*labrpc.ClientEnd
    maxraftstate int // snapshot if log grows this big
    // Your definitions here.
    mck                     *shardctrler.Clerk // clerk used to fetch new configs from the ShardCtrler
    ctrl_config             shardctrler.Config // the current config
    kv_database             []Shard            // the database, stored shard by shard
    wait_apply_ch           map[int]chan Op    // raft log index of a request -> its Op channel
    last_snapshot_log_index int
    migrating_shard         [shardctrler.NShards]bool // entry i is true if shard i is being migrated
}
```

The Client code is almost identical to Lab3, but note that the RPC args are built inside the for loop, so each retry uses the config fetched at the bottom of the loop (see PutAppend below).
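The Op type never appears in the post; judging by the fields used throughout (Operation, New_config, Migrate_data, Config_num_migrate, plus the usual Lab3 client fields), it presumably looks something like this. Treat it as a guess, not the post's actual definition:

```go
// A guess at the Op type, reconstructed from the fields referenced in
// the code below; the Get/PutAppend fields mirror Lab3 and are assumptions.
type Op struct {
    Operation          string // "get", "put", "append", "newconfig", "migrate"
    Key                string
    Value              string
    Client_id          int64
    Request_id         int
    New_config         shardctrler.Config // for "newconfig"
    Migrate_data       []Shard            // for "migrate"
    Config_num_migrate int                // for "migrate"
}
```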
The Server has three jobs, so StartServer spawns three looping goroutines:

1. Handle the Get and PutAppend requests sent by Clients. This is very similar to Lab3 and mostly needs only small code changes; the one addition is that the RPC handler must check whether the requested key's shard belongs to this Server's Group (a sketch of that check follows).
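A minimal sketch of that ownership check, assuming the ShardKV fields above; the helper name served_by_me is mine, not from the lab skeleton:

```go
// served_by_me reports whether this server currently serves the key:
// its shard must be assigned to our group in the current config, and
// the shard must not be frozen by an in-flight migration.
func (kv *ShardKV) served_by_me(key string) bool {
    shard := key2shard(key)
    kv.mu.Lock()
    defer kv.mu.Unlock()
    return kv.ctrl_config.Shards[shard] == kv.gid && !kv.migrating_shard[shard]
}
```

When it returns false the handler replies ErrWrongGroup, which makes the Client re-fetch the config and retry.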
2. Periodically ask the ShardCtrler for the next Config (whenever the ShardCtrler's Config.Num is larger), then hand it to Raft and replicate it to all group members. No extra RPC needs to be implemented here: just call the Query member function of the mck clerk in the struct. Note that we fetch the next Config, not the latest one, because a group must step through Configs in order.
```go
func (kv *ShardKV) Get_New_Config() {
    for !kv.killed() {
        kv.mu.Lock()
        last_config_num := kv.ctrl_config.Num
        _, is_leader := kv.rf.GetState()
        kv.mu.Unlock()
        if !is_leader {
            time.Sleep(90 * time.Millisecond)
            continue
        }
        // Fetch the next config. It must be the next one, not the latest:
        // even if this server lags behind, configs must be applied in order.
        new_config := kv.mck.Query(last_config_num + 1)
        if new_config.Num == last_config_num+1 {
            kv.mu.Lock()
            op := Op{
                Operation:  "newconfig",
                New_config: new_config,
            }
            if _, ifLeader := kv.rf.GetState(); ifLeader {
                kv.rf.Start(op) // replicate the new config to the whole group
            }
            kv.mu.Unlock()
        }
        time.Sleep(90 * time.Millisecond)
    }
}
```

When applying the new Config, watch the locking, and use kv.migrating_shard to record which shards need to migrate, "freezing" them so they cannot be accessed (set the corresponding migrating_shard entry to true); only at the end replace the server's config. Note how the code below decides whether data has to move:
```go
func (kv *ShardKV) Execute_NewConfig(op Op) {
    kv.mu.Lock()
    defer kv.mu.Unlock()
    new_config := op.New_config
    if new_config.Num != kv.ctrl_config.Num+1 {
        return
    }
    // Before a config change is applied, all ongoing migrations must have finished.
    for _, is_migrating := range kv.migrating_shard {
        if is_migrating {
            return
        }
    }
    // Record which shards have to migrate.
    for shard_id := 0; shard_id < shardctrler.NShards; shard_id++ {
        // Shards not in this group and not owned by the idle group (gid 0)
        // that will move into this group: they must be migrated in.
        if kv.ctrl_config.Shards[shard_id] != kv.gid && new_config.Shards[shard_id] == kv.gid && kv.ctrl_config.Shards[shard_id] != 0 {
            kv.migrating_shard[shard_id] = true
        }
        // Shards in this group that will move to another non-idle group:
        // they must be migrated out.
        if kv.ctrl_config.Shards[shard_id] == kv.gid && new_config.Shards[shard_id] != kv.gid && new_config.Shards[shard_id] != 0 {
            kv.migrating_shard[shard_id] = true
        }
    }
    kv.ctrl_config = new_config
}
```
3. Periodically check whether shards have to be migrated in response to a Config change.
If frozen shards that need to move are detected, first copy the data out into a map[int][]Shard, then send it to each target group. Only the outgoing data has to be pushed; for data moving in, the other group's Leader will send an RPC, and reading it out in Execute_Migrate completes the ingestion. Whether copying data out or reading it in, always make the relevant maps afresh:
```go
// apply the MigrateShardData
for _, cur_shard := range op.Migrate_data {
    if !kv.migrating_shard[cur_shard.Shard_id] {
        continue
    }
    kv.migrating_shard[cur_shard.Shard_id] = false
    kv.kv_database[cur_shard.Shard_id] = Shard{
        Shard_id:        cur_shard.Shard_id,
        Kv_data:         make(map[string]string),
        Last_request_id: make(map[int64]int),
    }
    // copy the data into our own database
    if cur_config.Shards[cur_shard.Shard_id] == kv.gid {
        for key, value := range cur_shard.Kv_data {
            kv.kv_database[cur_shard.Shard_id].Kv_data[key] = value
        }
        for client_id, request_id := range cur_shard.Last_request_id {
            kv.kv_database[cur_shard.Shard_id].Last_request_id[client_id] = request_id
        }
    }
}
```

The data is sent via RPC, so the corresponding args and reply structs are needed:
```go
type MigrateShardArgs struct {
    Migrate_data []Shard
    Config_num   int
}

type MigrateShardReply struct {
    Err        Err
    Config_num int
}
```

When handling this RPC, check whether the args' Config_num matches the local one; only when they match is the data accepted and handed to Raft for replication.
Always keep in mind that the Config stores the global configuration, and know how to follow its mappings to find the data you need.
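For reference, this is roughly what the lab's shardctrler.Config and the key2shard helper look like (paraphrased from the course skeleton; treat the exact shapes as an assumption):

```go
// shardctrler.Config: the global routing table.
type Config struct {
    Num    int              // config number, increases monotonically
    Shards [NShards]int     // shard id -> gid that owns it (0 = unassigned)
    Groups map[int][]string // gid -> server names in that group
}

// key2shard maps a key to its shard, as in the lab skeleton.
func key2shard(key string) int {
    shard := 0
    if len(key) > 0 {
        shard = int(key[0])
    }
    shard %= NShards
    return shard
}
```

So key -> shard via key2shard, shard -> gid via Config.Shards, and gid -> servers via Config.Groups: that chain is all a client or a migrating server needs.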
And lock and unlock carefully!
Key code
The Client's RPC code (PutAppend as an example):
```go
func (ck *Clerk) PutAppend(key string, value string, op string) {
    ck.Request_id += 1
    for {
        args := PutAppendArgs{
            Key:        key,
            Value:      value,
            Op:         op,
            Client_id:  ck.Client_id,
            Request_id: ck.Request_id,
            Config_num: ck.config.Num,
        }
        shard := key2shard(key)
        gid := ck.config.Shards[shard]
        if servers, ok := ck.config.Groups[gid]; ok {
            for si := 0; si < len(servers); si++ {
                srv := ck.make_end(servers[si])
                var reply PutAppendReply
                ok := srv.Call("ShardKV.PutAppend", &args, &reply)
                if ok && reply.Err == OK {
                    return
                }
                if ok && reply.Err == ErrWrongGroup {
                    break
                }
                // ... not ok, or ErrWrongLeader
            }
        }
        time.Sleep(100 * time.Millisecond)
        // ask the controller for the latest configuration
        ck.config = ck.sm.Query(-1)
    }
}
```

Migrating the data:
Detect whether any data needs to migrate:
```go
func (kv *ShardKV) Detect_Migrate_Data() {
    for !kv.killed() {
        kv.mu.Lock()
        _, is_leader := kv.rf.GetState()
        kv.mu.Unlock()
        if !is_leader {
            time.Sleep(120 * time.Millisecond)
            continue
        }
        kv.mu.Lock()
        no_migrate := true
        for _, need_migrate := range kv.migrating_shard {
            if need_migrate {
                no_migrate = false
                break
            }
        }
        kv.mu.Unlock()
        if no_migrate {
            time.Sleep(120 * time.Millisecond)
            continue
        }
        kv.mu.Lock()
        send_data := make(map[int][]Shard) // target group -> shard list
        for shard_id := 0; shard_id < shardctrler.NShards; shard_id++ { // collect the shards to send out
            cur_group := kv.ctrl_config.Shards[shard_id]
            // This shard must be sent out: it is frozen, and its owner differs
            // from this kvserver's group. Note the config here is already the new one.
            if kv.migrating_shard[shard_id] && kv.gid != cur_group {
                tmp_shard := Shard{
                    Shard_id:        shard_id,
                    Kv_data:         make(map[string]string),
                    Last_request_id: make(map[int64]int),
                }
                // copy out the data to send
                for key, value := range kv.kv_database[shard_id].Kv_data {
                    tmp_shard.Kv_data[key] = value
                }
                for client_id, request_id := range kv.kv_database[shard_id].Last_request_id {
                    tmp_shard.Last_request_id[client_id] = request_id
                }
                send_data[cur_group] = append(send_data[cur_group], tmp_shard)
            }
        }
        kv.mu.Unlock()
        if len(send_data) == 0 { // nothing to send out
            time.Sleep(120 * time.Millisecond)
            continue
        }
        // send the data
        for target_gid, shards_to_send := range send_data {
            kv.mu.Lock()
            target_servers := kv.ctrl_config.Groups[target_gid]
            args := &MigrateShardArgs{
                Migrate_data: make([]Shard, 0),
                Config_num:   kv.ctrl_config.Num,
            }
            kv.mu.Unlock()
            for _, cur_shard := range shards_to_send {
                tmp_shard := Shard{
                    Shard_id:        cur_shard.Shard_id,
                    Kv_data:         make(map[string]string),
                    Last_request_id: make(map[int64]int),
                }
                // pack the data to send into args
                for key, value := range cur_shard.Kv_data {
                    tmp_shard.Kv_data[key] = value
                }
                for client_id, request_id := range cur_shard.Last_request_id {
                    tmp_shard.Last_request_id[client_id] = request_id
                }
                args.Migrate_data = append(args.Migrate_data, tmp_shard)
            }
            // send each group its shard list in its own goroutine
            go func(groupServers []string, args *MigrateShardArgs) {
                for _, groupMember := range groupServers {
                    callEnd := kv.make_end(groupMember)
                    migrateReply := MigrateShardReply{}
                    ok := callEnd.Call("ShardKV.MigrateShard", args, &migrateReply)
                    kv.mu.Lock()
                    cur_config_num := kv.ctrl_config.Num
                    kv.mu.Unlock()
                    if ok && migrateReply.Err == OK {
                        // tell Raft: I have sent my shards out
                        if cur_config_num != args.Config_num || kv.check_shard_been_migrated(args.Migrate_data) {
                            return
                        } else {
                            kv.rf.Start(
                                Op{
                                    Operation:          "migrate",
                                    Migrate_data:       args.Migrate_data,
                                    Config_num_migrate: args.Config_num,
                                },
                            )
                            return
                        }
                    }
                }
            }(target_servers, args)
        }
        time.Sleep(120 * time.Millisecond)
    }
}
```

RPC handler: it is similar to the Get and PutAppend handlers, but adds one extra check: whether the shards to be migrated have already finished migrating.
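That check uses check_shard_been_migrated, which the post never shows. A minimal sketch, assuming it simply reports that none of the given shards are still frozen (the call sites hold no lock, so it locks internally):

```go
// check_shard_been_migrated reports whether every shard in the list has
// already been ingested, i.e. is no longer marked as migrating.
func (kv *ShardKV) check_shard_been_migrated(shards []Shard) bool {
    kv.mu.Lock()
    defer kv.mu.Unlock()
    for _, shard := range shards {
        if kv.migrating_shard[shard.Shard_id] {
            return false
        }
    }
    return true
}
```

The handler itself: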
```go
func (kv *ShardKV) MigrateShard(args *MigrateShardArgs, reply *MigrateShardReply) {
    kv.mu.Lock()
    cur_config_num := kv.ctrl_config.Num
    kv.mu.Unlock()
    // When the RPC arrives, the receiver's config must match the one the data was sent under.
    if args.Config_num > cur_config_num { // our config lags behind and may need updating first, so refuse
        reply.Err = ErrConfigNum
        reply.Config_num = cur_config_num
        return
    }
    if args.Config_num < cur_config_num || kv.check_shard_been_migrated(args.Migrate_data) {
        reply.Err = OK
        return
    }
    op := Op{
        Operation:          "migrate",
        Migrate_data:       args.Migrate_data,
        Config_num_migrate: args.Config_num,
    }
    raft_log_index, _, _ := kv.rf.Start(op)
    kv.mu.Lock()
    channel, exist := kv.wait_apply_ch[raft_log_index]
    if !exist {
        kv.wait_apply_ch[raft_log_index] = make(chan Op, 1)
        channel = kv.wait_apply_ch[raft_log_index]
    }
    kv.mu.Unlock()
    select {
    case <-time.After(time.Millisecond * 500):
        kv.mu.Lock()
        _, is_leader := kv.rf.GetState()
        cur_config_num := kv.ctrl_config.Num
        kv.mu.Unlock()
        if args.Config_num <= cur_config_num && kv.check_shard_been_migrated(args.Migrate_data) && is_leader {
            reply.Config_num = cur_config_num
            reply.Err = OK
        } else {
            reply.Err = ErrWrongLeader
        }
    case op_committed := <-channel:
        kv.mu.Lock()
        cur_config_num := kv.ctrl_config.Num
        kv.mu.Unlock()
        if op_committed.Config_num_migrate == args.Config_num && args.Config_num <= cur_config_num && kv.check_shard_been_migrated(args.Migrate_data) {
            reply.Config_num = cur_config_num
            reply.Err = OK
        } else {
            reply.Err = ErrWrongLeader
        }
    }
    kv.mu.Lock()
    delete(kv.wait_apply_ch, raft_log_index)
    kv.mu.Unlock()
}
```

Once the migrate command comes back through Raft, copy the data out of the op into the local database:
```go
func (kv *ShardKV) Execute_Migrate(op Op) {
    kv.mu.Lock()
    defer kv.mu.Unlock()
    cur_config := kv.ctrl_config
    if op.Config_num_migrate != cur_config.Num {
        return
    }
    // apply the MigrateShardData
    for _, cur_shard := range op.Migrate_data {
        if !kv.migrating_shard[cur_shard.Shard_id] {
            continue
        }
        kv.migrating_shard[cur_shard.Shard_id] = false
        kv.kv_database[cur_shard.Shard_id] = Shard{
            Shard_id:        cur_shard.Shard_id,
            Kv_data:         make(map[string]string),
            Last_request_id: make(map[int64]int),
        }
        // copy the data into our own database
        if cur_config.Shards[cur_shard.Shard_id] == kv.gid {
            for key, value := range cur_shard.Kv_data {
                kv.kv_database[cur_shard.Shard_id].Kv_data[key] = value
            }
            for client_id, request_id := range cur_shard.Last_request_id {
                kv.kv_database[cur_shard.Shard_id].Last_request_id[client_id] = request_id
            }
        }
    }
}
```
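For completeness: the Execute_* functions above are invoked from the applier goroutine that drains applyCh, just as in Lab3. The post does not show it; a minimal sketch, assuming the standard Lab2 ApplyMsg fields (CommandValid, Command, CommandIndex) and ignoring snapshots and the Get/PutAppend cases (the name Read_Apply_Ch is mine):

```go
// Applier skeleton: dispatch committed ops to the executors and wake up
// any RPC handler waiting on this log index.
func (kv *ShardKV) Read_Apply_Ch() {
    for msg := range kv.applyCh {
        if !msg.CommandValid {
            continue // snapshot messages would be handled here
        }
        op := msg.Command.(Op)
        switch op.Operation {
        case "newconfig":
            kv.Execute_NewConfig(op)
        case "migrate":
            kv.Execute_Migrate(op)
        }
        kv.mu.Lock()
        channel, exist := kv.wait_apply_ch[msg.CommandIndex]
        kv.mu.Unlock()
        if exist {
            channel <- op // buffered with size 1, so this does not block the applier
        }
    }
}
```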