MapReduce Lab1总结

MIT-6.824 Lab1 总结与备忘

思路整理

  • 整个mapreduce框架分为两个大的阶段:map和reduce。master在被启动后,最外层一直周期性调用done(),当其返回值为true时说明job完成。
  • 两个阶段中,master首先需要等worker先把所有的map任务完成——这里,一个worker做一个map任务,即对一个输入文件做map任务,有files有多少个文件,master就要分配几次map任务——然后再进入reduce阶段,reduce阶段要分配reduce任务的次数为“超参数”nReduce。等所有的reduce任务结束,master的状态转为特殊的结束状态,此时最外层再调用done()时,返回值为true,结束master进程。本条分析表明:
    • 要实现两种task结构体
    • 要用两个列表存储map task和reduce task
    • master类中的成员应当包括:map的任务数、reduce的任务数、map task列表、reduce task列表、master当前所处的阶段
    • master在初始化时就要建立两个列表
    • task结构体里要有一个flag,标志其是否完成,每当一个worker传回finish信号时,将该worker的task标记为已完成
  • 在做map task或reduce task时,master和worker(任意一个worker)的通信过程和处理过程为:
    • 一个worker启动后,通过RPC来主动申请任务(call)。因此master需要实现一个成员函数,用来处理worker的申请,根据master所处的阶段,以及当前阶段任务的状态(还是否存在没有被分配的任务)来决定是否向这个worker分配任务。本条分析表明:
      • master要实现:func (m *Master) Ask_for_task(args *Ask_args, reply *Ask_reply) error
      • task结构体包含:
        • task号码:task_index
        • 是否分配的标志distributed
        • 该task被分配给哪个worker:worker_id
      • worker的ask结构体包含:
        • 自己的id(这里以进程号唯一标识一个worker)
      • worker的reply结构体(master直接操作reply的内容,worker将)包含:(这里,将map task的reply和reduce task的reply合并为一个通用的结构体)
        • map任务对应的文件名file_name
        • reduce任务对应的文件列表intermediate_files
        • nReduce
        • 和master所处阶段一致的任务类型task_type
        • 任务结束时间(用于超时判定)
        • 任务编号task_index
    • 当worker被分配任务后,worker根据reply的内容决定执行什么任务。如果为map task,则读取map task对应的文件并调用用户定义的map函数,将处理结果存储为中间文件mr-map_task_id-x(x取值范围为0~nReduce-1)。如果为reduce task,则根据任务的id,读取中间文件mr-x-reduce_task_id(x取值范围为0~nMap-1,nMap为map task的总数)。而当任务完成时,向master报告“任务已完成”。本条分析表明:
      • worker要实现:
        • 申请任务-报告任务-申请任务-报告任务的死循环
        • 根据task_type做不同的任务
        • task_typedone或者申请任务的返回值为false时,跳出死循环,结束该worker进程
      • 报告结构体finish_args要实现:
        • 记录该项已完成任务的类型task_type
        • 记录该项已完成任务的idtask_index,它和reply结构体的task_index应当是相同的
        • 记录发送该报告的worker id(即该worker的进程号)
      • 报告回复结构体finish_reply为空,因为worker报告后,master不需要再回复它,worker自行进入下一个申请任务-报告任务的循环
    • master收到完成的任务时,根据master所处的阶段能够判别该完成的task是什么类型,于是master根据finish_replytask_index从相应的task列表中找到这个task,标记该task已完成,并检查该列表中是否所有task都已经完成,如果都完成则master进入下一个阶段。本条分析表明:
      • master要实现成员函数func (m *Master) Finish(args *Finish_args, reply *Finish_reply) error
      • master要根据所处的状态,分别检查不同的task列表
      • task结构体包含一个标志本任务是否完成的flagtask_done,初始化时为false
  • 在以上过程里,master要随时关注分配出去的task运行时间,因此master需要另起一个协程,以死循环的方式每隔一小段时间就检查,当前时间是否在分配出去task的deadline之后。本条分析表明:
    • master在分配任务时设置task的deadline
    • task结构体包含:
      • 记录task最晚结束时间的deadline
    • 需要实现一个内含死循环的check(),并在makeMaster()go check()调用

关键实现

