MIT 6.824 Lab 1 - 实现 MapReduce
在这篇文章中,我们将按照 MIT-6.824 2021 Spring 的安排,完成 Lab 1,用 Golang 实现 MapReduce 分布式计算框架。
完整的 Lab 说明可参阅链接 http://nil.csail.mit.edu/6.824/2021/labs/lab-mr.html。
不了解 MapReduce 原理的读者,也可以先阅读我先前的文章《Google MapReduce 总结》。
牛刀小试
首先,我们通过 Git 获取 Lab 的初始代码:
1 | git clone git://g.csail.mit.edu/6.824-golabs-2021 6.824 |
初始代码中默认已经提供了 单进程串行 的 MapReduce 参考实现,在 main/mrsequential.go 中。我们可以通过以下命令来试玩一下:
1 | cd 6.824 |
除了 mrapps/wc.go
,初始代码在 mrapps
中还提供了其他 MR APP 实现,也可以参照着替换上述命令中的参数来试玩一下。
这里使用了 Golang 的 Plugin 来构建 MR APP,使得 MR 框架的代码可以和 MR APP 的代码分开编译,而后 MR 框架再通过动态链接的方式载入指定的 MR APP 运行。
任务分析
如上文所述,在 main/mrsequential.go 中我们可以找到初始代码预先提供的 单进程串行 的 MapReduce 参考实现,而我们的任务是实现一个 单机多进程并行 的版本。
通过阅读 Lab 文档 http://nil.csail.mit.edu/6.824/2021/labs/lab-mr.html 以及初始代码,可知信息如下:
- 整个 MR 框架由一个 Coordinator 进程及若干个 Worker 进程构成
- Coordinator 进程与 Worker 进程间通过本地 Socket 进行 Golang RPC 通信
- 由 Coordinator 协调整个 MR 计算的推进,并分配 Task 到 Worker 上运行
- 在启动 Coordinator 进程时指定 输入文件名 及 Reduce Task 数量
- 在启动 Worker 进程时指定所用的 MR APP 动态链接库文件
- Coordinator 需要留意 Worker 可能无法在合理时间内完成收到的任务(Worker 卡死或宕机),在遇到此类问题时需要重新派发任务
- Coordinator 进程的入口文件为 main/mrcoordinator.go
- Worker 进程的入口文件为 main/mrworker.go
- 我们需要补充实现 mr/coordinator.go、mr/worker.go、mr/rpc.go 这三个文件
基于此,我们不难设计出,Coordinator 需要有以下功能:
- 在启动时根据指定的输入文件数及 Reduce Task 数,生成 Map Task 及 Reduce Task
- 响应 Worker 的 Task 申请 RPC 请求,分配可用的 Task 给到 Worker 处理
- 追踪 Task 的完成情况,在所有 Map Task 完成后进入 Reduce 阶段,开始派发 Reduce Task;在所有 Reduce Task 完成后标记作业已完成并退出
而 Worker 的功能则相对简单,只需要保证在空闲时通过 RPC 向 Coordinator 申请 Task 并运行,再不断重复该过程即可。
此外 Lab 要求我们考虑 Worker 的 Failover,即 Worker 获取到 Task 后可能出现宕机和卡死等情况。这两种情况在 Coordinator 的视角中都是相同的,就是该 Worker 长时间不与 Coordinator 通信了。为了简化任务,Lab 说明中明确指定了,设定该超时阈值为 10s 即可。为了支持这一点,我们的实现需要支持到:
- Coordinator 追踪已分配 Task 的运行情况,在 Task 超出 10s 仍未完成时,将该 Task 重新分配给其他 Worker 重试
- 考虑 Task 上一次分配的 Worker 可能仍在运行,重新分配后会出现两个 Worker 同时运行同一个 Task 的情况。要确保只有一个 Worker 能够完成结果数据的最终写出,以免出现冲突,导致下游观察到重复或缺失的结果数据
第一点比较简单,而第二点会相对复杂些,不过在 Lab 文档中也给出了提示 —— 实际上也是参考了 Google MapReduce 的做法,Worker 在写出数据时可以先写出到临时文件,最终确认没有问题后再将其重命名为正式结果文件,区分开了 Write 和 Commit 的过程。Commit 的过程可以是 Coordinator 来执行,也可以是 Worker 来执行:
- Coordinator Commit:Worker 向 Coordinator 汇报 Task 完成,Coordinator 确认该 Task 是否仍属于该 Worker,是则进行结果文件 Commit,否则直接忽略
- Worker Commit:Worker 向 Coordinator 汇报 Task 完成,Coordinator 确认该 Task 是否仍属于该 Worker 并响应 Worker,是则 Worker 进行结果文件 Commit,再向 Coordinator 汇报 Commit 完成
这里两种方案都是可行的,各有利弊。我在我的实现中选择了 Coordinator Commit,因为它可以少一次 RPC 调用,在编码实现上会更简单,但缺点是所有 Task 的最终 Commit 都由 Coordinator 完成,在极端场景下会让 Coordinator 变成整个 MR 过程的性能瓶颈。
代码设计与实现
代码的设计及实现主要是三个部分:
- Coordinator 与 Worker 间的 RPC 通信,对应 mr/rpc.go 文件
- Coordinator 调度逻辑,对应 mr/coordinator.go 文件
- Worker 计算逻辑,对应 mr/worker.go 文件
RPC 通信
Coordinator 与 Worker 间的需要进行的通信主要有两块:
- Worker 在空闲时向 Coordinator 发起 Task 请求,Coordinator 响应一个分配给该 Worker 的 Task
- Worker 在上一个 Task 运行完成后向 Coordinator 汇报
考虑到上述两个过程总是交替进行的,且 Worker 在上一个 Task 运行完成后总是立刻会需要申请一个新的 Task,在实现上这里我把它们合并为了一个 RPC 调用:
ApplyForTask RPC:
- 由 Worker 向 Coordinator 发起,申请一个新的 Task,同时汇报上一个运行完成的 Task(如有)
- Coordinator 接收到 RPC 请求后将同步阻塞,直到有可用的 Task 分配给该 Worker 或整个 MR 作业已运行完成
参数:
- Worker ID
- 上一个完成的 Task 的类型及 Index。可能为空
响应:
- 新 Task 的类型及 Index。若为空则代表 MR 作业已完成,Worker 可退出
- 运行新 Task 所需的其他信息,包括:
- 如果是 MAP Task,需要
- 对应的输入文件名
- 总 REDUCE Task 数量,用于生成中间结果文件
- 如果是 REDUCE Task,需要总 MAP Task 数量,用于生成对应中间结果文件的文件名
可点击链接 https://github.com/Mr-Dai/MIT-6.824/blob/master/src/mr/rpc.go 查看我的完整实现。
Coordinator
由于涉及整个 MR 作业的运行过程调度以及 Worker Failover 的处理,Coordinator 组件的逻辑会相对复杂。
首先,Coordinator 需要维护以下状态信息:
- 基础配置信息,包括 总 MAP Task 数量、总 Reduce Task 数量
- 调度所需信息,包括
- 当前所处阶段,是 MAP 还是 REDUCE
- 所有仍未完成的 Task 及其所属的 Worker 和 Deadline(若有),使用 Golang Map 结构实现
- 所有仍未分配的 Task 池,用于响应 Worker 的申请及 Failover 时的重新分配,使用 Golang Channel 实现
1 | type Coordinator struct { |
然后,Coordinator 需要实现以下几个过程:
- 在启动时,基于指定的输入文件生成 MAP Task 到可用 Task 池中
- 处理 Worker 的 Task 申请 RPC,从池中分配一个可用的 Task 给 Worker 并响应
- 处理 Worker 的 Task 完成通知,完成 Task 最终的结果数据 Commit
- 在 MAP Task 全部完成后,转移至 REDUCE 阶段,生成 REDUCE Task 到可用 Task 池
- 在 REDUCE Task 全部完成后,标记 MR 作业已完成,退出
- 周期巡检正在运行的 Task,发现 Task 运行时长超出 10s 后重新分配其到新的 Worker 上运行
这里我们一个个来。先看 Coordinator 启动时的 MAP Task 生成:
1 | func MakeCoordinator(files []string, nReduce int) *Coordinator { |
然后我们再来看 可用 Task 获取与分配:
1 | // 基于 Task 的类型和 Index 值生成唯一 ID |
然后是 Worker Task 已完成的处理:
1 | // ApplyForTask RPC 的处理入口,由 Worker 调用 |
然后我们来看 作业运行阶段的切换:
1 | func (c *Coordinator) transit() { |
最后我们再来看 过期 Task 的回收。考虑到该过程需要对已分配的 Task 进行周期巡检,我们直接在 Coordinator 启动时启动一个 Goroutine 来实现:
1 | func MakeCoordinator(files []string, nReduce int) *Coordinator { |
可点击链接 https://github.com/Mr-Dai/MIT-6.824/blob/master/src/mr/coordinator.go 查看我的完整实现。
Worker
Worker 的核心逻辑比较简单,主要是一个死循环,不断地向 Coordinator 调用 ApplyForTask RPC:
- Coordinator 返回空响应,代表 MR 作业已完成,则退出循环,结束 Worker 进程
- Coordinator 返回 MAP Task,则
- 读取对应输入文件的内容
- 传递至 MR APP 指定的 Map 函数,得到对应的中间结果
- 按中间结果 Key 的 Hash 值进行分桶,保存至中间结果文件
- Coordinator 返回 REDUCE Task,则
- 读取所有属于该 REDUCE Task 的中间结果文件数据
- 对所有中间结果进行排序,并按 Key 值进行归并
- 传递归并后的数据至 MR APP 指定的 REDUCE 函数,得到最终结果
- 写出到结果文件
先看最外层的循环:
1 | func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) { |
然后是 MAP Task 的处理:
1 | // 读取输入数据 |
最后是 REDUCE Task 的处理:
1 | // 读取输入数据 |
可点击链接 https://github.com/Mr-Dai/MIT-6.824/blob/master/src/mr/worker.go 查看我的完整实现。
思考延伸
在这个 Lab 中,我们实现了 单机多进程 的 MapReduce 框架。在 Lab 文档的最后,也有建议同学们尝试实现 多机分布式 的版本。这里我就不给出具体代码了,简单分析下要做到这一点大致需要解决以下问题:
- 调整 Worker ID 的生成方式,保证在多机分布式模式下不重复
- 实现多机 RPC 通信。Worker 如何知道 Coordinator 的 Hostname 及端口?
- 中间结果数据的传输?有两类方案:
- 直接写入到如 AWS S3 等共享存储。改动成本低,但依赖外部服务
- 参考 Google MapReduce 的做法,保存在 Map Worker 的本地磁盘,Reduce Worker 通过 RPC 向 Map Worker 拉取数据
此外,我在上文中给出的实现代码也比较简单,在大数据量的场景下也有着不小的改进空间,包括:
- Worker 是否可以得知自己的 Task 已超出 Deadline 并主动处理?
- 调整 Map / Reduce 函数签名,让整个 Map / Reduce 过程 Streaming 化,避免因总输入/输出数据量过大导致进程 OOM
- 比起在 Reduce Task 开始时对完整输入数据进行全排序,也可在各个 Map Task 末尾先进行局部排序,再在 Reduce Task 开始时进行有序归并
时至今日,随着 Hadoop 生态的流行,MapReduce 的运行时实现方案已经非常成熟,上述问题的答案想必都能在 Hadoop 的实现中找到。感兴趣的读者也可在此次 Lab 后自行翻阅 Hadoop MapReduce 的源码,了解并学习我们的实现相比真实的大数据集生产环境还有哪些可以改进的地方。
MIT 6.824 Lab 1 - 实现 MapReduce