KVRaft Lab3A总结

MIT-6.824 Lab3A 总结与备忘

思路整理

  • Lab3实现一个基于Raft的key/value数据库

  • 每个KVServer对应一个Raft peer,KVServer通过Raft服务达成共识(因此KVServer结构体中的一个字段为raft),Client向任意一个KVServer发送请求(Get、Put、Append),如果该KVServer为Leader,则响应请求——如果是ZooKeeper,则任意副本都可以响应Get,但这里Lab要求强一致性,因此必须都由Leader响应。为了标识唯一的请求,并且KVServer要明白请求来自哪个客户端,需要给每个客户端分配唯一的ID(用nrand()生成,实际生产环境下会用IP:PORT的方法去定位具体的Client),并且请求用唯一的id和客户端id共同确定

    • 请求(Op)结构体为:

      1
      2
      3
      4
      5
      6
      7
      8
      type Op struct {
      // Field names must start with capital letters
      Operation string // "get" "put" "append"
      Key string
      Value string
      Client_id int64 // 发送Op的client(一个请求的id和发送请求的client共同确定一个唯一性的请求)
      Request_id int // 同一个请求要有唯一的id
      }
    • KVServer的结构体为:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      type KVServer struct {
      mu sync.Mutex
      me int
      rf *raft.Raft // 每个KVServer对应一个Raft peer node(将raft作为一个服务)
      applyCh chan raft.ApplyMsg
      dead int32 // set by Kill()

      maxraftstate int // snapshot if log grows this big

      // Your definitions here.
      kv_database map[string]string // 存储key value的数据库
      wait_apply_ch map[int]chan Op // index of raft -> chan Op (key为一个请求在raft中的index,value为Op通道)
      last_request_id map[int64]int // key:client id, value: request id
      last_snapshot_log_index int // last SnapShot point , raftIndex
      }
    • Client结构体为:

      1
      2
      3
      4
      5
      6
      7
      type Clerk struct {
      servers []*labrpc.ClientEnd // kvserver
      // You will have to modify this struct.
      Client_id int64 // 该客户端的id
      Request_id int // 上一个请求的id,每次发送请求时,id自增
      Leader_id int // 标识上一次通信的kvserver leader
      }
  • 客户端发送一个请求的流程如下:

    • 建立多个KVServer,每个Server会在goroutine ApplyLoop中等待到来的ApplyCh的请求

    • Client发送一个请求:

      • 自增Request_id,得到当前请求的id

      • 构造RPC的args和reply。Put和Append RPC相似(都要修改数据库),可以合并为一类,Get单独为一类

        • 无论哪种RPC,都要给定请求的id和Client的id

        • Put和Append要额外指定Key和Value

        • PutAppend:

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          type Err string
          type PutAppendArgs struct {
          Key string
          Value string
          Op string // "Put" or "Append"
          // You'll have to add definitions here.
          // Field names must start with capital letters,
          // otherwise RPC will break.
          Client_id int64
          Request_id int
          }

          type PutAppendReply struct {
          Err Err
          }
        • Get:

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          type GetArgs struct {
          Key string
          // You'll have to add definitions here.
          Client_id int64
          Request_id int
          }

          type GetReply struct {
          Err Err
          Value string
          }
      • 发送给Leader_id对应的KVServer,如果该Server不是Leader,则它在RPC handler里会直接返回,reply的Err字段为”ErrWrongLeader”,此时Client要重新选择一个KVServer,再次发送请求

    • 请求由RPC handler处理(Get RPC handler和PutAppend RPC handler)——记住,处理该RPC的KVServer是leader,这很重要

      • RPC handler首先判断KVServer是否存活,以及自己是不是leader,如果挂了或者不是leader则直接返回
      • 通过raft.Start() 提交Op给Raft,开启一个Wait Channel,等待Raft返回给自己请求的结果
        • 底层共识达成时,Raft所有peer的ApplyCh会收到该请求(Op),此时所有peer的log已经达成一致
        • 各个Server执行该Op:
          • 如果该Op重复了(根据Cilent_id和Request_id字段决定),则Put、Append不必再执行
          • Get不执行
        • 如果有Wait Channel在等待,则返回结果给Wait Channel——但只有Leader的WaitChannel才真正返回数据给Client
      • RPC handler(Leader的handler)通过Wait Channel接受结果,执行Get操作并填充Reply结构体,或者直接填充Reply结构体。如果超时,则返回结果(可以认为是ErrWrongLeader,让Client按逻辑重发)——超时的处理用Select关键字
    • Client收到结果,如果为ErrWrongLeader,则自增Leader_id,重发,否则返回相应的Value等

  • Raft的heartbeat interval对3A的TestSpeed3A有一定的影响,需要调整好这个参数