master

  • master结构体

    1
    2
    3
    4
    5
    6
    7
    8
    type Master struct {
    lock sync.Mutex
    num_map int // 输入文件数目
    num_reduce int
    cur_stage string //记录当前的工作状态(map/reduce)
    map_task []Map_task // map task
    reduce_task []Reduce_task //
    }
  • master.Ask_for_task(args *Ask_args, reply *Ask_reply) error:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    m.lock.Lock()
    defer m.lock.Unlock()

    reply.Task_op = 0

    if m.cur_stage == "map" {
    // fmt.Printf("Master current stage: %v\n", m.cur_stage)
    for i, temp_task := range m.map_task {
    if !temp_task.Distributed { // map task没有分配出去
    m.map_task[i].Distributed = true
    m.map_task[i].Deadline = time.Now().Add(10 * time.Second)
    m.map_task[i].Worker_id = args.Worker_id

    reply.File_name = temp_task.File_name
    reply.Num_reduce = m.num_reduce
    reply.Task_type = "map"
    reply.Task_index = temp_task.Index
    reply.Task_deadline = m.map_task[i].Deadline
    reply.Worker_id = args.Worker_id
    break
    }
    }
    } else if m.cur_stage == "reduce" { // m.cur_stage == "reduce"
    // fmt.Println("Master current stage: " + m.cur_stage)
    for i, temp_task := range m.reduce_task {
    if !temp_task.Distributed { // map task没有分配出去
    m.reduce_task[i].Distributed = true
    m.reduce_task[i].Deadline = time.Now().Add(10 * time.Second)
    m.reduce_task[i].Worker_id = args.Worker_id

    reply.File_name = ""
    reply.Num_reduce = m.num_reduce
    reply.Task_type = "reduce"
    reply.Task_index = temp_task.Index
    reply.Task_deadline = m.reduce_task[i].Deadline
    reply.Worker_id = args.Worker_id

    reply.Intermediate_files = nil // 中间文件名mr-X-Y,其中X是map任务号,Y是reduce任务
    for map_index := 0; map_index < m.num_map; map_index++ {
    reply.Intermediate_files = append(reply.Intermediate_files, fmt.Sprintf("mr-%d-%d", map_index, temp_task.Index))
    }
    break
    }
    }
    }
  • master.Finish(args *Finish_args, reply *Finish_reply) error

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    m.lock.Lock()
    defer m.lock.Unlock()

    if m.cur_stage == "map" {
    // fmt.Println("A map task finished")
    if args.Task_type != "map" {
    fmt.Printf("Expect map type finish message, got %s type", args.Task_type)
    return nil
    }
    if m.map_task[args.Task_index].Distributed &&
    m.map_task[args.Task_index].Worker_id == args.Worker_id &&
    m.map_task[args.Task_index].Map_done == 0 &&
    time.Now().Before(m.map_task[args.Task_index].Deadline) {
    // fmt.Printf("map task %v is done\n", args.Task_index)
    m.map_task[args.Task_index].Map_done = 1
    for _, tmp_task := range m.map_task {
    if tmp_task.Map_done == 0 { // 如果有一个map task任务没有完成,则继续进行
    return nil
    }
    }
    time.Sleep(1 * time.Second)
    m.cur_stage = "reduce"

    fmt.Println("Start to reduce")
    }
    } else if m.cur_stage == "reduce" {
    // ...
  • master.check()(这里实现为匿名函数)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    go func() {
    for {
    time.Sleep(500 * time.Millisecond)

    m.lock.Lock()
    if m.cur_stage == "map" {
    for i, task := range m.map_task {
    if task.Worker_id != "" &&
    task.Distributed &&
    time.Now().After(task.Deadline) &&
    task.Map_done == 0 { // 回收并重新分配
    // log.Printf(
    // "Found timed-out %s task %d previously on worker %s. Re-assign",
    // m.cur_stage, task.Index, task.Worker_id,
    // )
    m.map_task[i].Worker_id = ""
    m.map_task[i].Distributed = false
    m.map_task[i].Map_done = 0
    }
    }
    } else {
    for i, task := range m.reduce_task {
    if task.Worker_id != "" &&
    task.Distributed &&
    time.Now().After(task.Deadline) &&
    task.Reduce_done == 0 { // 回收并重新分配
    // log.Printf(
    // "Found timed-out %s task %d previously on worker %s. Re-assign",
    // m.cur_stage, task.Index, task.Worker_id,
    // )
    m.reduce_task[i].Worker_id = ""
    m.reduce_task[i].Distributed = false
    m.reduce_task[i].Reduce_done = 0
    }
    }
    }
    m.lock.Unlock()
    }
    }()

worker

  • worker:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    worker_id := strconv.Itoa(os.Getpid())
    // Your worker implementation here.
    for {
    reply := Ask_reply{}
    args := Ask_args{
    Worker_id: worker_id,
    Task_type: "",
    }
    call_result := call("Master.Ask_for_task", &args, &reply)
    time.Sleep(1 * time.Second)
    if !call_result || reply.Task_type == "done" { // call不成功,可能job已经结束
    break
    }
    if reply.Task_op == 0 {
    continue
    }
    switch reply.Task_type {
    case "map":
    file, err := os.Open(reply.File_name)
    if err != nil {
    log.Fatalf("cannot open %v", reply.File_name)
    return
    }
    content, err := ioutil.ReadAll(file)
    if err != nil {
    log.Fatalf("cannot read %v", reply.File_name)
    return
    }
    defer file.Close()

    key_value := mapf(reply.File_name, string(content))

    // 创建输出文件
    var encoders []*json.Encoder
    for i := 0; i < reply.Num_reduce; i++ {
    f, err := os.Create(fmt.Sprintf("mr-%d-%d", reply.Task_index, i))
    if err != nil {
    log.Fatalf("cannot create intermediate result file")
    return
    }
    encoders = append(encoders, json.NewEncoder(f))
    }
    // 写入中间结果
    for _, kv := range key_value {
    _ = encoders[ihash(kv.Key)%reply.Num_reduce].Encode(&kv)
    }

    call("Master.Finish", &Finish_args{Worker_id: worker_id, Task_type: "map", Task_index: reply.Task_index}, &Finish_reply{})
    case "reduce":
    // ...
  • call

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    // send an RPC request to the Master, wait for the response.
    // usually returns true.
    // returns false if something goes wrong.
    func call(rpcname string, args interface{}, reply interface{}) bool {
    // c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
    sockname := masterSock()
    c, err := rpc.DialHTTP("unix", sockname)
    if err != nil {
    log.Fatal("dialing:", err)
    }
    defer c.Close()

    err = c.Call(rpcname, args, reply)
    if err == nil {
    return true
    }

    fmt.Println(err)
    return false
    }

rpc通信需要的结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
type Map_task struct {
Index int // 当前task号(即第i个文件)
Worker_id string // 当前task所属worker的id
File_name string
Deadline time.Time
Map_done int
Distributed bool // 是否已经分配出去
}

type Reduce_task struct {
Index int // 当前task号(即第i个文件)
Worker_id string // 当前task所属worker的id
Intermediate_files []string
Deadline time.Time
Reduce_done int
Distributed bool // 是否已经分配出去
}

type Ask_args struct { // worker申请时,向master发送的args
Worker_id string // 做申请的worker id
Task_type string // 上一个完成的task的类型
}

type Ask_reply struct { // master收到worker申请时,回复的reply
File_name string
Num_reduce int
Task_op int
Task_type string
Task_index int // map任务时,输出的文件名为mr-Task_index-0, ~-1, ..., ~-(Num_reduce-1)
Task_deadline time.Time
Worker_id string
Intermediate_files []string // 分配reduce任务时,对应的map中间文件名列表
}

type Finish_args struct { // worker完成时,向master发送的args
Worker_id string // 发送结束信号的worker进程号
Task_type string // worker完成的任务类型
Task_index int // 完成的task id
}

type Finish_reply struct { // master收到worker完成消息时,回复的reply(不需要reply,所以为空)
}

Lab手册(中文版翻译)

对官网上的Lab1手册做了简要翻译,并记录了较为关键的一些rules和hints

Introduction

  • 构建MapReduce系统,实现一个worker过程——调用Map和Reduce函数,读写文件,一个协调过程——给worker分配任务,和处理失败的worker
  • 本lab用coordinator指代论文中的master

Getting started

  • git clone

    1
    2
    3
    4
    5
    $ git clone git://g.csail.mit.edu/6.824-golabs-2022 6.824
    $ cd 6.824
    $ ls
    Makefile src
    $
  • 源码中有一个简单的mapreduce无分布执行过程(src/main/mrsequential.go),并由MapReduce应用,包括单词计数mrapps/wc.go、文本索引mrapps/indexer.go,运行单词计数如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    $ cd ~/6.824
    $ cd src/main
    $ go build -race -buildmode=plugin ../mrapps/wc.go
    $ rm mr-out*
    $ go run -race mrsequential.go wc.so pg*.txt
    $ more mr-out-0
    A 509
    ABOUT 2
    ACT 8
    • 建议使用race参数启动go的race检测器
    • mrsequential.go的输出为mr-out-0,输入文件名为pg-xxx.txt

Your job

  • 实现一个分布式的MapReduce,一个协调进程和一个或多个工作进程并行执行。每个worker进程将向协调进程请求一个task,从一或多个文件读取任务的输入,执行任务,并将任务的输出写入一个或多个文件

  • 协调进程应该关注worker是否在合理的时间内(10s)完成任务,并将同样的任务分配给另一个worker

  • 协调进程和worker进程的主要routine在main/mrcoordinator.gomain/mrworker.go,作为参考。需要在mr/coordinator.go, mr/worker.go, and mr/rpc.go里完成自己的代码

  • 用自己的代码运行MapReduce:

    • 保证word-count plugin编译完成

      1
      $ go build -race -buildmode=plugin ../mrapps/wc.go
    • 在main目录下,运行协调进程

      1
      2
      $ rm mr-out*
      $ go run -race mrcoordinator.go pg-*.txt
      • mrcoordinator的pg-*.txt参数为输入文件,每个文件对应一个“split”
    • 在一个或多个其他窗口,运行worker

      1
      $ go run -race mrworker.go wc.so
    • 完成后,查看mr-out-*中的输出。实验完成时输出文件的排序并集应该符合顺序输出:

      1
      2
      3
      4
      5
      $ cat mr-out-* | sort | more
      A 509
      ABOUT 2
      ACT 8
      ...
  • main/test-mr.sh提供了一个test脚本,检查wc和indexer应用在给定pg-xxx.txt文件作为输入时是否产生正确的输出,并检查实现是否并行运行Map和Reduce任务,以及是否从崩溃的worker中恢复。

    1
    2
    3
    4
    $ cd ~/6.824/src/main
    $ bash test-mr.sh
    *** Starting wc test.
    // 挂起,协调进程永远不会完成
    • mr/coordinator.go 设置ret := false ,从而协调进程能够立刻退出

      1
      2
      3
      4
      5
      6
      7
      $ bash test-mr.sh
      *** Starting wc test.
      sort: No such file or directory
      cmp: EOF on mr-wc-all
      --- wc output is not the same as mr-correct-wc.txt
      --- wc test: FAIL
      $
      • 测试脚本期望在名为mr-out-X的文件中看到输出,每个reduce任务对应一个
      • mr/coordinator.gomr/worker.go 尚未实现,因此因此测试失败
    • 如果测试成功,则输出为

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      $ bash test-mr.sh
      *** Starting wc test.
      --- wc test: PASS
      *** Starting indexer test.
      --- indexer test: PASS
      *** Starting map parallelism test.
      --- map parallelism test: PASS
      *** Starting reduce parallelism test.
      --- reduce parallelism test: PASS
      *** Starting crash test.
      --- crash test: PASS
      *** PASSED ALL TESTS
      $
  • 可能会报错:2019/12/16 13:27:09 rpc.Register: method "Done" has 1 input parameters; needs exactly three,忽略即可

A few rules

  • map阶段应该为reduce任务把中间的键划分成nReduce个桶,其中nReduce是reduce任务的数量,为main/mrcoordinator.go传递给MakeCoordinator()的参数,因此每个mapper需要创建nReduce个中间文件,供reduce任务使用
  • 第X个reduce任务的输出放在文件mr-out-X
  • mr-out-X文件的每行为每个Reduce函数输出。该行应该使用Go的"%v %v"格式生成,使用key和value调用——参见main/mrsequential.go的代码
  • 可以修改mr/worker.gomr/coordinator.gomr/rpc.go。可以临时修改其他文件进行测试,但要确保代码能够与原始版本兼容
  • worker应该将中间Map输出放在当前目录的文件中,稍后可以将它们作为Reduce任务的输入读取
  • main/mrcoordinator.go 希望mr/coordinator.go 实现一个Done()函数,以返回true(如果MapReduce完成),此时 mrcoordinator.go会退出
  • 当job结束时,worker应当退出。一个简单的实现方法为使用call()的返回值,如果worker无法和coordinator联系,则会认为job完成

Hints

  • 一种开始的方法是修改mr/worker.goWorker()向coordinater发送RPC,请求执行任务。然后修改coordinator,用尚未启动map任务的文件名响应。修改worker以读取该文件,并调用应用程序映射函数,如mrsequential.go所示

  • 应用程序Map和Reduce函数在运行时使用Go插件包从名称以结尾的文件加载。所以如果更改了mr/目录的任何内容,可能需要重新编译MapReduce插件:go build-race-buildmode=plugin/mrapps/wc.go

  • worker的map存储中间键/值对,可以使用Go的encoding/json包,将JSON格式的键/值对写入/读取一个打开的文件:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    enc := json.NewEncoder(file)
    for _, kv := ... {
    err := enc.Encode(&kv)

    dec := json.NewDecoder(file)
    for {
    var kv KeyValue
    if err := dec.Decode(&kv); err != nil {
    break
    }
    kva = append(kva, kv)
    }
  • 中间文件的合理命名是mr-X-Y,其中X是map任务号,Y是reduce任务

  • map worker可以使用worker.go里的ihash(key)来选择reduce worker

  • 可以从mrsequential.go中摘取map输入文件读取代码、中间键值对排序代码、存储Reduce输出代码

  • lock共享数据

  • 测试崩溃恢复可以使用mrapps/crash.go