KVRaft Lab3B总结

MIT-6.824 Lab3B 总结与备忘

思路整理

  • RaftState(currentTerm, voteFor, logEntries 和你要自己存储的状态)。 Snapshot (Server 维护的KeyValue数据库,其实就是内存中一个map, 和Server维护的关于每个Client Request的状态)。

  • 对于Leader:

    • Leader在处理Op时,如果发现RaftStateSize高于阈值,则将自身的kv_database和last_request_id制作成snapshot, 并调用Leader Raft的Snapshot() ,将快照存储起来
    • 该接口安装Snapshot分为两步:修剪log Entries[] 、通过Persister持久化存储,之后Leader在AppendEntries中将本次的Snapshot信息发送给落后的Follower
  • 对于Follower:

    • Raft Leader在处理数据的时候,如果发现某个follower满足 leader.next_index[follower_id] - 1 < rf.last_snapshot_point_index,则表明该follower落后,Leader将snapshot发送给follower

    • follower的InstallSnapshot(raft/snapshot.go)处理Leader的snapshot RPC,获取snapshot数据并裁剪log,然后构造ApplyMsg并将SnapshotValid设为true,最后将其送入rf.applyCh

      1
      2
      3
      4
      5
      6
      7
      8
      // Build the apply message that carries the received snapshot.
      msg := ApplyMsg{
      SnapshotValid: true, // flags this message as a snapshot
      Snapshot: args.Data, // snapshot payload taken from args.Data
      SnapshotTerm: rf.last_snapshot_point_term,
      SnapshotIndex: rf.last_snapshot_point_index,
      }
      // ...
      // Deliver the snapshot message on the apply channel.
      rf.applyCh <- msg
    • Read_Raft_Apply_Command()此时检测到kv.applyCh中有数据存在,然后发现message.SnapshotValid为true的时候,读取Snapshot中的数据,以实现数据的一致性

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      // ReadSnapShotToInstall restores the server state from an encoded snapshot.
      //
      // The snapshot is decoded as the key/value map followed by the
      // last_request_id map. An empty snapshot is ignored. If either value
      // fails to decode, the problem is logged and the current state is left
      // unchanged.
      func (kv *KVServer) ReadSnapShotToInstall(snapshot []byte) {
          if len(snapshot) == 0 { // bootstrap without any state?
              return
          }

          dec := labgob.NewDecoder(bytes.NewBuffer(snapshot))

          var restoredDB map[string]string
          var restoredRequestIDs map[int64]int

          // The second Decode only runs when the first one succeeded.
          decodeFailed := dec.Decode(&restoredDB) != nil || dec.Decode(&restoredRequestIDs) != nil
          if decodeFailed {
              DPrintf("KVSERVER %d read persister got a problem!!!!!!!!!!", kv.me)
              return
          }

          kv.kv_database = restoredDB
          kv.last_request_id = restoredRequestIDs
      }
  • 这里和Lab2的关联在于,KVServer检测到raft state size比较大时,主动制作snapshot,此时data是key/value数据库,以及last_request_id

  • KVServer在初始化时,需要检查是否有Snapshot,如果有就恢复出来

  • KVServer执行Op时需要检查从applyCh取出的message的message.CommandIndex是否大于 kv.last_snapshot_log_index。如果小于或等于,说明该Op已经被执行过,则直接return

