MIT-6.824 Lab1 总结与备忘
思路整理
- 整个mapreduce框架分为两个大的阶段:map和reduce。master在被启动后,最外层一直周期性调用
done()
,当其返回值为true
时说明job完成。 - 两个阶段中,master首先需要等worker先把所有的map任务完成——这里,一个worker做一个map任务,即对一个输入文件做map任务,有
files
有多少个文件,master就要分配几次map任务——然后再进入reduce阶段,reduce阶段要分配reduce任务的次数为“超参数”nReduce
。等所有的reduce任务结束,master的状态转为特殊的结束状态,此时最外层再调用done()
时,返回值为true
,结束master进程。本条分析表明:- 要实现两种task结构体
- 要用两个列表存储map task和reduce task
- master类中的成员应当包括:map的任务数、reduce的任务数、map task列表、reduce task列表、master当前所处的阶段
- master在初始化时就要建立两个列表
- task结构体里要有一个flag,标志其是否完成,每当一个worker传回finish信号时,将该worker的task标记为已完成
- 在做map task或reduce task时,master和worker(任意一个worker)的通信过程和处理过程为:
- 一个worker启动后,通过RPC来主动申请任务(
call
)。因此master需要实现一个成员函数,用来处理worker的申请,根据master所处的阶段,以及当前阶段任务的状态(还是否存在没有被分配的任务)来决定是否向这个worker分配任务。本条分析表明:- master要实现:
func (m *Master) Ask_for_task(args *Ask_args, reply *Ask_reply) error
- task结构体包含:
- task号码:
task_index
- 是否分配的标志
distributed
- 该task被分配给哪个worker:
worker_id
- task号码:
- worker的ask结构体包含:
- 自己的id(这里以进程号唯一标识一个worker)
- worker的reply结构体(master直接操作reply的内容,worker将)包含:(这里,将map task的reply和reduce task的reply合并为一个通用的结构体)
- map任务对应的文件名
file_name
- reduce任务对应的文件列表
intermediate_files
- nReduce
- 和master所处阶段一致的任务类型
task_type
- 任务结束时间(用于超时判定)
- 任务编号
task_index
- map任务对应的文件名
- master要实现:
- 当worker被分配任务后,worker根据reply的内容决定执行什么任务。如果为map task,则读取map task对应的文件并调用用户定义的map函数,将处理结果存储为中间文件
mr-map_task_id-x
(x取值范围为0~nReduce-1)。如果为reduce task,则根据任务的id,读取中间文件mr-x-reduce_task_id
(x取值范围为0~nMap-1,nMap为map task的总数)。而当任务完成时,向master报告“任务已完成”。本条分析表明:- worker要实现:
- 申请任务-报告任务-申请任务-报告任务的死循环
- 根据
task_type
做不同的任务 - 当
task_type
为done
或者申请任务的返回值为false时,跳出死循环,结束该worker进程
- 报告结构体
finish_args
要实现:- 记录该项已完成任务的类型
task_type
- 记录该项已完成任务的id
task_index
,它和reply
结构体的task_index
应当是相同的 - 记录发送该报告的worker id(即该worker的进程号)
- 记录该项已完成任务的类型
- 报告回复结构体
finish_reply
为空,因为worker报告后,master不需要再回复它,worker自行进入下一个申请任务-报告任务的循环
- worker要实现:
- master收到完成的任务时,根据master所处的阶段能够判别该完成的task是什么类型,于是master根据
finish_reply
的task_index
从相应的task列表中找到这个task,标记该task已完成,并检查该列表中是否所有task都已经完成,如果都完成则master进入下一个阶段。本条分析表明:- master要实现成员函数
func (m *Master) Finish(args *Finish_args, reply *Finish_reply) error
- master要根据所处的状态,分别检查不同的task列表
- task结构体包含一个标志本任务是否完成的flag
task_done
,初始化时为false
- master要实现成员函数
- 一个worker启动后,通过RPC来主动申请任务(
- 在以上过程里,master要随时关注分配出去的task运行时间,因此master需要另起一个协程,以死循环的方式每隔一小段时间就检查,当前时间是否在分配出去task的deadline之后。本条分析表明:
- master在分配任务时设置task的deadline
- task结构体包含:
- 记录task最晚结束时间的deadline
- 需要实现一个内含死循环的
check()
,并在makeMaster()
中go check()
调用
关键实现
master
master结构体
1
2
3
4
5
6
7
8type Master struct {
lock sync.Mutex
num_map int // 输入文件数目
num_reduce int
cur_stage string //记录当前的工作状态(map/reduce)
map_task []Map_task // map task
reduce_task []Reduce_task //
}master.Ask_for_task(args *Ask_args, reply *Ask_reply) error
:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45m.lock.Lock()
defer m.lock.Unlock()
reply.Task_op = 0
if m.cur_stage == "map" {
// fmt.Printf("Master current stage: %v\n", m.cur_stage)
for i, temp_task := range m.map_task {
if !temp_task.Distributed { // map task没有分配出去
m.map_task[i].Distributed = true
m.map_task[i].Deadline = time.Now().Add(10 * time.Second)
m.map_task[i].Worker_id = args.Worker_id
reply.File_name = temp_task.File_name
reply.Num_reduce = m.num_reduce
reply.Task_type = "map"
reply.Task_index = temp_task.Index
reply.Task_deadline = m.map_task[i].Deadline
reply.Worker_id = args.Worker_id
break
}
}
} else if m.cur_stage == "reduce" { // m.cur_stage == "reduce"
// fmt.Println("Master current stage: " + m.cur_stage)
for i, temp_task := range m.reduce_task {
if !temp_task.Distributed { // map task没有分配出去
m.reduce_task[i].Distributed = true
m.reduce_task[i].Deadline = time.Now().Add(10 * time.Second)
m.reduce_task[i].Worker_id = args.Worker_id
reply.File_name = ""
reply.Num_reduce = m.num_reduce
reply.Task_type = "reduce"
reply.Task_index = temp_task.Index
reply.Task_deadline = m.reduce_task[i].Deadline
reply.Worker_id = args.Worker_id
reply.Intermediate_files = nil // 中间文件名mr-X-Y,其中X是map任务号,Y是reduce任务
for map_index := 0; map_index < m.num_map; map_index++ {
reply.Intermediate_files = append(reply.Intermediate_files, fmt.Sprintf("mr-%d-%d", map_index, temp_task.Index))
}
break
}
}
}master.Finish(args *Finish_args, reply *Finish_reply) error
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27m.lock.Lock()
defer m.lock.Unlock()
if m.cur_stage == "map" {
// fmt.Println("A map task finished")
if args.Task_type != "map" {
fmt.Printf("Expect map type finish message, got %s type", args.Task_type)
return nil
}
if m.map_task[args.Task_index].Distributed &&
m.map_task[args.Task_index].Worker_id == args.Worker_id &&
m.map_task[args.Task_index].Map_done == 0 &&
time.Now().Before(m.map_task[args.Task_index].Deadline) {
// fmt.Printf("map task %v is done\n", args.Task_index)
m.map_task[args.Task_index].Map_done = 1
for _, tmp_task := range m.map_task {
if tmp_task.Map_done == 0 { // 如果有一个map task任务没有完成,则继续进行
return nil
}
}
time.Sleep(1 * time.Second)
m.cur_stage = "reduce"
fmt.Println("Start to reduce")
}
} else if m.cur_stage == "reduce" {
// ...master.check()
(这里实现为匿名函数)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39go func() {
for {
time.Sleep(500 * time.Millisecond)
m.lock.Lock()
if m.cur_stage == "map" {
for i, task := range m.map_task {
if task.Worker_id != "" &&
task.Distributed &&
time.Now().After(task.Deadline) &&
task.Map_done == 0 { // 回收并重新分配
// log.Printf(
// "Found timed-out %s task %d previously on worker %s. Re-assign",
// m.cur_stage, task.Index, task.Worker_id,
// )
m.map_task[i].Worker_id = ""
m.map_task[i].Distributed = false
m.map_task[i].Map_done = 0
}
}
} else {
for i, task := range m.reduce_task {
if task.Worker_id != "" &&
task.Distributed &&
time.Now().After(task.Deadline) &&
task.Reduce_done == 0 { // 回收并重新分配
// log.Printf(
// "Found timed-out %s task %d previously on worker %s. Re-assign",
// m.cur_stage, task.Index, task.Worker_id,
// )
m.reduce_task[i].Worker_id = ""
m.reduce_task[i].Distributed = false
m.reduce_task[i].Reduce_done = 0
}
}
}
m.lock.Unlock()
}
}()
worker
worker:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50worker_id := strconv.Itoa(os.Getpid())
// Your worker implementation here.
for {
reply := Ask_reply{}
args := Ask_args{
Worker_id: worker_id,
Task_type: "",
}
call_result := call("Master.Ask_for_task", &args, &reply)
time.Sleep(1 * time.Second)
if !call_result || reply.Task_type == "done" { // call不成功,可能job已经结束
break
}
if reply.Task_op == 0 {
continue
}
switch reply.Task_type {
case "map":
file, err := os.Open(reply.File_name)
if err != nil {
log.Fatalf("cannot open %v", reply.File_name)
return
}
content, err := ioutil.ReadAll(file)
if err != nil {
log.Fatalf("cannot read %v", reply.File_name)
return
}
defer file.Close()
key_value := mapf(reply.File_name, string(content))
// 创建输出文件
var encoders []*json.Encoder
for i := 0; i < reply.Num_reduce; i++ {
f, err := os.Create(fmt.Sprintf("mr-%d-%d", reply.Task_index, i))
if err != nil {
log.Fatalf("cannot create intermediate result file")
return
}
encoders = append(encoders, json.NewEncoder(f))
}
// 写入中间结果
for _, kv := range key_value {
_ = encoders[ihash(kv.Key)%reply.Num_reduce].Encode(&kv)
}
call("Master.Finish", &Finish_args{Worker_id: worker_id, Task_type: "map", Task_index: reply.Task_index}, &Finish_reply{})
case "reduce":
// ...call
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20// send an RPC request to the Master, wait for the response.
// usually returns true.
// returns false if something goes wrong.
func call(rpcname string, args interface{}, reply interface{}) bool {
// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
sockname := masterSock()
c, err := rpc.DialHTTP("unix", sockname)
if err != nil {
log.Fatal("dialing:", err)
}
defer c.Close()
err = c.Call(rpcname, args, reply)
if err == nil {
return true
}
fmt.Println(err)
return false
}
rpc通信需要的结构体
1 | type Map_task struct { |
Lab手册(中文版翻译)
对官网上的Lab1手册做了简要翻译,并记录了较为关键的一些rules和hints
Introduction
- 构建MapReduce系统,实现一个worker过程——调用Map和Reduce函数,读写文件,一个协调过程——给worker分配任务,和处理失败的worker
- 本lab用coordinator指代论文中的master
Getting started
git clone
1
2
3
4
5$ git clone git://g.csail.mit.edu/6.824-golabs-2022 6.824
$ cd 6.824
$ ls
Makefile src
$源码中有一个简单的mapreduce无分布执行过程(
src/main/mrsequential.go
),并由MapReduce应用,包括单词计数mrapps/wc.go
、文本索引mrapps/indexer.go
,运行单词计数如下:1
2
3
4
5
6
7
8
9$ cd ~/6.824
$ cd src/main
$ go build -race -buildmode=plugin ../mrapps/wc.go
$ rm mr-out*
$ go run -race mrsequential.go wc.so pg*.txt
$ more mr-out-0
A 509
ABOUT 2
ACT 8- 建议使用race参数启动go的race检测器
mrsequential.go
的输出为mr-out-0
,输入文件名为pg-xxx.txt
Your job
实现一个分布式的MapReduce,一个协调进程和一个或多个工作进程并行执行。每个worker进程将向协调进程请求一个task,从一或多个文件读取任务的输入,执行任务,并将任务的输出写入一个或多个文件
协调进程应该关注worker是否在合理的时间内(10s)完成任务,并将同样的任务分配给另一个worker
协调进程和worker进程的主要routine在
main/mrcoordinator.go
和main/mrworker.go
,作为参考。需要在mr/coordinator.go
,mr/worker.go
, andmr/rpc.go
里完成自己的代码用自己的代码运行MapReduce:
保证word-count plugin编译完成
1
$ go build -race -buildmode=plugin ../mrapps/wc.go
在main目录下,运行协调进程
1
2$ rm mr-out*
$ go run -race mrcoordinator.go pg-*.txt- mrcoordinator的pg-*.txt参数为输入文件,每个文件对应一个“split”
在一个或多个其他窗口,运行worker
1
$ go run -race mrworker.go wc.so
完成后,查看mr-out-*中的输出。实验完成时输出文件的排序并集应该符合顺序输出:
1
2
3
4
5$ cat mr-out-* | sort | more
A 509
ABOUT 2
ACT 8
...
在
main/test-mr.sh
提供了一个test脚本,检查wc和indexer应用在给定pg-xxx.txt文件作为输入时是否产生正确的输出,并检查实现是否并行运行Map和Reduce任务,以及是否从崩溃的worker中恢复。1
2
3
4$ cd ~/6.824/src/main
$ bash test-mr.sh
*** Starting wc test.
// 挂起,协调进程永远不会完成在
mr/coordinator.go
设置ret := false
,从而协调进程能够立刻退出1
2
3
4
5
6
7$ bash test-mr.sh
*** Starting wc test.
sort: No such file or directory
cmp: EOF on mr-wc-all
--- wc output is not the same as mr-correct-wc.txt
--- wc test: FAIL
$- 测试脚本期望在名为mr-out-X的文件中看到输出,每个reduce任务对应一个
mr/coordinator.go
和mr/worker.go
尚未实现,因此因此测试失败
如果测试成功,则输出为
1
2
3
4
5
6
7
8
9
10
11
12
13$ bash test-mr.sh
*** Starting wc test.
--- wc test: PASS
*** Starting indexer test.
--- indexer test: PASS
*** Starting map parallelism test.
--- map parallelism test: PASS
*** Starting reduce parallelism test.
--- reduce parallelism test: PASS
*** Starting crash test.
--- crash test: PASS
*** PASSED ALL TESTS
$
可能会报错:
2019/12/16 13:27:09 rpc.Register: method "Done" has 1 input parameters; needs exactly three
,忽略即可
A few rules
- map阶段应该为reduce任务把中间的键划分成nReduce个桶,其中nReduce是reduce任务的数量,为
main/mrcoordinator.go
传递给MakeCoordinator()
的参数,因此每个mapper需要创建nReduce个中间文件,供reduce任务使用 - 第X个reduce任务的输出放在文件mr-out-X
- mr-out-X文件的每行为每个Reduce函数输出。该行应该使用Go的
"%v %v"
格式生成,使用key和value调用——参见main/mrsequential.go
的代码 - 可以修改
mr/worker.go
、mr/coordinator.go
和mr/rpc.go
。可以临时修改其他文件进行测试,但要确保代码能够与原始版本兼容 - worker应该将中间Map输出放在当前目录的文件中,稍后可以将它们作为Reduce任务的输入读取
main/mrcoordinator.go
希望mr/coordinator.go
实现一个Done()
函数,以返回true(如果MapReduce完成),此时mrcoordinator.go
会退出- 当job结束时,worker应当退出。一个简单的实现方法为使用
call()
的返回值,如果worker无法和coordinator联系,则会认为job完成
Hints
一种开始的方法是修改
mr/worker.go
的Worker()
向coordinater发送RPC,请求执行任务。然后修改coordinator,用尚未启动map任务的文件名响应。修改worker以读取该文件,并调用应用程序映射函数,如mrsequential.go
所示应用程序Map和Reduce函数在运行时使用Go插件包从名称以结尾的文件加载。所以如果更改了
mr/
目录的任何内容,可能需要重新编译MapReduce插件:go build-race-buildmode=plugin/mrapps/wc.go
worker的map存储中间键/值对,可以使用Go的
encoding/json
包,将JSON格式的键/值对写入/读取一个打开的文件:1
2
3
4
5
6
7
8
9
10
11
12enc := json.NewEncoder(file)
for _, kv := ... {
err := enc.Encode(&kv)
dec := json.NewDecoder(file)
for {
var kv KeyValue
if err := dec.Decode(&kv); err != nil {
break
}
kva = append(kva, kv)
}中间文件的合理命名是mr-X-Y,其中X是map任务号,Y是reduce任务
map worker可以使用
worker.go
里的ihash(key)
来选择reduce worker可以从
mrsequential.go
中摘取map输入文件读取代码、中间键值对排序代码、存储Reduce输出代码lock共享数据
测试崩溃恢复可以使用
mrapps/crash.go