MIT-6.824 Lab3B 总结与备忘
思路整理
RaftState(currentTerm、voteFor、logEntries,以及你自己需要存储的状态)。Snapshot(Server 维护的 KeyValue 数据库,其实就是内存中的一个 map,以及 Server 维护的关于每个 Client Request 的状态)。
对于Leader:
- Leader在处理Op时,如果发现RaftStateSize高于阈值,则将自身的 kv_database、last_request_id 制作成snapshot,并调用Leader Raft的 Snapshot(),将快照存储起来
- 该接口安装Snapshot分为两步:修剪 log Entries[]、通过Persister持久化存储;之后Leader在AppendEntries中将本次的Snapshot信息发送给落后的Follower
对于Follower:
Raft Leader在处理数据的时候,如果发现某个follower满足 leader.next_index[follower_id] - 1 < rf.last_snapshot_point_index,则表明该follower落后,Leader将snapshot发送给该follower。
follower的 InstallSnapshot(raft/snapshot.go)处理Leader的snapshot RPC,获取snapshot数据并裁剪log,然后构造ApplyMsg、将其中的 SnapshotValid 设置为 true,并将其送入 rf.applyCh:
msg := ApplyMsg{
SnapshotValid: true,
Snapshot: args.Data,
SnapshotTerm: rf.last_snapshot_point_term,
SnapshotIndex: rf.last_snapshot_point_index,
}
// ...
rf.applyCh <- msg
Read_Raft_Apply_Command() 此时检测到 kv.applyCh 中有数据存在,当发现 message.SnapshotValid 为 true 时,读取Snapshot中的数据,以实现数据的一致性:
18func (kv *KVServer) ReadSnapShotToInstall(snapshot []byte) {
if snapshot == nil || len(snapshot) < 1 { // bootstrap without any state?
return
}
r := bytes.NewBuffer(snapshot)
d := labgob.NewDecoder(r)
var persist_kvdb map[string]string
var persist_lastRequestId map[int64]int
if d.Decode(&persist_kvdb) != nil || d.Decode(&persist_lastRequestId) != nil {
DPrintf("KVSERVER %d read persister got a problem!!!!!!!!!!", kv.me)
} else {
kv.kv_database = persist_kvdb
kv.last_request_id = persist_lastRequestId
}
}
这里和Lab2的关联在于:KVServer检测到raft state size比较大时,主动制作snapshot,此时data是key/value数据库以及 last_request_id。
KVServer在初始化时,需要检查是否有Snapshot,如果有就恢复出来。
KVServer执行Op时,需要检查从applyCh取出的message的 message.CommandIndex 是否大于 kv.last_snapshot_log_index;如果小于或等于,说明该Op已经被执行过(已包含在snapshot中),则直接return。
关键代码
KVServer恢复:
func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *KVServer {
// ...
snapshot := persister.ReadSnapshot()
if len(snapshot) > 0 {
kv.ReadSnapShotToInstall(snapshot)
}
go kv.Read_Raft_Apply_Command()
return kv
}
KVServer检查message是否已经被执行:
func (kv *KVServer) Get_Command_From_Raft(message raft.ApplyMsg) {
op := message.Command.(Op)
if message.CommandIndex <= kv.last_snapshot_log_index { // 当前命令已经被snapshot了
return
}
// ...
}
Leader主动制作Snapshot:(第二个代码块和Lab2D的一部分代码相同,只不过为了理解方便重新复制到这里)
func (kv *KVServer) Get_Command_From_Raft(message raft.ApplyMsg) {
// ...
if kv.maxraftstate != -1 {
kv.IfNeedToSendSnapShotCommand(message.CommandIndex, 9)
}
kv.Send_Wait_Chan(op, message.CommandIndex)
}
// ...
// IfNeedToSendSnapShotCommand triggers a snapshot at raftIndex when the size
// reported by kv.rf.GetRaftStateSize() exceeds proportion/10 of
// kv.maxraftstate (integer arithmetic). Below or at that threshold it does
// nothing.
func (kv *KVServer) IfNeedToSendSnapShotCommand(raftIndex int, proportion int) {
	size := kv.rf.GetRaftStateSize()
	limit := kv.maxraftstate * proportion / 10
	if size <= limit {
		return
	}

	// Over the threshold: serialize the service state and hand it to Raft.
	kv.rf.Snapshot(raftIndex, kv.MakeSnapShot())
}
// Give it to raft when server decide to start a snapshot
func (kv *KVServer) MakeSnapShot() []byte {
kv.mu.Lock()
defer kv.mu.Unlock()
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
e.Encode(kv.kv_database)
e.Encode(kv.last_request_id)
data := w.Bytes()
return data
}1
// raft/snapshot.go
func (rf *Raft) Snapshot(index int, snapshot []byte) {
// Your code here (2D).
rf.mu.Lock()
defer rf.mu.Unlock()
if rf.last_snapshot_point_index >= index || index > rf.commit_index {
return
}
// snapshot the entrier form 1:index(global)
tempLog := make([]Entry, 0)
tempLog = append(tempLog, Entry{})
for i := index + 1; i <= rf.get_last_index(); i++ {
tempLog = append(tempLog, rf.get_entry(i))
}
if index == rf.get_last_index()+1 {
rf.last_snapshot_point_term = rf.get_last_term()
} else {
rf.last_snapshot_point_term = rf.get_log_term(index)
}
rf.last_snapshot_point_index = index
rf.log = tempLog
if index > rf.commit_index {
rf.commit_index = index
}
if index > rf.commit_index {
rf.last_applied = index
}
DPrintf("[SnapShot]Server %d sanpshot until index %d, term %d, loglen %d", rf.me, index, rf.last_snapshot_point_term, len(rf.log)-1)
rf.persister.SaveStateAndSnapshot(rf.persistData(), snapshot)
}follower加载Snapshot:(不是RPC!是主动加载)
func (kv *KVServer) Read_Raft_Apply_Command() {
for message := range kv.applyCh {
// ...
if message.SnapshotValid {
kv.GetSnapShotFromRaft(message)
}
}
}
// ...
// ReadSnapShotToInstall decodes a snapshot and, when both parts decode
// cleanly, replaces kv.kv_database and kv.last_request_id with the decoded
// maps. The two values are read in the order: key/value map first, then the
// last-request-id map.
//
// A nil or empty snapshot is ignored. If either decode fails, the problem is
// logged and the server's current state is left untouched.
func (kv *KVServer) ReadSnapShotToInstall(snapshot []byte) {
	if len(snapshot) == 0 {
		// Bootstrap without any state: nothing to install.
		return
	}

	dec := labgob.NewDecoder(bytes.NewBuffer(snapshot))

	// Decode into temporaries first so a failure on the second value cannot
	// leave only half of the snapshot installed.
	var (
		db      map[string]string
		lastIDs map[int64]int
	)
	if dec.Decode(&db) != nil || dec.Decode(&lastIDs) != nil {
		DPrintf("KVSERVER %d read persister got a problem!!!!!!!!!!", kv.me)
		return
	}

	kv.kv_database = db
	kv.last_request_id = lastIDs
}