Raft Lab2C-2D总结

MIT-6.824 Lab2C、2D 总结与备忘

思路整理

2C

  • 参考官网上Lab2C的Introduction、Hints与样本代码的实现
  • persistence的作用:存储一些不能自动推算出的状态,在peers崩溃后恢复自身状态时读取
  • figure2中指出,需要持久化currentTerm,votedFor,log[]三个变量,其他的状态可以通过集群传递过来的信息进行推测
    • readpersist和persist函数可以简单理解成:persist编码、readpersist解码
    • 重点在于,每次发生变动就把所有需要持久化的状态都编码持久化一遍。其中,currentTerm、votedFor和log三个变量一旦发生变化,就一定要在被其他goroutine或其他peer感知到之前(如:释放锁之前、发送RPC之前)持久化,才能保证崩溃恢复后不会出现"已对外承诺但未落盘"的状态

2D

  • 引入Snapshot后,所有通过Index取log(或者term)的地方,都需要考虑lastSnapShotIndex的影响:全局index与log切片下标之间存在lastSnapShotIndex的偏移

  • Snapshot() 是上层service通知peer"某个index之前的状态已经被做成快照"的接口,peer据此截断自己的log;InstallSnapshot RPC是Leader将自己的快照同步给落后peer的过程,是一个RPC handler——两者在截断log、更新lastSnapShotIndex/Term并持久化这部分逻辑上很相似

  • leader向follower发送Snapshot的条件:rf.nextIndex[follower_id]-1 < rf.lastSnapShotIndex——follower需要同步的下一个Entry已经被leader压缩进快照、从log中截断了,而能被放进快照说明该entry已经被majority提交并apply。这说明当前follower的进度非常滞后,leader此时直接把lastSnapShotIndex及之前的全部状态以快照形式发给它,要求follower直接全部接受

  • 在处理InstallSnapshot RPC时,同样要比较rf.cur_term与args.Term哪个更大,以及rf.state是否为follower(如果不是,则change state),并且要更新follower的log

  • 具体如何存储,官方代码已经实现了,需要做的只是重新设定follower的commit_index等参数

  • CondInstallSnapshot按照hints,无需处理

关键代码

2C

  • persist

    // persist writes Raft's persistent state (as serialized by persistData) to
    // stable storage through the persister. It stores the Raft state only; the
    // snapshot is saved separately via SaveStateAndSnapshot.
    // Call sites in this file invoke it while holding rf.mu.
    func (rf *Raft) persist() {
        rf.persister.SaveRaftState(rf.persistData())
    }
    // persistData serializes the state that must survive a crash using labgob.
    // Fields are encoded in this exact order, and readPersist must decode them
    // in the same order:
    //
    //   cur_term, voted_for, log, last_snapshot_point_index, last_snapshot_point_term
    //
    // last_applied and the snapshot bytes are not part of this blob; the
    // snapshot is stored alongside it by SaveStateAndSnapshot. Encode errors are
    // ignored, as before.
    func (rf *Raft) persistData() []byte {
        buf := new(bytes.Buffer)
        enc := labgob.NewEncoder(buf)
        fields := []interface{}{
            rf.cur_term,
            rf.voted_for,
            rf.log,
            rf.last_snapshot_point_index,
            rf.last_snapshot_point_term,
        }
        for _, field := range fields {
            enc.Encode(field)
        }
        return buf.Bytes()
    }
  • readpersist

    // readPersist restores the state previously written by persistData.
    //
    // A nil or empty buffer means there is nothing to restore (fresh start).
    // Fields are decoded in the same order persistData encodes them. If any
    // decode fails, the problem is logged and none of rf's fields are modified.
    func (rf *Raft) readPersist(data []byte) {
        if len(data) == 0 { // bootstrap without any state
            return
        }

        dec := labgob.NewDecoder(bytes.NewBuffer(data))
        var (
            term     int
            votedFor int
            entries  []Entry
            ssIndex  int
            ssTerm   int
        )

        // Decoding stops at the first failure, so later fields are not read.
        decodedAll := dec.Decode(&term) == nil &&
            dec.Decode(&votedFor) == nil &&
            dec.Decode(&entries) == nil &&
            dec.Decode(&ssIndex) == nil &&
            dec.Decode(&ssTerm) == nil
        if !decodedAll {
            DPrintf("%d read persister got a problem!!!!!!!!!!", rf.me)
            return
        }

        rf.cur_term = term
        rf.voted_for = votedFor
        rf.log = entries
        rf.last_snapshot_point_index = ssIndex
        rf.last_snapshot_point_term = ssTerm
    }
  • persist()的调用位置:change_state、RequestVote末尾、Start()、AppendEntries()中log append之后、AppendEntries()中收到心跳并更新状态和时间戳之后

