ShardKV Lab4B Summary

MIT-6.824 Lab4B: Summary and Notes

Approach

  • Lab4B builds a sharded key/value service.

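  • The number of shards is fixed at NShards (10). The service runs several ShardKV replica groups, and each group is responsible for some of the shards. Every group is its own Raft cluster and therefore has its own leader.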

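  • The overall framework is much like Lab3. Because the data is split across shards, define a Shard struct and keep the last_request_id table inside it. That way the duplicate-detection state travels with the shard when it migrates.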

    type Shard struct {
        Shard_id        int
        Kv_data         map[string]string
        Last_request_id map[int64]int // per shard: the last request id applied for each client
    }
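The sketch below shows why the dedup table belongs inside Shard. It assumes Lab3-style Put/Append semantics and assumes that one client's request ids only increase. NewShard and applyPutAppend are hypothetical helpers, not part of the lab code. Because the table is stored per shard, it migrates along with the data. A client retry that reaches the new owner is therefore still recognized as a duplicate.

```go
package main

import "fmt"

// Shard has the same fields as the struct above.
type Shard struct {
	Shard_id        int
	Kv_data         map[string]string
	Last_request_id map[int64]int
}

// NewShard is a hypothetical helper that returns a shard with its maps allocated.
func NewShard(id int) Shard {
	return Shard{
		Shard_id:        id,
		Kv_data:         make(map[string]string),
		Last_request_id: make(map[int64]int),
	}
}

// applyPutAppend (hypothetical) applies a Put or Append unless this client's
// request id was already applied to the shard; it reports whether it applied.
// Assumes each client's request ids only increase.
func applyPutAppend(s *Shard, op, key, value string, clientID int64, requestID int) bool {
	if last, ok := s.Last_request_id[clientID]; ok && requestID <= last {
		return false // duplicate, e.g. a retry after a lost reply
	}
	switch op {
	case "Put":
		s.Kv_data[key] = value
	case "Append":
		s.Kv_data[key] += value
	}
	s.Last_request_id[clientID] = requestID
	return true
}

func main() {
	s := NewShard(3)
	applyPutAppend(&s, "Put", "k", "a", 1, 1)
	applyPutAppend(&s, "Append", "k", "b", 1, 2)
	applyPutAppend(&s, "Append", "k", "b", 1, 2) // retried request: ignored
	fmt.Println(s.Kv_data["k"])                  // prints "ab"
}
```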
  • ShardKV has to talk to the ShardCtrler to fetch Configs, so the ShardKV struct needs a ShardCtrler clerk as a member.

    type ShardKV struct {
        mu           sync.Mutex
        me           int
        rf           *raft.Raft
        dead         int32
        applyCh      chan raft.ApplyMsg
        make_end     func(string) *labrpc.ClientEnd
        gid          int // the group this server belongs to
        ctrlers      []*labrpc.ClientEnd
        maxraftstate int // snapshot if log grows this big

        // Your definitions here.
        mck                     *shardctrler.Clerk        // clerk used to fetch new configs from the shardctrler
        ctrl_config             shardctrler.Config        // current config
        kv_database             []Shard                   // data stored shard by shard
        wait_apply_ch           map[int]chan Op           // raft log index of a request -> channel that receives the applied Op
        last_snapshot_log_index int
        migrating_shard         [shardctrler.NShards]bool // true if shard i is frozen pending migration
    }
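  • The client code barely differs from Lab3's. Note, however, that the RPC args are built inside the for loop, so every retry carries the Config_num of the config that was just re-fetched from the controller.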

  • The server has three jobs, so StartServer launches three looping goroutines:

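    • Handle Get and PutAppend requests from clients. This is very close to Lab3 and needs only small code changes. The RPC handler must additionally check that the key's shard belongs to this server's group (and is not frozen by a pending migration), and reply ErrWrongGroup otherwise.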

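    • Periodically ask the ShardCtrler for the next Config (if the ShardCtrler has one with a larger Num). Once it arrives, hand it to Raft so that all members of the group apply it in sync.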

      • No new RPC is needed here; just call Query on the mck member of the struct.

      • Ask for the next Config, not the latest one, because a group must change its configuration in Config order.

        func (kv *ShardKV) Get_New_Config() {
            for !kv.killed() {
                kv.mu.Lock()
                last_config_num := kv.ctrl_config.Num
                _, is_leader := kv.rf.GetState()
                kv.mu.Unlock()

                if !is_leader {
                    time.Sleep(90 * time.Millisecond)
                    continue
                }

                // Fetch the NEXT config, not the latest: even if this server is behind,
                // configs must be applied strictly in order.
                new_config := kv.mck.Query(last_config_num + 1)
                if new_config.Num == last_config_num+1 {
                    kv.mu.Lock()
                    op := Op{
                        Operation:  "newconfig",
                        New_config: new_config,
                    }
                    if _, ifLeader := kv.rf.GetState(); ifLeader {
                        kv.rf.Start(op) // replicate the new config to the whole group
                    }
                    kv.mu.Unlock()
                }
                time.Sleep(90 * time.Millisecond)
            }
        }
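The "next, not latest" rule relies on the controller's Query semantics. Asking for -1, or for a number past the newest config, returns the newest config. So the check new_config.Num == last_config_num+1 tells you whether a next config exists yet. Below is a minimal sketch with a hypothetical in-memory stand-in for the controller (fakeCtrler and catchUp are not lab code). In the real server each step also goes through Raft and waits for migrations; this sketch omits both.

```go
package main

import "fmt"

// Config keeps only the field this sketch needs.
type Config struct{ Num int }

// fakeCtrler is a hypothetical stand-in for the shardctrler. Its Query follows
// the lab's rule: -1 or a number past the newest config returns the newest one.
type fakeCtrler struct{ configs []Config } // configs[i].Num == i

func (c *fakeCtrler) Query(num int) Config {
	if num < 0 || num >= len(c.configs) {
		return c.configs[len(c.configs)-1]
	}
	return c.configs[num]
}

// catchUp (hypothetical) advances from config cur one step at a time, the way
// Get_New_Config does, and returns the config numbers it would hand to Raft.
func catchUp(c *fakeCtrler, cur int) []int {
	var applied []int
	for {
		next := c.Query(cur + 1)
		if next.Num != cur+1 {
			return applied // no newer config yet
		}
		applied = append(applied, next.Num)
		cur = next.Num
	}
}

func main() {
	ctrl := &fakeCtrler{configs: []Config{{0}, {1}, {2}, {3}}}
	fmt.Println(catchUp(ctrl, 0))   // [1 2 3]: never jumps straight to 3
	fmt.Println(ctrl.Query(-1).Num) // 3
}
```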
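      • When applying a new Config, be careful with the locking. Use kv.migrating_shard to record which shards must migrate, and "freeze" them so they cannot be accessed (set the corresponding entry to true). Only after that, update the server's config. Pay attention to how the code decides whether a shard must migrate. Shards that move from, or to, gid 0 (the shardctrler's "no group" placeholder) carry no data, so they are not frozen.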

        func (kv *ShardKV) Execute_NewConfig(op Op) {
            kv.mu.Lock()
            defer kv.mu.Unlock()
            new_config := op.New_config
            if new_config.Num != kv.ctrl_config.Num+1 {
                return
            }

            // A new config may only be applied once every migration of the current one has finished
            for _, is_migrating := range kv.migrating_shard {
                if is_migrating {
                    return
                }
            }

            // Record which shards need to migrate
            for shard_id := 0; shard_id < shardctrler.NShards; shard_id++ {
                // Incoming: owned by another real (non-zero) group, assigned to this group in the new config
                if kv.ctrl_config.Shards[shard_id] != kv.gid && new_config.Shards[shard_id] == kv.gid && kv.ctrl_config.Shards[shard_id] != 0 {
                    kv.migrating_shard[shard_id] = true
                }

                // Outgoing: owned by this group, assigned to another real (non-zero) group in the new config
                if kv.ctrl_config.Shards[shard_id] == kv.gid && new_config.Shards[shard_id] != kv.gid && new_config.Shards[shard_id] != 0 {
                    kv.migrating_shard[shard_id] = true
                }
            }
            kv.ctrl_config = new_config
        }
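To check the freezing rule by hand, it can be pulled out into a pure function. shardMoves is a hypothetical helper that repeats the two conditions of Execute_NewConfig on plain arrays. As above, gid 0 is treated as the "no group" placeholder, so moves from or to it need no data transfer.

```go
package main

import "fmt"

const NShards = 10

// shardMoves (hypothetical) applies the two checks from Execute_NewConfig:
// in  = shards some real group owned before and gid owns now (wait for data);
// out = shards gid owned before and another real group owns now (send data).
func shardMoves(oldShards, newShards [NShards]int, gid int) (in, out []int) {
	for s := 0; s < NShards; s++ {
		if oldShards[s] != gid && newShards[s] == gid && oldShards[s] != 0 {
			in = append(in, s)
		}
		if oldShards[s] == gid && newShards[s] != gid && newShards[s] != 0 {
			out = append(out, s)
		}
	}
	return in, out
}

func main() {
	var first [NShards]int // config 0: every shard unassigned (gid 0)
	var second, third [NShards]int
	for s := range second {
		second[s] = 100 // group 100 joins and takes every shard
		third[s] = 100
	}
	for s := 5; s < NShards; s++ {
		third[s] = 101 // group 101 joins and takes shards 5..9
	}

	in, out := shardMoves(first, second, 100)
	fmt.Println(in, out) // [] []: shards from gid 0 start empty
	in, out = shardMoves(second, third, 100)
	fmt.Println(in, out) // [] [5 6 7 8 9]
	in, out = shardMoves(second, third, 101)
	fmt.Println(in, out) // [5 6 7 8 9] []
}
```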
    • Periodically check whether a Config change requires shards to be migrated.

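      • If some shards are frozen and have to move out, first copy their data into a map[int][]Shard keyed by target gid. Then send each list to its target group. A group only sends its outgoing shards. Incoming shards arrive in RPCs from the other groups' leaders and are installed when Execute_Migrate applies them. Whether you are copying data out or installing it, always make new maps. Go maps are reference types, so reusing one would leave the RPC arguments and the database sharing the same data.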

        // Excerpt from Execute_Migrate (cur_config is kv.ctrl_config, read while holding kv.mu):
        // apply the MigrateShardData into freshly made maps
        for _, cur_shard := range op.Migrate_data {
            if !kv.migrating_shard[cur_shard.Shard_id] {
                continue
            }
            kv.migrating_shard[cur_shard.Shard_id] = false
            kv.kv_database[cur_shard.Shard_id] = Shard{
                Shard_id:        cur_shard.Shard_id,
                Kv_data:         make(map[string]string),
                Last_request_id: make(map[int64]int),
            }
            // Copy the data into this server's own database
            if cur_config.Shards[cur_shard.Shard_id] == kv.gid {
                for key, value := range cur_shard.Kv_data {
                    kv.kv_database[cur_shard.Shard_id].Kv_data[key] = value
                }
                for client_id, request_id := range cur_shard.Last_request_id {
                    kv.kv_database[cur_shard.Shard_id].Last_request_id[client_id] = request_id
                }
            }
        }
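Here is why fresh maps matter. Assigning a Go map, or a struct that holds one, copies only the reference, so both names keep pointing at the same table. Below is a minimal sketch of a deep copy. copyShard is a hypothetical helper equivalent to the copy loops above. On Go 1.21 and later, maps.Clone from the standard library could replace the loops.

```go
package main

import "fmt"

type Shard struct {
	Shard_id        int
	Kv_data         map[string]string
	Last_request_id map[int64]int
}

// copyShard (hypothetical) returns a Shard whose maps are new allocations,
// so later writes to src do not show up in the copy and vice versa.
func copyShard(src Shard) Shard {
	dst := Shard{
		Shard_id:        src.Shard_id,
		Kv_data:         make(map[string]string, len(src.Kv_data)),
		Last_request_id: make(map[int64]int, len(src.Last_request_id)),
	}
	for key, value := range src.Kv_data {
		dst.Kv_data[key] = value
	}
	for clientID, requestID := range src.Last_request_id {
		dst.Last_request_id[clientID] = requestID
	}
	return dst
}

func main() {
	src := Shard{Shard_id: 2, Kv_data: map[string]string{"k": "v1"}, Last_request_id: map[int64]int{}}
	alias := src           // struct copy, but the maps are shared
	deep := copyShard(src) // independent maps

	src.Kv_data["k"] = "v2"                            // e.g. a later write on the old owner
	fmt.Println(alias.Kv_data["k"], deep.Kv_data["k"]) // v2 v1
}
```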
      • The data is sent by RPC, so define args and reply structs for it.

        type MigrateShardArgs struct {
            Migrate_data []Shard
            Config_num   int
        }

        type MigrateShardReply struct {
            Err        Err
            Config_num int
        }
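      • When handling the RPC, compare args.Config_num with the receiver's own config number. Accept the data only when the two match, and hand it to Raft for replication. If the sender is ahead, reply ErrConfigNum so it retries later. If the sender is behind, or the shards are already installed, the data has already been received, so reply OK.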
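The three outcomes of that check can be written as a small pure function. migrateDecision and the propose value are hypothetical names used only for this sketch. OK and ErrConfigNum mirror the error names used by the handler. Their actual values in the author's common.go are not shown, so the constants below are assumptions.

```go
package main

import "fmt"

// Assumed values; the author's common.go is not shown.
const (
	OK           = "OK"
	ErrConfigNum = "ErrConfigNum"
	propose      = "propose" // hypothetical: accept and hand the shards to Raft
)

// migrateDecision (hypothetical) mirrors the checks at the top of MigrateShard.
// argsNum is the sender's config number, curNum the receiver's, and installed
// reports whether the shards in the request are already unfrozen here.
func migrateDecision(argsNum, curNum int, installed bool) string {
	if argsNum > curNum {
		return ErrConfigNum // receiver is behind: the sender retries later
	}
	if argsNum < curNum || installed {
		return OK // data already received: acknowledge so the sender can finish
	}
	return propose
}

func main() {
	fmt.Println(migrateDecision(5, 4, false)) // ErrConfigNum
	fmt.Println(migrateDecision(5, 6, false)) // OK
	fmt.Println(migrateDecision(5, 5, true))  // OK
	fmt.Println(migrateDecision(5, 5, false)) // propose
}
```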

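  • Always keep in mind that a Config stores the global assignment: Shards maps each shard to a gid, and Groups maps each gid to its servers. Know how to follow these mappings to find the data you need.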

  • Lock and unlock carefully!
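Below is a minimal sketch of following those mappings from a key to the servers that should hold it. Config here is a simplified copy of shardctrler.Config (Num, Shards, Groups). key2shard is reproduced from the lab's shardkv client skeleton so that the example compiles on its own. serversFor is a hypothetical helper.

```go
package main

import "fmt"

const NShards = 10

// Config is a simplified copy of shardctrler.Config.
type Config struct {
	Num    int
	Shards [NShards]int     // shard -> gid
	Groups map[int][]string // gid -> server names
}

// key2shard: first byte of the key modulo NShards (as in the lab skeleton).
func key2shard(key string) int {
	shard := 0
	if len(key) > 0 {
		shard = int(key[0])
	}
	shard %= NShards
	return shard
}

// serversFor (hypothetical) follows key -> shard -> gid -> servers.
// ok is false when the gid has no servers (e.g. gid 0, no group assigned).
func serversFor(cfg Config, key string) (gid int, servers []string, ok bool) {
	gid = cfg.Shards[key2shard(key)]
	servers, ok = cfg.Groups[gid]
	return gid, servers, ok
}

func main() {
	cfg := Config{Num: 1, Groups: map[int][]string{100: {"s100-0", "s100-1", "s100-2"}}}
	for s := range cfg.Shards {
		cfg.Shards[s] = 100
	}
	gid, servers, ok := serversFor(cfg, "apple")
	fmt.Println(key2shard("apple"), gid, servers, ok) // 7 100 [s100-0 s100-1 s100-2] true
}
```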

Key Code

  • Client RPC code (using PutAppend as the example):

    func (ck *Clerk) PutAppend(key string, value string, op string) {
        ck.Request_id += 1

        for {
            // Built inside the loop so that each retry carries the current ck.config.Num
            args := PutAppendArgs{
                Key:        key,
                Value:      value,
                Op:         op,
                Client_id:  ck.Client_id,
                Request_id: ck.Request_id,
                Config_num: ck.config.Num,
            }
            shard := key2shard(key)
            gid := ck.config.Shards[shard]
            if servers, ok := ck.config.Groups[gid]; ok {
                for si := 0; si < len(servers); si++ {
                    srv := ck.make_end(servers[si])
                    var reply PutAppendReply
                    ok := srv.Call("ShardKV.PutAppend", &args, &reply)
                    if ok && reply.Err == OK {
                        return
                    }
                    if ok && reply.Err == ErrWrongGroup {
                        break
                    }
                    // ... not ok, or ErrWrongLeader
                }
            }
            time.Sleep(100 * time.Millisecond)
            // ask controller for the latest configuration.
            ck.config = ck.sm.Query(-1)
        }
    }
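The retry path refreshes ck.config, and only args built afresh on each attempt pick up the new Config_num. Below is a minimal, self-contained sketch with hypothetical stand-ins: clerk for the real Clerk, owner for the group servers, and a counter increment for ck.sm.Query(-1). It records the Config_num sent on each attempt.

```go
package main

import "fmt"

// Hypothetical, trimmed-down types for illustration only.
type PutAppendArgs struct {
	Key        string
	Config_num int
}

type clerk struct {
	configNum int   // stands in for ck.config.Num
	sent      []int // Config_num carried by each attempt
}

// owner (hypothetical) stands in for the group that owns the key; it accepts
// only requests tagged with config 2 or later (earlier ones get ErrWrongGroup).
func owner(args PutAppendArgs) bool { return args.Config_num >= 2 }

func (ck *clerk) putAppend(key string) {
	for {
		// Built inside the loop: each retry sees the refreshed config number.
		args := PutAppendArgs{Key: key, Config_num: ck.configNum}
		ck.sent = append(ck.sent, args.Config_num)
		if owner(args) {
			return
		}
		ck.configNum++ // stands in for ck.config = ck.sm.Query(-1)
	}
}

func main() {
	ck := &clerk{configNum: 1}
	ck.putAppend("k")
	fmt.Println(ck.sent) // [1 2]
}
```

If args were built once before the loop, every attempt in this sketch would carry Config_num 1 and the loop would never end.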
  • Data migration:

    • Detect whether any data needs to migrate:

      func (kv *ShardKV) Detect_Migrate_Data() {
          for !kv.killed() {
              kv.mu.Lock()
              _, is_leader := kv.rf.GetState()
              kv.mu.Unlock()

              if !is_leader {
                  time.Sleep(120 * time.Millisecond)
                  continue
              }

              kv.mu.Lock()
              no_migrate := true
              for _, need_migrate := range kv.migrating_shard {
                  if need_migrate {
                      no_migrate = false
                      break
                  }
              }
              kv.mu.Unlock()

              if no_migrate {
                  time.Sleep(120 * time.Millisecond)
                  continue
              }

              kv.mu.Lock()
              send_data := make(map[int][]Shard) // target gid -> shard list
              for shard_id := 0; shard_id < shardctrler.NShards; shard_id++ { // collect the outgoing shards
                  cur_group := kv.ctrl_config.Shards[shard_id]
                  // Outgoing: the shard is frozen and belongs to another group in the config
                  // (which has already been updated to the new one)
                  if kv.migrating_shard[shard_id] && kv.gid != cur_group {
                      tmp_shard := Shard{
                          Shard_id:        shard_id,
                          Kv_data:         make(map[string]string),
                          Last_request_id: make(map[int64]int),
                      }
                      // Copy out the data to send
                      for key, value := range kv.kv_database[shard_id].Kv_data {
                          tmp_shard.Kv_data[key] = value
                      }
                      for client_id, request_id := range kv.kv_database[shard_id].Last_request_id {
                          tmp_shard.Last_request_id[client_id] = request_id
                      }
                      send_data[cur_group] = append(send_data[cur_group], tmp_shard)
                  }
              }
              kv.mu.Unlock()
              if len(send_data) == 0 { // nothing to send out
                  time.Sleep(120 * time.Millisecond)
                  continue
              }

              // Send the data
              for target_gid, shards_to_send := range send_data {
                  kv.mu.Lock()
                  target_servers := kv.ctrl_config.Groups[target_gid]
                  args := &MigrateShardArgs{
                      Migrate_data: make([]Shard, 0),
                      Config_num:   kv.ctrl_config.Num,
                  }
                  kv.mu.Unlock()

                  for _, cur_shard := range shards_to_send {
                      tmp_shard := Shard{
                          Shard_id:        cur_shard.Shard_id,
                          Kv_data:         make(map[string]string),
                          Last_request_id: make(map[int64]int),
                      }
                      // Pack the data to send
                      for key, value := range cur_shard.Kv_data {
                          tmp_shard.Kv_data[key] = value
                      }
                      for client_id, request_id := range cur_shard.Last_request_id {
                          tmp_shard.Last_request_id[client_id] = request_id
                      }
                      args.Migrate_data = append(args.Migrate_data, tmp_shard)
                  }

                  // Send each target group its own shard list
                  go func(groupServers []string, args *MigrateShardArgs) {
                      for _, groupMember := range groupServers {
                          callEnd := kv.make_end(groupMember)
                          migrateReply := MigrateShardReply{}
                          ok := callEnd.Call("ShardKV.MigrateShard", args, &migrateReply)

                          kv.mu.Lock()
                          cur_config_num := kv.ctrl_config.Num
                          kv.mu.Unlock()

                          if ok && migrateReply.Err == OK {
                              // Tell our own group, via Raft, that these shards have been sent out
                              if cur_config_num != args.Config_num || kv.check_shard_been_migrated(args.Migrate_data) {
                                  return
                              }
                              kv.rf.Start(Op{
                                  Operation:          "migrate",
                                  Migrate_data:       args.Migrate_data,
                                  Config_num_migrate: args.Config_num,
                              })
                              return
                          }
                      }
                  }(target_servers, args)
              }
              time.Sleep(120 * time.Millisecond)
          }
      }
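The first half of Detect_Migrate_Data amounts to one rule: file every frozen shard that now belongs to another group under that group. Below is a minimal sketch on plain arrays. outgoingByGroup is a hypothetical helper, and it returns shard ids rather than copied Shard values to keep the example short.

```go
package main

import (
	"fmt"
	"sort"
)

const NShards = 10

// outgoingByGroup (hypothetical) returns target gid -> shard ids to send,
// using the same test as Detect_Migrate_Data: frozen, and owned by another
// group in the already-updated config.
func outgoingByGroup(shards [NShards]int, migrating [NShards]bool, gid int) map[int][]int {
	out := make(map[int][]int)
	for s := 0; s < NShards; s++ {
		if migrating[s] && shards[s] != gid {
			out[shards[s]] = append(out[shards[s]], s)
		}
	}
	return out
}

func main() {
	var shards [NShards]int
	var migrating [NShards]bool
	for s := range shards {
		shards[s] = 100
	}
	shards[2], shards[3], shards[8] = 101, 102, 101 // new owners
	migrating[2], migrating[3], migrating[8] = true, true, true
	migrating[5] = true // frozen but ours: an incoming shard, not sent

	out := outgoingByGroup(shards, migrating, 100)
	gids := make([]int, 0, len(out))
	for g := range out {
		gids = append(gids, g)
	}
	sort.Ints(gids) // map iteration order is random
	for _, g := range gids {
		fmt.Println(g, out[g])
	}
	// 101 [2 8]
	// 102 [3]
}
```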
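    • RPC handler: similar to the Get/PutAppend RPC handlers, plus a check of whether the shards being migrated have already been installed. A server that is not the leader replies ErrWrongLeader immediately.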

      func (kv *ShardKV) MigrateShard(args *MigrateShardArgs, reply *MigrateShardReply) {
          kv.mu.Lock()
          cur_config_num := kv.ctrl_config.Num
          kv.mu.Unlock()

          // The receiver's config must be at the same point as the sender's
          if args.Config_num > cur_config_num { // we are behind and may still need to update our own data, so don't accept yet
              reply.Err = ErrConfigNum
              reply.Config_num = cur_config_num
              return
          }
          if args.Config_num < cur_config_num || kv.check_shard_been_migrated(args.Migrate_data) {
              reply.Err = OK // already received
              return
          }

          op := Op{
              Operation:          "migrate",
              Migrate_data:       args.Migrate_data,
              Config_num_migrate: args.Config_num,
          }
          raft_log_index, _, is_leader := kv.rf.Start(op)
          if !is_leader {
              reply.Err = ErrWrongLeader
              return
          }

          kv.mu.Lock()
          channel, exist := kv.wait_apply_ch[raft_log_index]
          if !exist {
              kv.wait_apply_ch[raft_log_index] = make(chan Op, 1)
              channel = kv.wait_apply_ch[raft_log_index]
          }
          kv.mu.Unlock()
          select {
          case <-time.After(time.Millisecond * 500):
              kv.mu.Lock()
              _, is_leader = kv.rf.GetState()
              cur_config_num = kv.ctrl_config.Num
              kv.mu.Unlock()

              if args.Config_num <= cur_config_num && kv.check_shard_been_migrated(args.Migrate_data) && is_leader {
                  reply.Config_num = cur_config_num
                  reply.Err = OK
              } else {
                  reply.Err = ErrWrongLeader
              }
          case op_committed := <-channel:
              kv.mu.Lock()
              cur_config_num = kv.ctrl_config.Num
              kv.mu.Unlock()
              if op_committed.Config_num_migrate == args.Config_num && args.Config_num <= cur_config_num && kv.check_shard_been_migrated(args.Migrate_data) {
                  reply.Config_num = cur_config_num
                  reply.Err = OK
              } else {
                  reply.Err = ErrWrongLeader
              }
          }

          kv.mu.Lock()
          delete(kv.wait_apply_ch, raft_log_index)
          kv.mu.Unlock()
      }
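    • When a migrate op is applied, copy the data in. The same op serves both sides. On the receiving group it installs the shards. On the sending group, which calls Start with the shards it has just delivered, it clears the frozen flag and empties the stale copy, since in the current config those shards no longer belong to it.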

      func (kv *ShardKV) Execute_Migrate(op Op) {
          kv.mu.Lock()
          defer kv.mu.Unlock()

          cur_config := kv.ctrl_config
          if op.Config_num_migrate != cur_config.Num {
              return
          }

          // apply the MigrateShardData
          for _, cur_shard := range op.Migrate_data {
              if !kv.migrating_shard[cur_shard.Shard_id] {
                  continue
              }
              kv.migrating_shard[cur_shard.Shard_id] = false
              kv.kv_database[cur_shard.Shard_id] = Shard{
                  Shard_id:        cur_shard.Shard_id,
                  Kv_data:         make(map[string]string),
                  Last_request_id: make(map[int64]int),
              }
              // Copy the data into this server's own database
              if cur_config.Shards[cur_shard.Shard_id] == kv.gid {
                  for key, value := range cur_shard.Kv_data {
                      kv.kv_database[cur_shard.Shard_id].Kv_data[key] = value
                  }
                  for client_id, request_id := range cur_shard.Last_request_id {
                      kv.kv_database[cur_shard.Shard_id].Last_request_id[client_id] = request_id
                  }
              }
          }
      }
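Because the same "migrate" op is applied on both sides, its effect depends on who owns the shard in the current config. Below is a minimal sketch that replays the loop of Execute_Migrate on a hypothetical two-group setup, without locking and without the config-number check. server and applyMigrate are illustrative names, not lab types.

```go
package main

import "fmt"

const NShards = 10

type Shard struct {
	Shard_id        int
	Kv_data         map[string]string
	Last_request_id map[int64]int
}

// server is a hypothetical, lock-free stand-in for the relevant ShardKV fields.
type server struct {
	gid       int
	shards    [NShards]int // current config: shard -> gid
	migrating [NShards]bool
	db        [NShards]Shard
}

// applyMigrate (hypothetical) repeats the loop body of Execute_Migrate.
func (s *server) applyMigrate(data []Shard) {
	for _, in := range data {
		id := in.Shard_id
		if !s.migrating[id] {
			continue // already handled (duplicate op)
		}
		s.migrating[id] = false
		s.db[id] = Shard{Shard_id: id, Kv_data: make(map[string]string), Last_request_id: make(map[int64]int)}
		if s.shards[id] == s.gid { // receiver: install the data
			for k, v := range in.Kv_data {
				s.db[id].Kv_data[k] = v
			}
			for c, r := range in.Last_request_id {
				s.db[id].Last_request_id[c] = r
			}
		} // sender: the shard stays empty, so its stale copy is dropped
	}
}

func main() {
	var cfg [NShards]int
	for i := range cfg {
		cfg[i] = 100
	}
	cfg[4] = 101 // new config: shard 4 moves from group 100 to group 101

	sender := &server{gid: 100, shards: cfg}
	receiver := &server{gid: 101, shards: cfg}
	sender.migrating[4], receiver.migrating[4] = true, true
	sender.db[4] = Shard{Shard_id: 4, Kv_data: map[string]string{"k": "v"}, Last_request_id: map[int64]int{1: 5}}

	// A separate copy, like the one carried in MigrateShardArgs.
	data := []Shard{{Shard_id: 4, Kv_data: map[string]string{"k": "v"}, Last_request_id: map[int64]int{1: 5}}}
	receiver.applyMigrate(data) // via MigrateShard -> Raft -> Execute_Migrate
	sender.applyMigrate(data)   // via the Start call after an OK reply

	fmt.Println(receiver.db[4].Kv_data["k"], len(sender.db[4].Kv_data)) // v 0
	fmt.Println(receiver.migrating[4], sender.migrating[4])            // false false
}
```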