MIT-6.824 Lab3A 总结与备忘
思路整理
Lab3实现一个基于Raft的key/value数据库
每个KVServer对应一个Raft peer,KVServer通过Raft服务达成共识(因此KVServer结构体中的一个字段为raft),Client向任意一个KVServer发送请求(Get、Put、Append),如果该KVServer为Leader,则响应请求——如果是ZooKeeper,则任意副本都可以响应Get,但这里Lab要求强一致性,因此必须都由Leader响应。为了标识唯一的请求,并且KVServer要明白请求来自哪个客户端,需要给每个客户端分配唯一的ID(用
nrand()
生成,实际生产环境下会用IP:PORT的方法去定位具体的Client),并且请求用唯一的id和客户端id共同确定请求(Op)结构体为:
1
2
3
4
5
6
7
8type Op struct {
// Field names must start with capital letters
Operation string // "get" "put" "append"
Key string
Value string
Client_id int64 // 发送Op的client(一个请求的id和发送请求的client共同确定一个唯一性的请求)
Request_id int // 同一个请求要有唯一的id
}KVServer的结构体为:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15type KVServer struct {
mu sync.Mutex
me int
rf *raft.Raft // 每个KVServer对应一个Raft peer node(将raft作为一个服务)
applyCh chan raft.ApplyMsg
dead int32 // set by Kill()
maxraftstate int // snapshot if log grows this big
// Your definitions here.
kv_database map[string]string // 存储key value的数据库
wait_apply_ch map[int]chan Op // index of raft -> chan Op (key为一个请求在raft中的index,value为Op通道)
last_request_id map[int64]int // key:client id, value: request id
last_snapshot_log_index int // last SnapShot point , raftIndex
}Client结构体为:
1
2
3
4
5
6
7type Clerk struct {
servers []*labrpc.ClientEnd // kvserver
// You will have to modify this struct.
Client_id int64 // 该客户端的id
Request_id int // 上一个请求的id,每次发送请求时,id自增
Leader_id int // 标识上一次通信的kvserver leader
}
客户端发送一个请求的流程如下:
建立多个KVServer,每个Server会在goroutine ApplyLoop中等待到来的ApplyCh的请求
Client发送一个请求:
自增
Request_id
,得到当前请求的id构造RPC的args和reply。Put和Append RPC相似(都要修改数据库),可以合并为一类,Get单独为一类
无论哪种RPC,都要给定请求的id和Client的id
Put和Append要额外指定Key和Value
PutAppend:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15type Err string
type PutAppendArgs struct {
Key string
Value string
Op string // "Put" or "Append"
// You'll have to add definitions here.
// Field names must start with capital letters,
// otherwise RPC will break.
Client_id int64
Request_id int
}
type PutAppendReply struct {
Err Err
}Get:
1
2
3
4
5
6
7
8
9
10
11type GetArgs struct {
Key string
// You'll have to add definitions here.
Client_id int64
Request_id int
}
type GetReply struct {
Err Err
Value string
}
发送给
Leader_id
对应的KVServer,如果该Server不是Leader,则它在RPC handler里会直接返回,reply的Err字段为”ErrWrongLeader”,此时Client要重新选择一个KVServer,再次发送请求
请求由RPC handler处理(Get RPC handler和PutAppend RPC handler)——记住,处理该RPC的KVServer是leader,这很重要
- RPC handler首先判断KVServer是否存活,以及自己是不是leader,如果挂了或者不是leader则直接返回
- 通过
raft.Start()
提交Op给Raft,开启一个Wait Channel,等待Raft返回给自己请求的结果- 底层共识达成时,Raft所有peer的ApplyCh会收到该请求(Op),此时所有peer的log已经达成一致
- 各个Server执行该Op:
- 如果该Op重复了(根据Cilent_id和Request_id字段决定),则Put、Append不必再执行
- Get不执行
- 如果有Wait Channel在等待,则返回结果给Wait Channel——但只有Leader的WaitChannel才真正返回数据给Client
- RPC handler(Leader的handler)通过Wait Channel接受结果,执行Get操作并填充Reply结构体,或者直接填充Reply结构体。如果超时,则返回结果(可以认为是ErrWrongLeader,让Client按逻辑重发)——超时的处理用Select关键字
Client收到结果,如果为ErrWrongLeader,则自增
Leader_id
,重发,否则返回相应的Value等
Raft的heartbeat interval对3A的
TestSpeed3A
有一定的影响,需要调整好这个参数
关键代码
建立KVServer,开启goroutine,循环检测是否收到ApplyMsg
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *KVServer {
// call labgob.Register on structures you want
// Go's RPC library to marshall/unmarshall.
labgob.Register(Op{})
kv := new(KVServer)
kv.me = me
kv.maxraftstate = maxraftstate
// You may need initialization code here.
kv.applyCh = make(chan raft.ApplyMsg)
kv.rf = raft.Make(servers, me, persister, kv.applyCh)
// You may need initialization code here.
kv.kv_database = make(map[string]string)
kv.last_request_id = make(map[int64]int)
kv.wait_apply_ch = make(map[int]chan Op)
// TODO: 载入snapshot
go kv.Read_Raft_Apply_Command()
return kv
}Client发送请求:
Get:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27func (ck *Clerk) Get(key string) string {
// You will have to modify this function.
ck.Request_id += 1
leader_id := ck.Leader_id // 任意的server都行(任意副本都能返回Get),但为了一致性,必须call leader
for {
args := GetArgs{
Key: key,
Client_id: ck.Client_id,
Request_id: ck.Request_id,
}
reply := GetReply{}
ok := ck.servers[leader_id].Call("KVServer.Get", &args, &reply)
if !ok || reply.Err == ErrWrongLeader { // 如果出现问题,则切换询问对象
leader_id = (leader_id + 1) % len(ck.servers)
continue
}
if reply.Err == ErrNoKey { // 没有key
return ""
}
if reply.Err == OK { // 获得value
ck.Leader_id = leader_id
return reply.Value
}
}
}PutAppend:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26func (ck *Clerk) PutAppend(key string, value string, op string) {
// You will have to modify this function.
ck.Request_id += 1
leader_id := ck.Leader_id // 任意的server都行(任意副本都能返回Get)
for {
args := PutAppendArgs{
Key: key,
Value: value,
Op: op,
Client_id: ck.Client_id,
Request_id: ck.Request_id,
}
reply := PutAppendReply{}
ok := ck.servers[leader_id].Call("KVServer.PutAppend", &args, &reply)
if !ok || reply.Err == ErrWrongLeader { // 如果出现问题,则切换询问对象
leader_id = (leader_id + 1) % len(ck.servers)
continue
}
if reply.Err == OK { // 获得value
ck.Leader_id = leader_id
return
}
}
}
Leader的RPC handler处理请求:超时的处理,用到了Select
Get请求的处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
if kv.killed() {
reply.Err = ErrWrongLeader
return
}
_, is_leader := kv.rf.GetState()
if !is_leader { // 被client RPC调用的该kvserver不是leader,则数据不一定是最新的,因此reply不为ok
reply.Err = ErrWrongLeader
return
}
// 根据args构建Op,由kv.rf.Start调用
op := Op{
Operation: "get",
Key: args.Key,
Value: "",
Client_id: args.Client_id,
Request_id: args.Request_id,
}
raft_log_index, _, _ := kv.rf.Start(op)
// 构建wait channel
kv.mu.Lock()
wait_ch, exist := kv.wait_apply_ch[raft_log_index]
if !exist {
kv.wait_apply_ch[raft_log_index] = make(chan Op, 1)
wait_ch = kv.wait_apply_ch[raft_log_index]
}
kv.mu.Unlock()
select {
// select 随机执行一个可运行的 case。如果没有 case 可运行,它将阻塞,直到有 case 可运行
// case 必须是一个通信操作,要么是发送要么是接收
case <-time.After(time.Millisecond * 500): // 超时
_, is_leader := kv.rf.GetState()
if kv.Is_Duplicate(op.Client_id, op.Request_id) && is_leader {
value, exist := kv.Execute_Get(op) // Get_Command_From_Raft没有执行Get,因此这里要主动Get
if exist {
reply.Err = OK
reply.Value = value
} else {
reply.Err = ErrNoKey
reply.Value = ""
}
} else {
reply.Err = ErrWrongLeader
}
case op_committed := <-wait_ch: // 此时,该kvserver运行了Get_Command_From_Raft().Send_Wait_Chan()
if op_committed.Client_id == op.Client_id && op_committed.Request_id == op.Request_id {
value, exist := kv.Execute_Get(op)
if exist {
reply.Err = OK
reply.Value = value
} else {
reply.Err = ErrNoKey
reply.Value = ""
}
} else {
reply.Err = ErrWrongLeader
}
}
kv.mu.Lock()
delete(kv.wait_apply_ch, raft_log_index)
kv.mu.Unlock()
return
}Put和Append请求的处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
if kv.killed() {
reply.Err = ErrWrongLeader
return
}
_, is_leader := kv.rf.GetState()
if !is_leader { // 被client RPC调用的该kvserver不是leader,则数据不一定是最新的,因此reply不为ok
reply.Err = ErrWrongLeader
return
}
// 根据args构建Op,由kv.rf.Start调用
op := Op{
Operation: args.Op,
Key: args.Key,
Value: args.Value,
Client_id: args.Client_id,
Request_id: args.Request_id,
}
raft_log_index, _, _ := kv.rf.Start(op)
// 构建wait channel
kv.mu.Lock()
wait_ch, exist := kv.wait_apply_ch[raft_log_index]
if !exist {
kv.wait_apply_ch[raft_log_index] = make(chan Op, 1)
wait_ch = kv.wait_apply_ch[raft_log_index]
}
kv.mu.Unlock()
select {
// select 随机执行一个可运行的 case。如果没有 case 可运行,它将阻塞,直到有 case 可运行
// case 必须是一个通信操作,要么是发送要么是接收
case <-time.After(time.Millisecond * 500): // 超时
_, is_leader := kv.rf.GetState()
if kv.Is_Duplicate(op.Client_id, op.Request_id) && is_leader {
reply.Err = OK
} else {
reply.Err = ErrWrongLeader
}
case op_committed := <-wait_ch: // 此时,该kvserver运行了Get_Command_From_Raft().Send_Wait_Chan(),收到了
if op_committed.Client_id == op.Client_id && op_committed.Request_id == op.Request_id {
reply.Err = OK
} else {
reply.Err = ErrWrongLeader
}
}
kv.mu.Lock()
delete(kv.wait_apply_ch, raft_log_index)
kv.mu.Unlock()
return
}
KVServer(各个peer)检测是否收到ApplyMsg,并执行Get和Append
检测是否收到ApplyMsg
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32func (kv *KVServer) Read_Raft_Apply_Command() {
for message := range kv.applyCh {
// listen to every command applied by its raft ,delivery to relative RPC Handler
if message.CommandValid {
kv.Get_Command_From_Raft(message)
}
if message.SnapshotValid {
fmt.Println("not implemented")
}
}
}
func (kv *KVServer) Get_Command_From_Raft(message raft.ApplyMsg) {
op := message.Command.(Op)
if !kv.Is_Duplicate(op.Client_id, op.Request_id) { // 判断这个request(op)是否重复
// 执行put和append,保证数据库内容一致,get对于非leader的peer来说则无需执行,leader在相应RPC handler执行Get即可
switch op.Operation {
case "Put":
kv.Execute_Put(op)
case "Append":
kv.Execute_Append(op)
}
}
if kv.maxraftstate != -1 {
//TODO 判断是否要snapshot
}
kv.Send_Wait_Chan(op, message.CommandIndex)
// Server(leader)有一个WaitChannel,等待Raft leader返回给自己Request结果
}检测Op是否重复
1
2
3
4
5
6
7
8
9
10
11func (kv *KVServer) Is_Duplicate(client_id int64, request_id int) bool {
kv.mu.Lock()
defer kv.mu.Unlock()
last_request_id, exist := kv.last_request_id[client_id]
if !exist {
return false // 如果kv的记录中不存在这个client id,说明这个request一定不会重复
}
// 如果kv的记录中存在这个client id,那么这个client保存的最后一个requestid要小于当前request,才不会重复
return last_request_id >= request_id
}执行Put
1
2
3
4
5
6
7
8
9func (kv *KVServer) Execute_Put(op Op) (string, bool) {
kv.mu.Lock()
defer kv.mu.Unlock()
kv.kv_database[op.Key] = op.Value // 更新数据库的value
kv.last_request_id[op.Client_id] = op.Request_id
return kv.kv_database[op.Key], true
}执行Append
1
2
3
4
5
6
7
8
9
10
11
12
13
14func (kv *KVServer) Execute_Append(op Op) (string, bool) {
kv.mu.Lock()
defer kv.mu.Unlock()
value, exist := kv.kv_database[op.Key]
if exist {
kv.kv_database[op.Key] = value + op.Value
} else {
kv.kv_database[op.Key] = op.Value
}
kv.last_request_id[op.Client_id] = op.Request_id
return kv.kv_database[op.Key], exist
}发送到Wait Channel
1
2
3
4
5
6
7
8
9
10func (kv *KVServer) Send_Wait_Chan(op Op, command_index int) bool {
kv.mu.Lock()
defer kv.mu.Unlock()
channel, exist := kv.wait_apply_ch[command_index]
if exist {
channel <- op
}
return exist
}
Get的执行
1
2
3
4
5
6
7
8
9func (kv *KVServer) Execute_Get(op Op) (string, bool) {
kv.mu.Lock()
defer kv.mu.Unlock()
value, exist := kv.kv_database[op.Key]
kv.last_request_id[op.Client_id] = op.Request_id // kvserver更新request的记录(对于某个client id)
return value, exist
}