MIT-6.824 Lab2B 总结与备忘
思路整理
- tester调用`Start()`来向leader传入command
- leader每隔一个心跳周期,就将收到的logs发送给自己的follower;如果没有收到新的logs,则发送空的log entry(即心跳)
- raft需要保存:
  - `log []Entry`:log列表,当保存snapshot时清空,并且为了索引和计数的方便,第一个log为空
  - `commit_index int`:已经被commit的最高索引
  - `next_index []int`:对于第i个服务器,要发送给该服务器的下一个log entry的索引
  - `match_index []int`:对于第i个服务器,已知的在该服务器上复制的最高log entry的索引
  - `last_snapshot_point_index int`:最近一次snapshot所包含的最后一个log entry的索引
  - 注意,这里的index都是全局的index,即如果已经保存了一个snapshot,虽然log从头开始记录,但最后一个log的index为`last_snapshot_point_index+len(log)-1`
- 发送RPC的args:同Fig2
- 发送RPC的reply:为了便于修正leader的next_index[],需要增加额外的参数`ConflictingIndex`
- RPC的处理函数要服从Fig2中的诸多rules,同时要考虑很多奇葩的情况,比如leader发送了RPC,但在follower处理时leader的任期突然变化的情况
- follower在append entry时,应当在`args.Entries`后面加`...`,表明解包(例如`append(rf.log[:i], args.Entries...)`)
- 选举中要加入一个up-to-date的概念,即section 5.4.1中的election restriction,它与section 5.4.2(Figure 8)中“leader只通过统计副本数来commit当前任期的entry”的规则一起保证了safety。需要好好理解“up-to-date”的含义:peers只会投票给那些log“at least as up-to-date as itself”的candidate,即candidate最后一个log的term比自己的大,或者term相同且index不小于自己的。
- 各个raft每隔一段时间(通常略大于心跳周期),就将已经commit的Entry应用到状态机上:此时需要检测是否存在已经被committed但还没有applied的Entry,并将Entry信息发送给测试系统提供的chan中
  - raft需要保存:`last_applied int`(已经应用到状态机的最高log index)、`applyCh chan ApplyMsg`
