MIT-6.824 Lab4A 总结与备忘
思路整理
Lab4的目标是创建一个 “分布式、拥有分片(shard)功能、控制server加入和退出不同的group、根据Config迁移shard到不同group的,Key-Value数据库服务“,因此Lab4的代码框架和Lab3非常相似,是Lab3的扩展
具体而言。Lab4要实现:
- 用户通过Query RPC向ShardCtrler请求最新的Config,以获知数据分片的位置信息,然后向相应的Key-Value Servers请求某个Key-Value对
- Key-Value Servers会根据Config组成不同的Group,不同的Group通过轮询ShardCtrler得到最新的Config,并根据Config进行数据分片的迁移,以及Group成员的改变
Lab4A要构造ShardCtrler,它负责调度Client和后端数据库。多个ShardCtrler服务器组成一个Raft管理集群,集群维护配置列表
[]Config
。参考代码给出的Config结构为:1
2
3
4
5type Config struct {
Num int // config number
Shards [NShards]int // shard -> gid 根据分片id,查找其所在的group
Groups map[int][]string // gid -> servers[] 根据groupid,查找属于该group的servers id
}Lab4A中:
- Client的功能基本不变,只是将原先的Put、Append、Get改为Join、Leave、Move、Query
- RPC handler的处理流程基本类似Lab3中的handler流程,其中Query类似Get,其余类似PutAppend
- Server的运行流程同样相同,循环地从ApplyCh中读取写入raft集群commit了的命令,根据命令(Op)的类型分别实现Config的更改。每更改一次Config,就append到Server结构体中的Config列表
综上,Command的处理流程为:
- Client向Leader发送Command
- Leader检测触发条件和执行条件,如果满足则将Op交给Raft,实现集群的log同步,然后设置WaitChan,等待ApplyCh输出commit后的Op(这里同样设置了超时的处理)
- 每个Server从自己的ApplyCh中读出Op后,进行”重复检测“、命令执行、SnapShot制作(如果满足条件)、发给WaitChan(如果是Leader,不是Leader则
channel, exist := sc.wait_apply_ch[command_index]
中,exist为False) - Leader将结果从WaitChan读出,如果符合条件则reply OK
因此,基本上Lab3的代码简单改下变量名就可以用在Lab4A上
新的Config基于Config列表中的最后一个Config得到,要注意go的map是引用,如果把一个map类型的变量分配给另一个变量,两个变量都会引用同一个map,因此要用
make()
创建一个新的map对象并单独复制键和值存在一个问题:如何在Key-Value Servers Group成员变化时,重新平衡shard负载(即,ShardCtrler收到Join、Leave RPC时,需要在新的Group中,确定性地重新分配Shard)
- 举例:5个Groups,6个Shards,则最终分配应当是11112,并且必须是确定的,即每次运行后,Shard所属的Group是一样的
- 因此根据现有的Group(Join和leave之后的Group),排序其拥有的Shard数目,多退少补,其中先退再补。退的时候,被”退“的Shard可以暂存于Group 0中
Command的含义:
- Join : 新加入的Group信息
- Leave : 某些Group要离开集群
- Move : 将Shard分配给GID的Group
- Query : 查询最新的Config信息
关键代码
重新平衡负载:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87func (sc *ShardCtrler) reBalanceShards(groupid_shardnum map[int]int, last_shards [NShards]int) [NShards]int {
// 根据group_id:shard_num映射,和shard_id:group_id映射,重新平衡负载
group_num := len(groupid_shardnum)
avg_shard_per_group := NShards / group_num
subNum := NShards % group_num // 无法均匀分配的shard数目
realSortNum := realNumArray(groupid_shardnum)
//多退
for i := group_num - 1; i >= 0; i-- {
resultNum := avg_shard_per_group
var is_avg bool // 将无法均匀分配的剩余shard,放到realSortNum中后面那几个group中,每个group分一个
if i < group_num-subNum {
is_avg = true
} else {
is_avg = false
}
if !is_avg {
resultNum = avg_shard_per_group + 1
}
if resultNum < groupid_shardnum[realSortNum[i]] {
fromGid := realSortNum[i] // 从这个group转移出changeNum个shard
changeNum := groupid_shardnum[fromGid] - resultNum // 要转移的shard数目
for shard, gid := range last_shards {
if changeNum <= 0 {
break
}
if gid == fromGid {
last_shards[shard] = 0 // fromGid的这个shard高于avg_shard_per_group,要“退”,退到group 0暂存
changeNum -= 1
}
}
groupid_shardnum[fromGid] = resultNum
}
}
//少补
for i := 0; i < group_num; i++ {
resultNum := avg_shard_per_group
var flag bool
if i < avg_shard_per_group-subNum {
flag = true
} else {
flag = false
}
if !flag {
resultNum = avg_shard_per_group + 1
}
if resultNum > groupid_shardnum[realSortNum[i]] {
toGid := realSortNum[i]
changeNum := resultNum - groupid_shardnum[toGid]
for shard, gid := range last_shards {
if changeNum <= 0 {
break
}
if gid == 0 {
last_shards[shard] = toGid // fromGid的这个shard少于avg_shard_per_group,要“补”,从group 0中补
changeNum -= 1
}
}
groupid_shardnum[toGid] = resultNum
}
}
return last_shards
}
// 根据group_id下shard的数目升序排序group_id,如果shard数目相同,则group_id小的在前面
func realNumArray(groupid_shardnum map[int]int) []int {
group_num := len(groupid_shardnum)
numArray := make([]int, 0, group_num) // length 0, capacity 10
for group_id, _ := range groupid_shardnum {
numArray = append(numArray, group_id)
}
for i := 0; i < group_num-1; i++ {
for j := group_num - 1; j > i; j-- {
if groupid_shardnum[numArray[j]] < groupid_shardnum[numArray[j-1]] || (groupid_shardnum[numArray[j]] == groupid_shardnum[numArray[j-1]] && numArray[j] < numArray[j-1]) {
numArray[j], numArray[j-1] = numArray[j-1], numArray[j]
}
}
}
return numArray
}根据不同的Command,得到新的Config:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133func (sc *ShardCtrler) Execute_Query(op Op) Config {
sc.mu.Lock()
defer sc.mu.Unlock()
sc.last_request_id[op.Client_id] = op.Request_id
if op.Index_Query == -1 || op.Index_Query >= len(sc.configs) {
return sc.configs[len(sc.configs)-1]
}
return sc.configs[op.Index_Query]
}
func (sc *ShardCtrler) Execute_Join(op Op) {
sc.mu.Lock()
defer sc.mu.Unlock()
sc.last_request_id[op.Client_id] = op.Request_id
sc.configs = append(sc.configs, *sc.Join_Config(op.Servers_Join))
}
func (sc *ShardCtrler) Execute_Leave(op Op) {
sc.mu.Lock()
defer sc.mu.Unlock()
sc.last_request_id[op.Client_id] = op.Request_id
sc.configs = append(sc.configs, *sc.Leave_Config(op.Gids_Leave))
}
func (sc *ShardCtrler) Execute_Move(op Op) {
sc.mu.Lock()
defer sc.mu.Unlock()
sc.last_request_id[op.Client_id] = op.Request_id
sc.configs = append(sc.configs, *sc.Move_Config(op.Shard_Move, op.Gid_Move))
}
func (sc *ShardCtrler) Join_Config(servers_join map[int][]string) *Config {
lastConfig := sc.configs[len(sc.configs)-1]
temp_groups := make(map[int][]string)
for gid, server_list := range lastConfig.Groups { // 复制最后一个config的groups
temp_groups[gid] = server_list
}
for gids, join_list := range servers_join { // 将新的serers加入集群
temp_groups[gids] = join_list
}
group_shardnum := make(map[int]int) // 记录最后一个config中,每个group下分别有多少个shard;key:group_id value:shard_id
for gid := range temp_groups {
group_shardnum[gid] = 0
}
for _, gid := range lastConfig.Shards {
if gid != 0 {
group_shardnum[gid]++
}
}
if len(group_shardnum) == 0 { // 如果上一个config为初始config
return &Config{
Num: len(sc.configs),
Shards: [10]int{},
Groups: temp_groups,
}
}
return &Config{
Num: len(sc.configs),
Shards: sc.reBalanceShards(group_shardnum, lastConfig.Shards),
Groups: temp_groups,
}
}
func (sc *ShardCtrler) Leave_Config(gid_leave []int) *Config {
lastConfig := sc.configs[len(sc.configs)-1]
group_del := make(map[int]bool) // 记录删除的group_id
for _, gid := range gid_leave {
group_del[gid] = true
}
temp_groups := make(map[int][]string)
for gid, serverList := range lastConfig.Groups {
temp_groups[gid] = serverList
}
for _, cur_gid := range gid_leave { // 通过group_id删除group
delete(temp_groups, cur_gid)
}
// 得到delete group之后的 group_id:shard_num 映射,和 shard_id:group_id 映射
newShard := lastConfig.Shards
group_shardnum := make(map[int]int)
for gid := range temp_groups { // 初始化 group_id:shard_num 映射
if !group_del[gid] {
group_shardnum[gid] = 0
}
}
for shard, gid := range lastConfig.Shards {
if gid != 0 {
if group_del[gid] {
newShard[shard] = 0 // shard所在的group要被删除,则记录归零
} else {
group_shardnum[gid]++
}
}
}
if len(group_shardnum) == 0 {
return &Config{
Num: len(sc.configs),
Shards: [10]int{},
Groups: temp_groups,
}
}
return &Config{
Num: len(sc.configs),
Shards: sc.reBalanceShards(group_shardnum, newShard),
Groups: temp_groups,
}
}
func (sc *ShardCtrler) Move_Config(shard, gid int) *Config {
last_config := sc.configs[len(sc.configs)-1]
cur_config := Config{
Num: last_config.Num + 1, // len(sc.configs)
Shards: [NShards]int{},
Groups: map[int][]string{},
}
// 复制一遍最后一个config,然后再微调
for shards, gids := range last_config.Shards {
cur_config.Shards[shards] = gids
}
for gidss, servers := range last_config.Groups {
cur_config.Groups[gidss] = servers
}
cur_config.Shards[shard] = gid
return &cur_config
}