Raft Lab2B总结

MIT-6.824 Lab2B 总结与备忘

思路整理

  • tester调用Start()来向leader传入command
  • leader每隔一个心跳周期,就将收到的logs发送给自己的follower,如果没有收到logs,则发送空的log entry
    • raft需要保存:
      • log []Entry:log列表,当保存snapshot时清空,并且为了索引和计数的方便,第一个log为空
      • commit_index int:已经被commit的最高索引
      • next_index []int:对于第i个服务器,要发送给该服务器的下一个log entry的索引
      • match_index []int:对于第i个服务器,已知的在该服务器上复制的最高log entry的索引
      • last_snapshot_point_index int
    • 注意,这里的index都是全局的index,即如果已经保存了一个snapshot,虽然log从头开始记录,但最后一个log entry的全局index为last_snapshot_point_index+len(log)-1
    • 发送RPC的args:同Fig2
    • 发送RPC的reply:
      • 为了便于修正leader的next_index[],需要增加额外的参数ConflictingIndex
    • RPC的处理函数要服从Fig2中的诸多rules,同时要考虑很多奇葩的情况,比如leader发送了RPC,但在follower处理时leader的任期突然变化的情况
    • follower在append entry时,应当在entry后面加...表明解包
  • 选举中要加入一个up-to-date的概念,即section 5.4.1中的election restriction,从而实现figure 8中的fault tolerance属性。需要好好理解“up-to-date”的含义:peers只会投票给那些log “at least as up-to-date as” 自己的candidate
  • 各个raft每隔一段时间(通常略大于心跳周期),就将已经commit的Entry应用到状态机上:此时需要检测是否存在已经被committed但还没有applied的Entry,并将这些Entry的信息发送到测试系统提供的chan中
    • raft需要保存:
      • last_applied int
      • applyCh chan ApplyMsg
  • 重点仍旧在于Fig2中各个规则的实现:如何结合一些辅助函数来判断规则是否成立
  • 需要留意raft结构里变量的变化,包括log的数目增减、index什么时候发生改变
  • start()中先根据get_last_index()得到下一个log entry存放位置,再append entry到rf.log中,因为get_last_index()需要append之前的log数目——这个bug比较隐蔽花了很长时间看config.go和test_test.go才查出来