关键代码

  • KVServer恢复:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    // StartKVServer returns a KVServer after restoring any persisted snapshot
    // and starting the apply loop. Construction of kv is elided in this excerpt.
    func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *KVServer {
    // ...
    // Restore state from a previously persisted snapshot, if there is one.
    snapshot := persister.ReadSnapshot()
    if len(snapshot) > 0 {
    kv.ReadSnapShotToInstall(snapshot)
    }

    // Consume messages from the apply channel in a background goroutine.
    go kv.Read_Raft_Apply_Command()

    return kv
    }
  • KVServer检查message是否已经被执行

    1
    2
    3
    4
    5
    6
    7
    8
    // Get_Command_From_Raft handles one command message delivered by Raft.
    // Only the duplicate check is shown in this excerpt.
    func (kv *KVServer) Get_Command_From_Raft(message raft.ApplyMsg) {
    // The command must be an Op; this unchecked assertion panics otherwise.
    op := message.Command.(Op)

    if message.CommandIndex <= kv.last_snapshot_log_index { // this command is already covered by a snapshot
    return
    }
    // ...
    }
  • Leader主动制作Snapshot:(第二个代码块和Lab2D的一部分代码相同,只不过为了理解方便重新复制到这里)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    // Get_Command_From_Raft handles one command message delivered by Raft.
    // Only the snapshot trigger at the end of the function is shown here.
    func (kv *KVServer) Get_Command_From_Raft(message raft.ApplyMsg) {
    // ...
    // A maxraftstate of -1 skips the snapshot check entirely. Otherwise the
    // proportion argument 9 makes the check fire once the Raft state size
    // exceeds 9/10 of maxraftstate.
    if kv.maxraftstate != -1 {
    kv.IfNeedToSendSnapShotCommand(message.CommandIndex, 9)
    }

    // NOTE(review): presumably notifies the handler waiting on this index — confirm.
    kv.Send_Wait_Chan(op, message.CommandIndex)
    }
    // ...
    // IfNeedToSendSnapShotCommand asks Raft to snapshot the service state up to
    // raftIndex when the Raft state size exceeds proportion/10 of
    // kv.maxraftstate. Below that threshold it does nothing.
    func (kv *KVServer) IfNeedToSendSnapShotCommand(raftIndex int, proportion int) {
        stateSize := kv.rf.GetRaftStateSize()
        threshold := kv.maxraftstate * proportion / 10
        if stateSize <= threshold {
            return
        }

        // Send SnapShot Command
        kv.rf.Snapshot(raftIndex, kv.MakeSnapShot())
    }

    // MakeSnapShot encodes the key/value database followed by the
    // last_request_id map and returns the resulting bytes. The result is given
    // to Raft when the server decides to start a snapshot. kv.mu is held for
    // the duration of the encoding.
    func (kv *KVServer) MakeSnapShot() []byte {
        kv.mu.Lock()
        defer kv.mu.Unlock()

        var buf bytes.Buffer
        enc := labgob.NewEncoder(&buf)
        // NOTE(review): the results of Encode are not checked.
        enc.Encode(kv.kv_database)
        enc.Encode(kv.last_request_id)
        return buf.Bytes()
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    // raft/snapshot.go
    // Snapshot records that the service has captured its state through index
    // in snapshot. It drops the log entries up to and including index, updates
    // the snapshot point (index and term), and persists the Raft state together
    // with the snapshot bytes.
    //
    // The call is a no-op when index is not newer than the current snapshot
    // point or lies beyond the commit index.
    func (rf *Raft) Snapshot(index int, snapshot []byte) {
        // Your code here (2D).
        rf.mu.Lock()
        defer rf.mu.Unlock()

        if rf.last_snapshot_point_index >= index || index > rf.commit_index {
            return
        }

        // Snapshot the entries from 1 through index (global indexes). The new
        // log starts with an empty placeholder entry, followed by every entry
        // after index.
        lastIndex := rf.get_last_index()
        tempLog := []Entry{{}}
        for i := index + 1; i <= lastIndex; i++ {
            tempLog = append(tempLog, rf.get_entry(i))
        }

        // NOTE(review): the term helpers presumably read rf.log and the old
        // snapshot point, so the term is resolved before either is replaced.
        if index == lastIndex+1 {
            rf.last_snapshot_point_term = rf.get_last_term()
        } else {
            rf.last_snapshot_point_term = rf.get_log_term(index)
        }

        rf.last_snapshot_point_index = index
        rf.log = tempLog

        // Neither commit_index nor last_applied may fall behind the snapshot.
        if index > rf.commit_index {
            rf.commit_index = index
        }
        // Compare against last_applied, not commit_index: the previous
        // condition repeated the commit_index test, which is always false
        // after the clamp above, so last_applied was never advanced.
        if index > rf.last_applied {
            rf.last_applied = index
        }

        DPrintf("[SnapShot]Server %d sanpshot until index %d, term %d, loglen %d", rf.me, index, rf.last_snapshot_point_term, len(rf.log)-1)
        rf.persister.SaveStateAndSnapshot(rf.persistData(), snapshot)
    }
  • follower加载Snapshot:(不是RPC!是主动加载)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    // Read_Raft_Apply_Command consumes messages from kv.applyCh until the
    // channel is closed. Only the snapshot branch is shown in this excerpt.
    func (kv *KVServer) Read_Raft_Apply_Command() {
    for message := range kv.applyCh {
    // ...
    // A snapshot message is handed to GetSnapShotFromRaft.
    if message.SnapshotValid {
    kv.GetSnapShotFromRaft(message)
    }
    }
    }

    // ...

    // ReadSnapShotToInstall restores the server state from an encoded snapshot.
    //
    // The snapshot is decoded as the key/value map followed by the
    // last_request_id map. An empty snapshot is ignored. If either value fails
    // to decode, the problem is logged and the current state is left unchanged.
    func (kv *KVServer) ReadSnapShotToInstall(snapshot []byte) {
        if len(snapshot) == 0 { // bootstrap without any state?
            return
        }

        dec := labgob.NewDecoder(bytes.NewBuffer(snapshot))

        var restoredDB map[string]string
        var restoredRequestIDs map[int64]int

        // The second Decode only runs when the first one succeeded.
        decodeFailed := dec.Decode(&restoredDB) != nil || dec.Decode(&restoredRequestIDs) != nil
        if decodeFailed {
            DPrintf("KVSERVER %d read persister got a problem!!!!!!!!!!", kv.me)
            return
        }

        kv.kv_database = restoredDB
        kv.last_request_id = restoredRequestIDs
    }