关键代码

  • 建立KVServer,开启goroutine,循环检测是否收到ApplyMsg

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *KVServer {
    // call labgob.Register on structures you want
    // Go's RPC library to marshall/unmarshall.
    labgob.Register(Op{})

    kv := new(KVServer)
    kv.me = me
    kv.maxraftstate = maxraftstate

    // You may need initialization code here.

    kv.applyCh = make(chan raft.ApplyMsg)
    kv.rf = raft.Make(servers, me, persister, kv.applyCh)

    // You may need initialization code here.
    kv.kv_database = make(map[string]string)
    kv.last_request_id = make(map[int64]int)
    kv.wait_apply_ch = make(map[int]chan Op)

    // TODO: 载入snapshot

    go kv.Read_Raft_Apply_Command()

    return kv
    }
  • Client发送请求:

    • Get:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      func (ck *Clerk) Get(key string) string {
      // You will have to modify this function.
      ck.Request_id += 1
      leader_id := ck.Leader_id // 任意的server都行(任意副本都能返回Get),但为了一致性,必须call leader

      for {
      args := GetArgs{
      Key: key,
      Client_id: ck.Client_id,
      Request_id: ck.Request_id,
      }
      reply := GetReply{}

      ok := ck.servers[leader_id].Call("KVServer.Get", &args, &reply)
      if !ok || reply.Err == ErrWrongLeader { // 如果出现问题,则切换询问对象
      leader_id = (leader_id + 1) % len(ck.servers)
      continue
      }
      if reply.Err == ErrNoKey { // 没有key
      return ""
      }
      if reply.Err == OK { // 获得value
      ck.Leader_id = leader_id
      return reply.Value
      }
      }
      }
    • PutAppend:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      func (ck *Clerk) PutAppend(key string, value string, op string) {
      // You will have to modify this function.
      ck.Request_id += 1
      leader_id := ck.Leader_id // 任意的server都行(任意副本都能返回Get)

      for {
      args := PutAppendArgs{
      Key: key,
      Value: value,
      Op: op,
      Client_id: ck.Client_id,
      Request_id: ck.Request_id,
      }
      reply := PutAppendReply{}

      ok := ck.servers[leader_id].Call("KVServer.PutAppend", &args, &reply)
      if !ok || reply.Err == ErrWrongLeader { // 如果出现问题,则切换询问对象
      leader_id = (leader_id + 1) % len(ck.servers)
      continue
      }
      if reply.Err == OK { // 获得value
      ck.Leader_id = leader_id
      return
      }
      }
      }
  • Leader的RPC handler处理请求:超时的处理,用到了Select

    • Get请求的处理

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
      if kv.killed() {
      reply.Err = ErrWrongLeader
      return
      }
      _, is_leader := kv.rf.GetState()
      if !is_leader { // 被client RPC调用的该kvserver不是leader,则数据不一定是最新的,因此reply不为ok
      reply.Err = ErrWrongLeader
      return
      }

      // 根据args构建Op,由kv.rf.Start调用
      op := Op{
      Operation: "get",
      Key: args.Key,
      Value: "",
      Client_id: args.Client_id,
      Request_id: args.Request_id,
      }
      raft_log_index, _, _ := kv.rf.Start(op)

      // 构建wait channel
      kv.mu.Lock()
      wait_ch, exist := kv.wait_apply_ch[raft_log_index]
      if !exist {
      kv.wait_apply_ch[raft_log_index] = make(chan Op, 1)
      wait_ch = kv.wait_apply_ch[raft_log_index]
      }
      kv.mu.Unlock()

      select {
      // select 随机执行一个可运行的 case。如果没有 case 可运行,它将阻塞,直到有 case 可运行
      // case 必须是一个通信操作,要么是发送要么是接收
      case <-time.After(time.Millisecond * 500): // 超时
      _, is_leader := kv.rf.GetState()
      if kv.Is_Duplicate(op.Client_id, op.Request_id) && is_leader {
      value, exist := kv.Execute_Get(op) // Get_Command_From_Raft没有执行Get,因此这里要主动Get
      if exist {
      reply.Err = OK
      reply.Value = value
      } else {
      reply.Err = ErrNoKey
      reply.Value = ""
      }
      } else {
      reply.Err = ErrWrongLeader
      }

      case op_committed := <-wait_ch: // 此时,该kvserver运行了Get_Command_From_Raft().Send_Wait_Chan()
      if op_committed.Client_id == op.Client_id && op_committed.Request_id == op.Request_id {
      value, exist := kv.Execute_Get(op)
      if exist {
      reply.Err = OK
      reply.Value = value
      } else {
      reply.Err = ErrNoKey
      reply.Value = ""
      }
      } else {
      reply.Err = ErrWrongLeader
      }
      }

      kv.mu.Lock()
      delete(kv.wait_apply_ch, raft_log_index)
      kv.mu.Unlock()
      return
      }
    • Put和Append请求的处理

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
      if kv.killed() {
      reply.Err = ErrWrongLeader
      return
      }
      _, is_leader := kv.rf.GetState()
      if !is_leader { // 被client RPC调用的该kvserver不是leader,则数据不一定是最新的,因此reply不为ok
      reply.Err = ErrWrongLeader
      return
      }

      // 根据args构建Op,由kv.rf.Start调用
      op := Op{
      Operation: args.Op,
      Key: args.Key,
      Value: args.Value,
      Client_id: args.Client_id,
      Request_id: args.Request_id,
      }
      raft_log_index, _, _ := kv.rf.Start(op)

      // 构建wait channel
      kv.mu.Lock()
      wait_ch, exist := kv.wait_apply_ch[raft_log_index]
      if !exist {
      kv.wait_apply_ch[raft_log_index] = make(chan Op, 1)
      wait_ch = kv.wait_apply_ch[raft_log_index]
      }
      kv.mu.Unlock()

      select {
      // select 随机执行一个可运行的 case。如果没有 case 可运行,它将阻塞,直到有 case 可运行
      // case 必须是一个通信操作,要么是发送要么是接收
      case <-time.After(time.Millisecond * 500): // 超时
      _, is_leader := kv.rf.GetState()
      if kv.Is_Duplicate(op.Client_id, op.Request_id) && is_leader {
      reply.Err = OK
      } else {
      reply.Err = ErrWrongLeader
      }

      case op_committed := <-wait_ch: // 此时,该kvserver运行了Get_Command_From_Raft().Send_Wait_Chan(),收到了
      if op_committed.Client_id == op.Client_id && op_committed.Request_id == op.Request_id {
      reply.Err = OK
      } else {
      reply.Err = ErrWrongLeader
      }
      }

      kv.mu.Lock()
      delete(kv.wait_apply_ch, raft_log_index)
      kv.mu.Unlock()
      return
      }
  • KVServer(各个peer)检测是否收到ApplyMsg,并执行Get和Append

    • 检测是否收到ApplyMsg

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      func (kv *KVServer) Read_Raft_Apply_Command() {
      for message := range kv.applyCh {
      // listen to every command applied by its raft ,delivery to relative RPC Handler
      if message.CommandValid {
      kv.Get_Command_From_Raft(message)
      }
      if message.SnapshotValid {
      fmt.Println("not implemented")
      }
      }
      }

      func (kv *KVServer) Get_Command_From_Raft(message raft.ApplyMsg) {
      op := message.Command.(Op)

      if !kv.Is_Duplicate(op.Client_id, op.Request_id) { // 判断这个request(op)是否重复
      // 执行put和append,保证数据库内容一致,get对于非leader的peer来说则无需执行,leader在相应RPC handler执行Get即可
      switch op.Operation {
      case "Put":
      kv.Execute_Put(op)
      case "Append":
      kv.Execute_Append(op)
      }
      }

      if kv.maxraftstate != -1 {
      //TODO 判断是否要snapshot
      }

      kv.Send_Wait_Chan(op, message.CommandIndex)
      // Server(leader)有一个WaitChannel,等待Raft leader返回给自己Request结果
      }
    • 检测Op是否重复

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      func (kv *KVServer) Is_Duplicate(client_id int64, request_id int) bool {
      kv.mu.Lock()
      defer kv.mu.Unlock()

      last_request_id, exist := kv.last_request_id[client_id]
      if !exist {
      return false // 如果kv的记录中不存在这个client id,说明这个request一定不会重复
      }
      // 如果kv的记录中存在这个client id,那么这个client保存的最后一个requestid要小于当前request,才不会重复
      return last_request_id >= request_id
      }
    • 执行Put

      1
      2
      3
      4
      5
      6
      7
      8
      9
      func (kv *KVServer) Execute_Put(op Op) (string, bool) {
      kv.mu.Lock()
      defer kv.mu.Unlock()

      kv.kv_database[op.Key] = op.Value // 更新数据库的value
      kv.last_request_id[op.Client_id] = op.Request_id

      return kv.kv_database[op.Key], true
      }
    • 执行Append

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      func (kv *KVServer) Execute_Append(op Op) (string, bool) {
      kv.mu.Lock()
      defer kv.mu.Unlock()

      value, exist := kv.kv_database[op.Key]
      if exist {
      kv.kv_database[op.Key] = value + op.Value
      } else {
      kv.kv_database[op.Key] = op.Value
      }
      kv.last_request_id[op.Client_id] = op.Request_id

      return kv.kv_database[op.Key], exist
      }
    • 发送到Wait Channel

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      func (kv *KVServer) Send_Wait_Chan(op Op, command_index int) bool {
      kv.mu.Lock()
      defer kv.mu.Unlock()

      channel, exist := kv.wait_apply_ch[command_index]
      if exist {
      channel <- op
      }
      return exist
      }
  • Get的执行

    1
    2
    3
    4
    5
    6
    7
    8
    9
    func (kv *KVServer) Execute_Get(op Op) (string, bool) {
    kv.mu.Lock()
    defer kv.mu.Unlock()

    value, exist := kv.kv_database[op.Key]
    kv.last_request_id[op.Client_id] = op.Request_id // kvserver更新request的记录(对于某个client id)

    return value, exist
    }