关键代码

  • 一些辅助函数:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    // get_last_index returns the global index of the newest log entry.
    //
    // rf.log[0] is a placeholder, so the newest entry sits at slice offset
    // len(rf.log)-1; adding last_snapshot_point_index turns that offset into a
    // global index.
    func (rf *Raft) get_last_index() int {
        offset := len(rf.log) - 1
        return rf.last_snapshot_point_index + offset
    }

    // get_last_term returns the term of the newest log entry.
    //
    // rf.log always starts with a placeholder entry. When the placeholder is the
    // only element there is no real entry, so the term recorded for the last
    // snapshot is returned instead.
    func (rf *Raft) get_last_term() int {
        newest := len(rf.log) - 1
        if newest != 0 {
            return rf.log[newest].Term
        }
        return rf.last_snapshot_point_term
    }

    // get_prev_log_info is used by the leader to compute the (index, term) pair
    // of the entry that precedes what it will send to follower_id next, i.e. the
    // entry at next_index[follower_id]-1.
    //
    // If that index is exactly one past the leader's newest entry it is pulled
    // back to the newest entry, so the term lookup stays inside the log.
    func (rf *Raft) get_prev_log_info(follower_id int) (int, int) {
        newest := rf.get_last_index()
        prev := rf.next_index[follower_id] - 1
        if prev-1 == newest {
            prev = newest
        }
        term := rf.get_log_term(prev)
        return prev, term
    }

    // get_log_term returns the term of the entry stored at global index
    // log_index.
    //
    // The index covered by the last snapshot maps to the placeholder slot, so its
    // term comes from last_snapshot_point_term rather than from the log slice.
    func (rf *Raft) get_log_term(log_index int) int {
        offset := log_index - rf.last_snapshot_point_index
        if offset == 0 {
            return rf.last_snapshot_point_term
        }
        return rf.log[offset].Term
    }

    // get_entry returns the log entry stored at global index log_index.
    // The global index is converted to a slice offset by subtracting
    // last_snapshot_point_index.
    func (rf *Raft) get_entry(log_index int) Entry {
        offset := log_index - rf.last_snapshot_point_index
        return rf.log[offset]
    }

    // need_update reports whether a log whose newest entry is (index, term) is at
    // least as up-to-date as this server's own log: a higher last term wins, and
    // with equal last terms the log that is at least as long wins.
    func (rf *Raft) need_update(index, term int) bool {
        own_term := rf.get_last_term()
        switch {
        case term > own_term:
            return true
        case term < own_term:
            return false
        default:
            return index >= rf.get_last_index()
        }
    }
  • start:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    // Start asks this server to append command to the replicated log.
    //
    // It returns the global index the command will occupy, the current term, and
    // whether this server accepted the command. A server that has been killed or
    // is not the leader refuses with (-1, -1, false).
    func (rf *Raft) Start(command interface{}) (int, int, bool) {
        rf.mu.Lock()
        defer rf.mu.Unlock()

        if rf.killed() || rf.state != "leader" {
            return -1, -1, false
        }

        // The slot index must be computed before the append, because
        // get_last_index depends on the current length of rf.log.
        index := rf.get_last_index() + 1
        entry := Entry{Term: rf.cur_term, Command: command}
        rf.log = append(rf.log, entry)
        rf.persist()
        return index, rf.cur_term, true
    }
  • entry的处理(append entry,以及apply entry)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
    195
    196
    197
    198
    199
    200
    201
    202
    203
    204
    205
    206
    207
    208
    209
    210
    211
    212
    213
    214
    215
    216
    217
    218
    219
    220
    221
    222
    223
    224
    225
    226
    227
    228
    229
    230
    231
    232
    233
    234
    235
    236
    package raft

    import (
    "fmt"
    "time"
    )

    // AppendEntryArgs is the argument of the AppendEntries RPC that a leader
    // sends to each follower, both to replicate log entries and as a heartbeat.
    type AppendEntryArgs struct {
    Term int // leader's current term
    Leader_id int // id of the sending leader
    Prev_log_index int // global index of the log entry immediately preceding the new ones
    Prev_log_term int // term of the entry at Prev_log_index
    Entries []Entry // empty for heartbeat, send a list for efficiency
    Leader_commit int // leader's commit index
    }

    // AppendEntryReply is the follower's answer to an AppendEntries RPC.
    type AppendEntryReply struct {
    Term int // current term of the replying server
    Success bool // true if follower contained entry which matched the prev_log_index and term
    ConflictingIndex int // when Success is false, hint used to correct the leader's next_index[]; -1 means no hint
    }

    // update_commit advances rf.commit_index. It never moves it backwards.
    //
    // Follower (is_leader == false), AppendEntries rule 5 of Figure 2:
    // commit_index is the leader's commit index, and the local commit index
    // becomes min(commit_index, index of the newest local entry) if that is an
    // advance.
    //
    // Leader (is_leader == true), "Rules for Servers / Leaders" rule 4: the
    // commit_index argument is ignored. The largest index N above the current
    // commit index is chosen such that a majority of servers store entry N and
    // entry N belongs to the current term.
    //
    // Both call sites visible in this file hold rf.mu while calling.
    func (rf *Raft) update_commit(is_leader bool, commit_index int) {
        if !is_leader {
            if last := rf.get_last_index(); commit_index > last {
                commit_index = last
            }
            if commit_index > rf.commit_index {
                rf.commit_index = commit_index
            }
            return
        }

        // Everything covered by the snapshot is committed by definition.
        if rf.commit_index < rf.last_snapshot_point_index {
            rf.commit_index = rf.last_snapshot_point_index
        }

        // Only look for N strictly above the current commit index. When no such
        // N qualifies (e.g. a new leader that has not yet replicated an entry of
        // its own term) the commit index must stay where it is, not be reset.
        majority := len(rf.peers)/2 + 1
        for index := rf.get_last_index(); index > rf.commit_index; index-- {
            if rf.get_log_term(index) != rf.cur_term {
                continue
            }
            replicas := 1 // the leader itself
            for peer := range rf.peers {
                if peer != rf.me && rf.match_index[peer] >= index {
                    replicas++
                }
            }
            if replicas >= majority {
                rf.commit_index = index
                return
            }
        }
    }

    // AppendEntries is the RPC handler a server runs when a leader sends it log
    // entries or a heartbeat. It follows the "AppendEntries RPC / Receiver
    // implementation" rules of Figure 2 in the Raft paper.
    //
    // reply.Term is always this server's term after handling the request.
    // reply.Success is true when the local log contains an entry at
    // args.Prev_log_index whose term is args.Prev_log_term. On a log mismatch,
    // reply.ConflictingIndex tells the leader which next_index to try next; it is
    // -1 when the request was rejected for a stale term or when it succeeded.
    //
    // NOTE(review): the log and term are modified here without calling
    // rf.persist(); confirm whether that is intended once persistence matters.
    func (rf *Raft) AppendEntries(args *AppendEntryArgs, reply *AppendEntryReply) {
        rf.mu.Lock()
        defer rf.mu.Unlock()

        reply.ConflictingIndex = -1

        // Rule 1: a leader from an older term is rejected.
        if rf.cur_term > args.Term {
            reply.Term = rf.cur_term
            reply.Success = false
            return
        }

        rf.cur_term = args.Term
        reply.Term = rf.cur_term
        reply.Success = true

        // A legitimate leader exists: step down if necessary, otherwise just
        // refresh the timestamp of the last contact.
        if rf.state != "follower" {
            rf.change_state(TO_FOLLOWER, true)
        } else {
            rf.timeout = time.Now()
        }

        // TODO: handle a Prev_log_index that falls inside the snapshot.

        // Rule 2, log too short: ask the leader to continue right after our
        // newest entry. Using last_index+1 (not last_index) keeps the hint >= 1,
        // so the leader never derives a negative Prev_log_index from it.
        last_index := rf.get_last_index()
        if last_index < args.Prev_log_index {
            reply.Success = false
            reply.ConflictingIndex = last_index + 1
            fmt.Println("rule2 failed")
            return
        }

        // Rule 2, term mismatch at Prev_log_index: point the leader at the first
        // index of the conflicting term so it can skip the whole term at once.
        if conflict_term := rf.get_log_term(args.Prev_log_index); conflict_term != args.Prev_log_term {
            reply.Success = false
            // Fallback when the conflicting term reaches down to the snapshot
            // boundary; without it the hint would stay -1 and be ignored.
            reply.ConflictingIndex = rf.last_snapshot_point_index + 1
            for index := args.Prev_log_index; index >= rf.last_snapshot_point_index; index-- {
                if rf.get_log_term(index) != conflict_term {
                    reply.ConflictingIndex = index + 1
                    break
                }
            }
            fmt.Println("rule2 failed")
            return
        }

        // Rules 3 and 4: skip entries we already hold, and truncate only at the
        // first entry that actually conflicts. Truncating unconditionally would
        // let a delayed or reordered RPC erase entries that were already
        // replicated by a later one.
        first_pos := args.Prev_log_index - rf.last_snapshot_point_index + 1
        for i := range args.Entries {
            pos := first_pos + i
            if pos < len(rf.log) && rf.log[pos].Term == args.Entries[i].Term {
                continue
            }
            rf.log = append(rf.log[:pos], args.Entries[i:]...) // ... unpacks the slice
            break
        }

        // Rule 5: commit up to min(leader commit, index of last new entry).
        // Entries beyond the last new entry have not been checked against the
        // leader's log, so they must not be committed by this request.
        leader_commit := args.Leader_commit
        if last_new := args.Prev_log_index + len(args.Entries); leader_commit > last_new {
            leader_commit = last_new
        }
        if leader_commit > rf.commit_index {
            rf.update_commit(false, leader_commit)
        }
    }

    // leader_append_entries starts one goroutine per follower. Each goroutine
    // sends a single AppendEntries RPC and processes its reply.
    //
    // The RPC carries every entry from next_index[follower] onwards, or no
    // entries at all (a heartbeat) when the follower is believed to be up to
    // date. rf.mu is held while the request is built and while the reply is
    // processed, but released for the duration of the RPC itself.
    func (rf *Raft) leader_append_entries() {
        for index := range rf.peers {
            if index == rf.me { // never send to ourselves
                continue
            }

            go func(follower_id int) {
                rf.mu.Lock()
                if rf.state != "leader" {
                    rf.mu.Unlock()
                    return
                }
                // TODO: followers whose next_index falls inside a snapshot are
                // not handled yet.

                // Copy the entries so the RPC does not alias rf.log once the
                // lock is released. The slice stays empty for a heartbeat.
                entries := []Entry{}
                if next := rf.next_index[follower_id]; rf.get_last_index() >= next {
                    entries = append(entries, rf.log[next-rf.last_snapshot_point_index:]...)
                }
                prev_log_index, prev_log_term := rf.get_prev_log_info(follower_id)
                entry_args := AppendEntryArgs{
                    Term:           rf.cur_term,
                    Leader_id:      rf.me,
                    Prev_log_index: prev_log_index,
                    Prev_log_term:  prev_log_term,
                    Entries:        entries,
                    Leader_commit:  rf.commit_index,
                }
                entry_reply := AppendEntryReply{}
                rf.mu.Unlock()

                if !rf.sendAppendEntries(follower_id, &entry_args, &entry_reply) {
                    return
                }

                rf.mu.Lock()
                defer rf.mu.Unlock()

                if rf.state != "leader" {
                    // Leadership was lost while the RPC was in flight.
                    return
                }
                if rf.cur_term < entry_reply.Term {
                    // The follower knows a newer term: step down.
                    rf.cur_term = entry_reply.Term
                    rf.change_state(TO_FOLLOWER, true)
                    return
                }
                if rf.cur_term != entry_args.Term {
                    // The reply answers a request from an earlier term of ours;
                    // its contents say nothing about the current term.
                    return
                }

                // "Rules for Servers / Leaders" rule 3.
                if entry_reply.Success {
                    // Replies can arrive out of order, so match_index only ever
                    // moves forward.
                    matched := entry_args.Prev_log_index + len(entry_args.Entries)
                    if matched > rf.match_index[follower_id] {
                        rf.match_index[follower_id] = matched
                    }
                    rf.next_index[follower_id] = rf.match_index[follower_id] + 1
                    rf.update_commit(true, -1)
                    return
                }

                if entry_reply.ConflictingIndex == -1 {
                    return
                }
                fmt.Println("something wrong")
                // Keep next_index inside [first index after the snapshot,
                // newest index + 1] so the next request can always look up its
                // Prev_log_index in rf.log.
                next := entry_reply.ConflictingIndex
                if lowest := rf.last_snapshot_point_index + 1; next < lowest {
                    next = lowest
                }
                if highest := rf.get_last_index() + 1; next > highest {
                    next = highest
                }
                rf.next_index[follower_id] = next
            }(index)
        }
    }

    // leader_append_entries_ticker runs until the server is killed. Once per
    // Heat_beat_time it checks, under rf.mu, whether this server is the leader
    // and, if so, triggers a round of AppendEntries RPCs (which doubles as the
    // heartbeat). The lock is released before the round is started.
    func (rf *Raft) leader_append_entries_ticker() {
        for {
            if rf.killed() {
                return
            }
            time.Sleep(Heat_beat_time)

            rf.mu.Lock()
            is_leader := rf.state == "leader"
            rf.mu.Unlock()

            if is_leader {
                rf.leader_append_entries()
            }
        }
    }

    // committed_to_apply_ticker runs until the server is killed. Once per
    // Apply_time it collects every entry that is committed but not yet applied,
    // advances last_applied past them, and delivers one ApplyMsg per entry on
    // rf.applyCh in index order.
    func (rf *Raft) committed_to_apply_ticker() {
        for !rf.killed() {
            time.Sleep(Apply_time)

            rf.mu.Lock()
            var pending []ApplyMsg
            // Indices start at 1 and last_applied starts at 0, so the next entry
            // to apply is always last_applied+1. Nothing is collected when
            // last_applied has already caught up with commit_index.
            for next := rf.last_applied + 1; next <= rf.commit_index && next <= rf.get_last_index(); next++ {
                rf.last_applied = next
                pending = append(pending, ApplyMsg{
                    CommandValid:  true,
                    Command:       rf.get_entry(next).Command,
                    CommandIndex:  next,
                    SnapshotValid: false,
                })
            }
            // The lock must be released before sending: the receiver of applyCh
            // may block, and holding rf.mu meanwhile would stall the server.
            rf.mu.Unlock()

            for i := range pending {
                rf.applyCh <- pending[i]
            }
        }
    }