- 重点仍旧在于Fig2中各个规则的实现:如何结合一些辅助函数来判断规则是否成立
- 需要留意raft结构里变量的变化,包括log的数目增减、index什么时候发生改变
- `Start()`中要先根据`get_last_index()`得到下一个log entry的存放位置,再把entry append到`rf.log`中,因为`get_last_index()`需要的是append之前的log数目——这个bug比较隐蔽,花了很长时间看config.go和test_test.go才查出来
关键代码
一些辅助函数:
44// 获得最后一个log的index
func (rf *Raft) get_last_index() int {
return len(rf.log) - 1 + rf.last_snapshot_point_index
}
// get_last_term returns the term of the last log entry. When rf.log holds
// nothing but its placeholder slot there is no real entry to read, so the
// term recorded for the last snapshot point is returned instead.
func (rf *Raft) get_last_term() int {
	n := len(rf.log)
	if n == 1 {
		return rf.last_snapshot_point_term
	}
	return rf.log[n-1].Term
}
// get_prev_log_info returns the (Prev_log_index, Prev_log_term) pair the
// leader should put in an AppendEntries RPC for follower_id: the entry just
// before next_index[follower_id]. If next_index has run past the end of the
// leader's log, the index is clamped to the leader's last entry.
func (rf *Raft) get_prev_log_info(follower_id int) (int, int) {
	prev_log_index := rf.next_index[follower_id] - 1
	if last_index := rf.get_last_index(); prev_log_index > last_index {
		// Clamp any overshoot, not only an overshoot by one: get_log_term
		// would otherwise index past the end of rf.log and panic.
		prev_log_index = last_index
	}
	return prev_log_index, rf.get_log_term(prev_log_index)
}
// get_log_term returns the term of the entry at global index log_index.
// The snapshot point maps onto the placeholder rf.log[0], so its term is
// taken from last_snapshot_point_term rather than from the placeholder.
func (rf *Raft) get_log_term(log_index int) int {
	offset := log_index - rf.last_snapshot_point_index
	if offset == 0 {
		return rf.last_snapshot_point_term
	}
	return rf.log[offset].Term
}
// get_entry returns the log entry stored at global index log_index by
// translating it into a position within rf.log.
func (rf *Raft) get_entry(log_index int) Entry {
	pos := log_index - rf.last_snapshot_point_index
	return rf.log[pos]
}
// 检查输入的index和term是否新于当前raft存储的log
func (rf *Raft) need_update(index, term int) bool {
cur_log_index_saved := rf.get_last_index()
cur_term_saved := rf.get_last_term()
return term > cur_term_saved ||
(cur_term_saved == term && index >= cur_log_index_saved)
}start:
18func (rf *Raft) Start(command interface{}) (int, int, bool) {
// Your code here (2B).
rf.mu.Lock()
defer rf.mu.Unlock()
if rf.killed() {
return -1, -1, false
}
if rf.state != "leader" {
return -1, -1, false
} else {
// leader raft为该命令建立一个log entry
index := rf.get_last_index() + 1
rf.log = append(rf.log, Entry{Term: rf.cur_term, Command: command})
rf.persist()
return index, rf.cur_term, true
}
}entry的处理(append entry,以及apply entry)
236package raft
import (
"fmt"
"time"
)
// AppendEntryArgs holds the arguments of the AppendEntries RPC (Raft paper,
// Figure 2). A request with no Entries acts as a heartbeat.
type AppendEntryArgs struct {
	Term           int     // leader's current term
	Leader_id      int     // id of the sending leader
	Prev_log_index int     // index of the log entry immediately preceding the new entries
	Prev_log_term  int     // term of the entry at Prev_log_index
	Entries        []Entry // entries to store; empty for a heartbeat, several at once for efficiency
	Leader_commit  int     // leader's commit index
}
// AppendEntryReply is the follower's answer to an AppendEntries RPC.
type AppendEntryReply struct {
	Term             int  // follower's current term, so a stale leader can step down
	Success          bool // true if the follower's log contained an entry matching Prev_log_index and Prev_log_term
	ConflictingIndex int  // hint for correcting the leader's next_index[] when Success is false; -1 means no hint
}
// update_commit advances rf.commit_index; it never moves it backwards.
// Both callers in this file invoke it with rf.mu held.
//
// Follower (is_leader == false), AppendEntries rule 5: raise commit_index
// to the commit_index argument, capped at the index of the last local entry.
//
// Leader (is_leader == true; the commit_index argument is ignored), Rules for
// Servers, Leaders rule 4: find the largest N above the current commit index
// such that log[N] belongs to cur_term and a majority of peers (the leader
// included) have match_index >= N, then set commit_index = N. Entries from
// earlier terms are committed only indirectly this way (Figure 8).
func (rf *Raft) update_commit(is_leader bool, commit_index int) {
	if !is_leader {
		if last_index := rf.get_last_index(); commit_index > last_index {
			commit_index = last_index
		}
		if commit_index > rf.commit_index {
			rf.commit_index = commit_index
		}
		return
	}

	// Everything up to the snapshot point counts as committed.
	if rf.commit_index < rf.last_snapshot_point_index {
		rf.commit_index = rf.last_snapshot_point_index
	}
	majority := len(rf.peers)/2 + 1
	// Only look above the current commit index. A freshly elected leader may
	// not yet have a current-term entry on a majority, and it must keep the
	// commit index it already knows instead of recomputing it from scratch.
	for n := rf.get_last_index(); n > rf.commit_index; n-- {
		if rf.get_log_term(n) != rf.cur_term {
			continue
		}
		votes := 0
		for i := range rf.peers {
			if i == rf.me || rf.match_index[i] >= n {
				votes++
			}
		}
		if votes >= majority {
			rf.commit_index = n
			return
		}
	}
}
// AppendEntries handles the AppendEntries RPC on the receiving peer (Raft
// paper Figure 2, "AppendEntries RPC - Receiver implementation"). It serves
// both log replication and, with no Entries, heartbeats.
//
// When the consistency check fails, reply.ConflictingIndex is a hint for the
// leader's next_index[] for this peer:
//   - log too short: the first index this peer does not have;
//   - term mismatch at Prev_log_index: the first index of this peer's run of
//     entries with the conflicting term (at least last_snapshot_point_index+1);
//   - Prev_log_index inside our snapshot: last_snapshot_point_index+1;
//   - otherwise -1 (no hint).
func (rf *Raft) AppendEntries(args *AppendEntryArgs, reply *AppendEntryReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	if rf.cur_term > args.Term {
		// Rule 1: reject a leader from an older term.
		reply.Term = rf.cur_term
		reply.Success = false
		reply.ConflictingIndex = -1
		return
	}
	// cur_term and possibly the log change below; persist them before the
	// reply goes out (Start persists after appending too). Deferred calls run
	// last-in-first-out, so this runs while rf.mu is still held.
	defer rf.persist()

	// NOTE(review): when args.Term > rf.cur_term, Figure 2 also expects the
	// vote cast in the old term to be cleared; confirm change_state (or the
	// vote bookkeeping elsewhere) does that.
	rf.cur_term = args.Term
	reply.Term = rf.cur_term
	reply.Success = true
	reply.ConflictingIndex = -1
	// Message from the current leader: become/stay follower, refresh timestamp.
	if rf.state != "follower" {
		rf.change_state(TO_FOLLOWER, true)
	} else {
		rf.timeout = time.Now()
	}

	// TODO: full snapshot support. Until then, a Prev_log_index that lies
	// inside our snapshot cannot be checked against rf.log (it would index it
	// with a negative offset), so ask the leader to resume after the snapshot.
	if args.Prev_log_index < rf.last_snapshot_point_index {
		reply.Success = false
		reply.ConflictingIndex = rf.last_snapshot_point_index + 1
		return
	}
	if rf.get_last_index() < args.Prev_log_index {
		// Rule 2: no entry at Prev_log_index. Hint the first missing index;
		// hinting the last index itself is 0 for an empty log, which would
		// drive the leader's next_index to 0 and its Prev_log_index to -1.
		reply.Success = false
		reply.ConflictingIndex = rf.get_last_index() + 1
		fmt.Println("rule2 failed")
		return
	}
	if conflict_term := rf.get_log_term(args.Prev_log_index); conflict_term != args.Prev_log_term {
		// Rule 2: the entry at Prev_log_index has another term. Point the
		// leader at the start of that term's run so it skips it in one step.
		reply.Success = false
		first := rf.last_snapshot_point_index + 1
		conflict_index := args.Prev_log_index
		for conflict_index > first && rf.get_log_term(conflict_index-1) == conflict_term {
			conflict_index--
		}
		if conflict_index < first {
			conflict_index = first
		}
		reply.ConflictingIndex = conflict_index
		fmt.Println("rule2 failed")
		return
	}

	// Rules 3 & 4: at the first new entry that is past the end of our log or
	// whose term conflicts with ours, drop our suffix from there and append
	// the remaining new entries. Entries we already hold are left untouched,
	// so a delayed or duplicated RPC carrying fewer entries cannot truncate
	// entries that a newer RPC already appended.
	for i, entry := range args.Entries {
		index := args.Prev_log_index + 1 + i
		if index > rf.get_last_index() || rf.get_log_term(index) != entry.Term {
			rf.log = append(rf.log[:index-rf.last_snapshot_point_index], args.Entries[i:]...)
			break
		}
	}

	// Rule 5: commit_index = min(Leader_commit, index of last new entry).
	// Entries after the last new one were not checked by this RPC.
	if args.Leader_commit > rf.commit_index {
		commit := args.Leader_commit
		if last_new := args.Prev_log_index + len(args.Entries); commit > last_new {
			commit = last_new
		}
		rf.update_commit(false, commit)
	}
}
// leader_append_entries runs one replication round. For every other peer it
// starts a goroutine that, if this peer is still the leader, sends an
// AppendEntries RPC carrying the entries from next_index[peer] onward (an
// empty list, i.e. a heartbeat, when the peer is caught up) and then applies
// the reply to next_index / match_index / commit_index under rf.mu.
func (rf *Raft) leader_append_entries() {
	for index := range rf.peers {
		if index == rf.me { // never send to ourselves
			continue
		}
		go func(follower_id int) {
			rf.mu.Lock()
			if rf.state != "leader" {
				rf.mu.Unlock()
				return
			}
			// TODO: snapshot handling (InstallSnapshot) is still missing.
			// Copy the entries so the RPC does not alias rf.log's backing array.
			entries := make([]Entry, 0)
			if next := rf.next_index[follower_id]; rf.get_last_index() >= next {
				entries = append(entries, rf.log[next-rf.last_snapshot_point_index:]...)
			}
			prev_log_index, prev_log_term := rf.get_prev_log_info(follower_id)
			entry_args := AppendEntryArgs{
				Term:           rf.cur_term,
				Leader_id:      rf.me,
				Prev_log_index: prev_log_index,
				Prev_log_term:  prev_log_term,
				Entries:        entries,
				Leader_commit:  rf.commit_index,
			}
			entry_reply := AppendEntryReply{}
			rf.mu.Unlock()

			if !rf.sendAppendEntries(follower_id, &entry_args, &entry_reply) {
				return
			}

			rf.mu.Lock()
			defer rf.mu.Unlock()
			if rf.state != "leader" {
				// may no longer be the leader
				return
			}
			if rf.cur_term < entry_reply.Term {
				// A newer term exists: step down.
				rf.cur_term = entry_reply.Term
				rf.change_state(TO_FOLLOWER, true)
				rf.persist()
				return
			}
			if rf.cur_term != entry_args.Term {
				// Reply to an RPC sent in an earlier term (we have since been
				// re-elected); its indexes say nothing about the current term.
				return
			}
			// Rules for Servers, Leaders rule 3
			if entry_reply.Success {
				// Replies may arrive out of order: match_index only moves forward.
				match := entry_args.Prev_log_index + len(entry_args.Entries)
				if match > rf.match_index[follower_id] {
					rf.match_index[follower_id] = match
				}
				rf.next_index[follower_id] = rf.match_index[follower_id] + 1
				rf.update_commit(true, -1)
				return
			}
			if entry_reply.ConflictingIndex != -1 {
				fmt.Println("something wrong")
				// Never back up past what the follower is known to hold; with
				// match_index >= 0 this also keeps next_index >= 1, so
				// Prev_log_index never goes negative.
				next := entry_reply.ConflictingIndex
				if floor := rf.match_index[follower_id] + 1; next < floor {
					next = floor
				}
				rf.next_index[follower_id] = next
			}
		}(index)
	}
}
// leader_append_entries_ticker loops until the peer is killed. Every
// Heat_beat_time it checks, under rf.mu, whether this peer is the leader and,
// if so, starts a replication round (which doubles as the heartbeat). The
// lock is released before the round starts.
func (rf *Raft) leader_append_entries_ticker() {
	for !rf.killed() {
		time.Sleep(Heat_beat_time)

		rf.mu.Lock()
		is_leader := rf.state == "leader"
		rf.mu.Unlock()

		if is_leader {
			rf.leader_append_entries()
		}
	}
}
// committed_to_apply_ticker loops until the peer is killed. Every Apply_time
// it collects, under rf.mu, one ApplyMsg for each entry in
// (last_applied, commit_index] (never past the last log index), advancing
// last_applied as it goes. It then releases the lock and delivers the batch
// to rf.applyCh in index order.
func (rf *Raft) committed_to_apply_ticker() {
	for !rf.killed() {
		time.Sleep(Apply_time)

		rf.mu.Lock()
		var batch []ApplyMsg
		for rf.last_applied < rf.commit_index && rf.last_applied < rf.get_last_index() {
			rf.last_applied++ // indexes start at 1; last_applied starts at 0
			batch = append(batch, ApplyMsg{
				CommandValid:  true,
				Command:       rf.get_entry(rf.last_applied).Command,
				CommandIndex:  rf.last_applied,
				SnapshotValid: false,
			})
		}
		// Unlock before sending: a channel send can block until the receiver
		// reads, and rf.mu must not be held while it does.
		rf.mu.Unlock()

		for _, msg := range batch {
			rf.applyCh <- msg
		}
	}
}