MIT-6.824 Lab2C、2D 总结与备忘
思路整理
2C
- 参考官网上Lab2C的Introduction、Hints与样本代码的实现
- persistence的作用:存储一些不能自动推算出的状态,在peers崩溃后恢复自身状态时读取
- figure2中指出,需要持久化currentTerm,votedFor,log[]三个变量,其他的状态可以通过集群传递过来的信息进行推测
- readpersist和persist函数可以简单理解成:persist编码、readpersist解码
- 重点在于,每次发生变动就把所有的状态都编码持久化一遍。其中,currentTerm、votedFor 和 log[] 三个变量一旦发生变化,就一定要在被其他 goroutine 感知到之前(如:释放锁之前、发送 RPC 之前)持久化,才能保证原子性
2D
引入SnapShot后,通过Index取log(或者term)的地方,都需要考虑lastSnapShotIndex的影响
SnapShot() 是 peers 主动存储 Snapshot 的调用接口;InstallSnapShot RPC 是 Leader 将自己的 SnapShot 同步给 peers 的过程,是一个 RPC handler。SnapShot() 的内容和 InstallSnapShot 有挺多是相似的。
leader 向 follower 发送 Snapshot 的条件:
rf.nextIndex[follower_id]-1 < rf.lastSnapShotIndex
——follower 需要同步的下一个 entry 已经被 leader 持久化进 snapshot。而持久化也表明该 entry 已经经过 majority commit 和 apply,说明当前 follower 的进度非常滞后;leader 此时直接告知它 lastSnapShotIndex 之前的 entry 都已经 apply 了,要求 follower 直接全部接受。
在处理 RPC 时,同样要考虑 rf.cur_term 和 args.Term 哪个更大,以及 rf.state 是否为 follower(如果不是,则 change state),并且要更新 follower 的 log。
具体如何存储,官方代码已经实现了,需要做的只是重新设定 follower 的 commit_index 等参数。
CondInstallSnapshot:按照 hints,无需处理。
关键代码
2C
persist
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39func (rf *Raft) persist() {
// Your code here (2C).
// Example:
// w := new(bytes.Buffer)
// e := labgob.NewEncoder(w)
// e.Encode(rf.xxx)
// e.Encode(rf.yyy)
// data := w.Bytes()
// rf.persister.SaveRaftState(data)
data := rf.persistData()
rf.persister.SaveRaftState(data)
}
func (rf *Raft) persistData() []byte {
// Your code here (2C).
// Example:
// r := bytes.NewBuffer(data)
// d := labgob.NewDecoder(r)
// var xxx
// var yyy
// if d.Decode(&xxx) != nil ||
// d.Decode(&yyy) != nil {
// error...
// } else {
// rf.xxx = xxx
// rf.yyy = yyy
// }
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
e.Encode(rf.cur_term)
e.Encode(rf.voted_for)
e.Encode(rf.log)
// e.Encode(rf.lastApplied)
e.Encode(rf.last_snapshot_point_index)
e.Encode(rf.last_snapshot_point_term)
//e.Encode(rf.persister.ReadSnapshot())
data := w.Bytes()
return data
}readpersist
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35func (rf *Raft) readPersist(data []byte) {
if data == nil || len(data) < 1 { // bootstrap without any state?
return
}
// Your code here (2C).
// Example:
r := bytes.NewBuffer(data)
d := labgob.NewDecoder(r)
var persist_currentTrem int
var persist_voteFor int
var persist_log []Entry
//var persist_lastApplied int
var persist_lastSSPointIndex int
var persist_lastSSPointTerm int
//var persist_snapshot []byte
if d.Decode(&persist_currentTrem) != nil ||
d.Decode(&persist_voteFor) != nil ||
d.Decode(&persist_log) != nil ||
//d.Decode(&persist_lastApplied) != nil ||
d.Decode(&persist_lastSSPointIndex) != nil ||
d.Decode(&persist_lastSSPointTerm) != nil {
//d.Decode(&persist_snapshot) != nil{
DPrintf("%d read persister got a problem!!!!!!!!!!", rf.me)
} else {
rf.cur_term = persist_currentTrem
rf.voted_for = persist_voteFor
rf.log = persist_log
// rf.lastApplied = persist_lastApplied
rf.last_snapshot_point_index = persist_lastSSPointIndex
rf.last_snapshot_point_term = persist_lastSSPointTerm
// rf.persister.SaveStateAndSnapshot(rf.persistData(),persist_snapshot)
}
}调用的位置:changestate、requestvote最后、start()、AppendEntries()中log append之后、AppendEntries()中收到心跳更新状态和时间戳之后
2D
调用snapshot
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16go func(follower_id int) {
rf.mu.Lock()
if rf.state != "leader" {
rf.mu.Unlock()
return
}
// TODO: 缺少对snapshot的处理
prevLogIndextemp := rf.next_index[follower_id] - 1
// DPrintf("[IfNeedSendSnapShot] leader %d ,lastSSPIndex %d, server %d ,prevIndex %d",rf.me,rf.lastSSPointIndex,server,prevLogIndextemp)
if prevLogIndextemp < rf.last_snapshot_point_index {
go rf.leaderSendSnapShot(follower_id)
rf.mu.Unlock()
return
}
entry_args := AppendEntryArgs{}snapshot逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165package raft
import "time"
type (
	// InstallSnapshotArgs is the request payload of the InstallSnapShot RPC.
	// Field order matters: the sender builds this struct positionally.
	InstallSnapshotArgs struct {
		Term             int    // sender's current term
		LeaderId         int    // index of the sending server
		LastIncludeIndex int    // index of the last entry covered by the snapshot
		LastIncludeTerm  int    // term of the last entry covered by the snapshot
		Data             []byte // snapshot bytes
		// A Done flag is not carried (commented out in the original),
		// presumably because the snapshot is sent in a single message.
	}

	// InstallSnapshotReply is the response payload of the InstallSnapShot RPC.
	InstallSnapshotReply struct {
		Term int // term reported back by the receiver
	}
)
// CondInstallSnapshot is called by the service when it wants to switch to a
// snapshot that Raft delivered on applyCh. The service should only switch if
// Raft has not obtained more recent information since the snapshot was
// delivered.
//
// This implementation performs no check and always returns true; the three
// parameters are ignored.
func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool {
// Intentionally left without logic (2D): every snapshot is accepted.
return true
}
// Snapshot is called by the service after it has created a snapshot holding
// all state up to and including index. The service no longer needs the log
// through that index, so Raft trims its log and stores the snapshot together
// with the updated persistent state.
//
// The call is ignored when index is already covered by the current snapshot
// or when index is beyond commit_index.
func (rf *Raft) Snapshot(index int, snapshot []byte) {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	if rf.last_snapshot_point_index >= index || index > rf.commit_index {
		return
	}

	// The new log starts with an empty placeholder entry at slot 0, followed
	// by every entry after index.
	tempLog := []Entry{Entry{}}
	for i := index + 1; i <= rf.get_last_index(); i++ {
		tempLog = append(tempLog, rf.get_entry(i))
	}

	// The term is looked up before rf.log and last_snapshot_point_index are
	// replaced; presumably the helpers translate global indices using both,
	// so this order is kept from the original.
	// TODO fix it in lab 4
	if index == rf.get_last_index()+1 {
		rf.last_snapshot_point_term = rf.get_last_term()
	} else {
		rf.last_snapshot_point_term = rf.get_log_term(index)
	}
	rf.last_snapshot_point_index = index
	rf.log = tempLog

	// commit_index needs no adjustment: the guard above already guarantees
	// index <= rf.commit_index while the lock is held.
	//
	// last_applied is compared against itself. The original compared against
	// commit_index, which could never be true here, so last_applied was never
	// advanced.
	if index > rf.last_applied {
		rf.last_applied = index
	}

	DPrintf("[SnapShot]Server %d sanpshot until index %d, term %d, loglen %d", rf.me, index, rf.last_snapshot_point_term, len(rf.log)-1)
	rf.persister.SaveStateAndSnapshot(rf.persistData(), snapshot)
}
// InstallSnapShot is the RPC handler for a snapshot pushed by another server.
//
// A request whose term is lower than ours is answered with our term and
// otherwise ignored. For any other request the handler adopts the sender's
// term, makes sure this server is a follower, and returns early if its own
// snapshot already reaches args.LastIncludeIndex. Otherwise it trims the log
// to the entries after the snapshot, raises commit_index and last_applied to
// the snapshot index if they are behind, persists state plus snapshot, and
// finally hands the snapshot to the service on applyCh.
func (rf *Raft) InstallSnapShot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
	rf.mu.Lock()

	if args.Term < rf.cur_term {
		reply.Term = rf.cur_term
		rf.mu.Unlock()
		return
	}

	rf.cur_term = args.Term
	reply.Term = args.Term
	if rf.state == "follower" {
		rf.timeout = time.Now()
		rf.persist()
	} else {
		rf.change_state(TO_FOLLOWER, true)
	}

	snapIndex := args.LastIncludeIndex
	if snapIndex <= rf.last_snapshot_point_index {
		DPrintf("[HaveSnapShot] sever %d , lastSSPindex %d, leader's lastIncludeIndex %d", rf.me, rf.last_snapshot_point_index, args.LastIncludeIndex)
		rf.mu.Unlock()
		return
	}

	// Placeholder entry at slot 0, then whatever follows the snapshot point.
	kept := []Entry{Entry{}}
	for i := snapIndex + 1; i <= rf.get_last_index(); i++ {
		kept = append(kept, rf.get_entry(i))
	}

	rf.last_snapshot_point_term = args.LastIncludeTerm
	rf.last_snapshot_point_index = snapIndex
	rf.log = kept

	if rf.commit_index < snapIndex {
		rf.commit_index = snapIndex
	}
	if rf.last_applied < snapIndex {
		rf.last_applied = snapIndex
	}

	rf.persister.SaveStateAndSnapshot(rf.persistData(), args.Data)

	applyMsg := ApplyMsg{
		SnapshotValid: true,
		Snapshot:      args.Data,
		SnapshotTerm:  args.LastIncludeTerm,
		SnapshotIndex: snapIndex,
	}
	// The lock is released before the channel send so a slow receiver on
	// applyCh cannot block other users of rf.mu.
	rf.mu.Unlock()

	rf.applyCh <- applyMsg
	DPrintf("[FollowerInstallSnapShot]server %d installsnapshot from leader %d, index %d", rf.me, args.LeaderId, args.LastIncludeIndex)
}
// leaderSendSnapShot sends this server's current snapshot to server and, on a
// successful reply, advances the bookkeeping for that peer.
//
// Nothing is sent if this server is no longer the leader by the time the lock
// is acquired. A reply is ignored if this server has stopped being leader or
// its term has changed since the request was built. A reply carrying a higher
// term makes this server step down.
func (rf *Raft) leaderSendSnapShot(server int) {
	rf.mu.Lock()
	// This function is started as its own goroutine, so leadership may have
	// been lost between the caller's check and this point.
	if rf.state != "leader" {
		rf.mu.Unlock()
		return
	}
	DPrintf("[LeaderSendSnapShot]Leader %d (term %d) send snapshot to server %d, index %d", rf.me, rf.cur_term, server, rf.last_snapshot_point_index)
	ssArgs := InstallSnapshotArgs{
		Term:             rf.cur_term,
		LeaderId:         rf.me,
		LastIncludeIndex: rf.last_snapshot_point_index,
		LastIncludeTerm:  rf.last_snapshot_point_term,
		Data:             rf.persister.ReadSnapshot(),
	}
	ssReply := InstallSnapshotReply{}
	rf.mu.Unlock()

	// The RPC is issued without holding the lock.
	if !rf.sendSnapShot(server, &ssArgs, &ssReply) {
		DPrintf("[InstallSnapShot ERROR] Leader %d don't recive from %d", rf.me, server)
		return
	}

	rf.mu.Lock()
	defer rf.mu.Unlock()

	if rf.state != "leader" || rf.cur_term != ssArgs.Term {
		return
	}
	if ssReply.Term > rf.cur_term {
		// NOTE(review): cur_term is not raised to ssReply.Term here; confirm
		// that change_state or a later RPC takes care of adopting it.
		rf.change_state(TO_FOLLOWER, true)
		return
	}

	// Update match_index and next_index for this follower. They only move
	// forward, so a delayed or duplicate reply cannot undo progress recorded
	// by a newer reply.
	DPrintf("[InstallSnapShot SUCCESS] Leader %d from sever %d", rf.me, server)
	if ssArgs.LastIncludeIndex > rf.match_index[server] {
		rf.match_index[server] = ssArgs.LastIncludeIndex
	}
	if ssArgs.LastIncludeIndex+1 > rf.next_index[server] {
		rf.next_index[server] = ssArgs.LastIncludeIndex + 1
	}
}