在了解了 Hadoop 的序列化操作,实现了基本的 Bean 序列化的一个 demo,接下来分析一下 MapReduce 的框架原理。
切片与MapTask 并行度决定机制
MapTask 的并行度决定 Map 阶段的任务处理并发度,进而影响整个 Job 的处理速度。
问题:
一个 1G 的数据,启动 8 个MapTask,可以提高集群的并发处理能力。但是如果是一个 1K 的数据,也启动 8 个MapTask,会提高性能吗?
MapTask 是否越多越好?
什么因素会影响到 MapTask 的并行度?
MapTask并行度决定机制
前置概念:
数据块:Block 在 HDFS 物理上把数据分成一块一块的。
数据切片:在逻辑上对输入进行分片,并不会在磁盘上将其分片存储。
现在,假设有一个 300M 的数据,分别存放在 DataNode 1、2、3 上,DataNode1 上存储 0~128M 数据,DataNode2 上存储 128~256M 数据,DataNode3 上存储 256~300M 数据。
如果数据切片大小为 100M,则读取第一个切片没有问题,当读取第2、3个切片时,需要将DataNode1 上的剩余的数据拷贝到 MapTask2 上,将 DataNode2 上剩余的数据拷贝到 MapTask3 上,这样会存在大量的数据 IO,严重影响性能。
如果数据切片大小为 128M(即与 Block 大小一致),此时,每个 MapTask 都读取 128M 数据,则可以分别运行在三台 DataNode 上,没有数据拷贝,此时性能最高。
MapTask 并行度决定机制
- 一个 Job 的 Map 阶段并行度由客户端在提交 Job 时的切片数决定
- 每个切片分配一个 MapTask 并行实例处理
- 默认情况下,切片大小等于 BlockSize
- 切片时不考虑数据集整体,而是逐个针对每个文件单独切片
Job 提交流程、切片源码
在 Job 调用 job.waitForCompletion
时,进行任务提交。此方法会调用 submit()
方法进行真正的提交。
任务提交流程
1 | public boolean waitForCompletion(boolean verbose |
1 | public void submit() |
connect 连接流程
1 | private synchronized void connect() |
1 | public Cluster(Configuration conf) throws IOException { |
1 | private void initialize(InetSocketAddress jobTrackAddr, Configuration conf) |
实际提交流程
1 | JobStatus submitJobInternal(Job job, Cluster cluster) |
切片流程
1 | private int writeSplits(org.apache.hadoop.mapreduce.JobContext job, |
1 | private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException, |
1 | // 此处调用的是 FileInputFormat 中的 getSplits |
总结
- 先创建一个数据存储的临时目录
- 开始规划切片,遍历处理目录下的每个文件
- 遍历文件:
获取文件大小
计算切片大小,公式: Math.max(minSize, Math.min(maxSize, blockSize))
默认情况下,切片大小 = blockSize
开始切片:local 运行(第一个切片 0~32M,第二个切片 32~64M …);Yarn 运行(第一个切片 0~128M,第二个切片 128~256M …);注意:每次切片时,都需要判断切片完成后剩余部分是否是块大小的 1.1 倍,大于就切片,否则不切
将切片信息写入切片规划文件
InputSplit 只记录切片的元数据信息(起始位置、长度、所在节点列表等) - 提交切片规划文件(local 运行时为临时目录,集群运行时为 yarn);Yarn 上的 MrAppMaster 根据切片规划文件计算开启 MapTask 个数。