2D

  • 调用snapshot

    go func(follower_id int) {
    rf.mu.Lock()
    if rf.state != "leader" {
    rf.mu.Unlock()
    return
    }
    // TODO: 缺少对snapshot的处理
    prevLogIndextemp := rf.next_index[follower_id] - 1
    // DPrintf("[IfNeedSendSnapShot] leader %d ,lastSSPIndex %d, server %d ,prevIndex %d",rf.me,rf.lastSSPointIndex,server,prevLogIndextemp)
    if prevLogIndextemp < rf.last_snapshot_point_index {
    go rf.leaderSendSnapShot(follower_id)
    rf.mu.Unlock()
    return
    }

    entry_args := AppendEntryArgs{}
  • snapshot逻辑

    package raft

    import "time"

    // InstallSnapshotArgs is the request of the InstallSnapshot RPC, built by
    // leaderSendSnapShot from the leader's current snapshot state.
    // Keep the field order stable: positional composite literals of this type
    // depend on it.
    type InstallSnapshotArgs struct {
    Term int // leader's current term when the request was built (rf.cur_term)
    LeaderId int // sender's peer index (rf.me)
    LastIncludeIndex int // global index of the last log entry covered by Data
    LastIncludeTerm int // term of the entry at LastIncludeIndex
    Data []byte // raw snapshot bytes, as returned by persister.ReadSnapshot()
    // The paper's chunking fields (offset/done) are not used: the whole
    // snapshot is carried in Data in a single RPC.
    //Done bool
    }

    // InstallSnapshotReply is the response of the InstallSnapshot RPC.
    type InstallSnapshotReply struct {
    Term int // receiver's current term; lets a stale leader detect it must step down
    }

    // CondInstallSnapshot is called by the service before it switches to a
    // snapshot that Raft delivered on applyCh, asking whether Raft has seen
    // more recent information in the meantime.
    //
    // Following the Lab 2D hints, no check is performed here: the snapshot is
    // always accepted.
    func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool {
        const alwaysAccept = true
        return alwaysAccept
    }

    // Snapshot is called by the service once it has created a snapshot that
    // contains all state up to and including log index `index`. Raft no longer
    // needs those entries, so it trims its in-memory log, records the snapshot
    // boundary (last_snapshot_point_index / last_snapshot_point_term) and
    // persists the trimmed Raft state together with the snapshot bytes.
    //
    // The request is ignored when `index` is not newer than the current
    // snapshot, or when it refers to an entry that is not yet committed.
    func (rf *Raft) Snapshot(index int, snapshot []byte) {
        rf.mu.Lock()
        defer rf.mu.Unlock()
        if rf.last_snapshot_point_index >= index || index > rf.commit_index {
            return
        }

        // Rebuild the log: slot 0 is a placeholder entry, followed by every
        // entry after `index` (global indices index+1 .. last index).
        tempLog := make([]Entry, 0)
        tempLog = append(tempLog, Entry{})
        for i := index + 1; i <= rf.get_last_index(); i++ {
            tempLog = append(tempLog, rf.get_entry(i))
        }

        // Look up the term of `index` before the snapshot boundary and rf.log are
        // overwritten below. The index helpers presumably translate global indices
        // using them (TODO confirm).
        // TODO fix it in lab 4
        if index == rf.get_last_index()+1 {
            rf.last_snapshot_point_term = rf.get_last_term()
        } else {
            rf.last_snapshot_point_term = rf.get_log_term(index)
        }
        rf.last_snapshot_point_index = index
        rf.log = tempLog

        // Everything up to `index` is now inside the snapshot, so it counts as
        // committed and applied. The guard above already ensures
        // index <= commit_index; that check is kept defensively.
        if index > rf.commit_index {
            rf.commit_index = index
        }
        // Fixed: this previously compared against commit_index, which made the
        // last_applied update unreachable.
        if index > rf.last_applied {
            rf.last_applied = index
        }
        DPrintf("[SnapShot]Server %d sanpshot until index %d, term %d, loglen %d", rf.me, index, rf.last_snapshot_point_term, len(rf.log)-1)
        rf.persister.SaveStateAndSnapshot(rf.persistData(), snapshot)
    }

    // InstallSnapShot is the follower-side InstallSnapshot RPC handler.
    //
    // It rejects requests from a stale leader (args.Term < cur_term). Otherwise
    // it adopts the leader's term and becomes (or stays) a follower, refreshing
    // its election timestamp. If the snapshot is newer than the local one, it:
    //   - trims the log (Figure 13 rules),
    //   - advances commit_index and last_applied,
    //   - persists state and snapshot together,
    //   - delivers the snapshot to the service on applyCh after releasing rf.mu.
    func (rf *Raft) InstallSnapShot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
        rf.mu.Lock()
        if rf.cur_term > args.Term {
            reply.Term = rf.cur_term
            rf.mu.Unlock()
            return
        }

        // NOTE(review): when args.Term > cur_term, Figure 2 also requires
        // clearing voted_for. Confirm that change_state (or this follower branch)
        // does so.
        rf.cur_term = args.Term
        reply.Term = args.Term
        if rf.state != "follower" {
            rf.change_state(TO_FOLLOWER, true)
        } else {
            rf.timeout = time.Now()
            rf.persist()
        }

        if rf.last_snapshot_point_index >= args.LastIncludeIndex {
            DPrintf("[HaveSnapShot] sever %d , lastSSPindex %d, leader's lastIncludeIndex %d", rf.me, rf.last_snapshot_point_index, args.LastIncludeIndex)
            rf.mu.Unlock()
            return
        }

        index := args.LastIncludeIndex
        tempLog := make([]Entry, 0)
        tempLog = append(tempLog, Entry{})
        // Figure 13: keep the entries after `index` only if our entry at `index`
        // has the same term as the snapshot's last included entry. Otherwise the
        // local suffix belongs to a conflicting history and the whole log is
        // discarded. The guard above ensures index > last_snapshot_point_index,
        // so get_log_term(index) refers to an entry still in rf.log.
        if index <= rf.get_last_index() && rf.get_log_term(index) == args.LastIncludeTerm {
            for i := index + 1; i <= rf.get_last_index(); i++ {
                tempLog = append(tempLog, rf.get_entry(i))
            }
        }

        rf.last_snapshot_point_term = args.LastIncludeTerm
        rf.last_snapshot_point_index = args.LastIncludeIndex

        rf.log = tempLog
        if index > rf.commit_index {
            rf.commit_index = index
        }
        if index > rf.last_applied {
            rf.last_applied = index
        }
        rf.persister.SaveStateAndSnapshot(rf.persistData(), args.Data)

        msg := ApplyMsg{
            SnapshotValid: true,
            Snapshot:      args.Data,
            SnapshotTerm:  rf.last_snapshot_point_term,
            SnapshotIndex: rf.last_snapshot_point_index,
        }
        rf.mu.Unlock()

        // Send outside the lock so a slow applyCh consumer cannot block Raft.
        rf.applyCh <- msg
        DPrintf("[FollowerInstallSnapShot]server %d installsnapshot from leader %d, index %d", rf.me, args.LeaderId, args.LastIncludeIndex)
    }

    // leaderSendSnapShot sends the leader's current snapshot to `server` with the
    // InstallSnapshot RPC and processes the reply.
    //
    // The request is built under rf.mu, but the RPC itself is issued without
    // holding the lock. The reply is ignored if the RPC fails, or if this peer is
    // no longer leader or has changed term since building the request. A higher
    // term in the reply makes the leader adopt that term and step down.
    // Otherwise match_index / next_index for `server` are advanced to cover the
    // snapshot; they are never moved backwards.
    func (rf *Raft) leaderSendSnapShot(server int) {
        rf.mu.Lock()
        DPrintf("[LeaderSendSnapShot]Leader %d (term %d) send snapshot to server %d, index %d", rf.me, rf.cur_term, server, rf.last_snapshot_point_index)
        ssArgs := InstallSnapshotArgs{
            Term:             rf.cur_term,
            LeaderId:         rf.me,
            LastIncludeIndex: rf.last_snapshot_point_index,
            LastIncludeTerm:  rf.last_snapshot_point_term,
            Data:             rf.persister.ReadSnapshot(),
        }
        ssReply := InstallSnapshotReply{}
        rf.mu.Unlock()

        if !rf.sendSnapShot(server, &ssArgs, &ssReply) {
            DPrintf("[InstallSnapShot ERROR] Leader %d don't recive from %d", rf.me, server)
            return
        }

        rf.mu.Lock()
        defer rf.mu.Unlock()
        if rf.state != "leader" || rf.cur_term != ssArgs.Term {
            return
        }
        if ssReply.Term > rf.cur_term {
            // Adopt the newer term before stepping down, the same way the
            // InstallSnapShot handler updates cur_term before change_state.
            rf.cur_term = ssReply.Term
            rf.change_state(TO_FOLLOWER, true)
            return
        }

        // The follower now holds everything up to LastIncludeIndex. Only move the
        // indices forward: a concurrent AppendEntries reply may already have
        // advanced them past the snapshot.
        DPrintf("[InstallSnapShot SUCCESS] Leader %d from sever %d", rf.me, server)
        if ssArgs.LastIncludeIndex > rf.match_index[server] {
            rf.match_index[server] = ssArgs.LastIncludeIndex
        }
        if ssArgs.LastIncludeIndex+1 > rf.next_index[server] {
            rf.next_index[server] = ssArgs.LastIncludeIndex + 1
        }
    }