MR 概念

  • 简介
  • MR 程序组成部分
  • MapTask 并行度
  • ReduceTask 的并行度
  • Shuffle 机制
  • 文件太小如何处理
  • 自定义分区
  • 二次排序

简介

MapReduce 是一个分布式运算程序的编程框架,是用户开发基于 hadoop 的数据分析应用的核心框架

MapReduce 核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个 hadoop 集群上

MR 程序组成部分

  • Split

    将程序输入的数据进行切分,每个 split 交给一个 MapTask 执行。split 的数量可以自己定义,默认情况下一个文件一个 split

  • Map

    输入 为一个 split 中的数据,对 split 中的数据进行拆分,并以<key, value> 对的格式保存数据

  • Shuffle / Combine / sort

    这几个过程在简单的 MR 程序中并不需要我们关注,因为源代码中已经给出了一些默认的 Shuffle / Combine / sort 处理器,作用分别是:

    • Combine:对 MapTask 产生的结果在本地节点上进行合并、统计等,以减少后续整个集群间的 Shuffle 过程所需要传输的数据量
    • Shuffle / Sort:将集群中各个 MapTask 的处理结果在集群间进行传输,排序,数据经过这个阶段之后就作为 Reduce 端的输入
  • Reduce

    ReduceTask 的输入数据是经过排序之后的一系列 key 值相同的 <key, value> 对,ReduceTask 对其进行统计等处理,产生最终的输出。ReduceTask 的数量可以设置

MapTask 并行度

选择并发数的影响因素:

  1. 运算节点的硬件配置
  2. 运算任务的类型:CPU 密集型还是 IO 密集型
  3. 运算任务的数据量

Task 并行度的经验

  • 最好每个 task 执行的时间至少一分钟
  • 如果 job 的每个 map 或者 reducetask 运行时间比较短,那就应该减少 job 的 map 或者 reduce 的数量,因为每个 task 的 setup 和加入到调度器需要消耗一定的时间,如果每个 task 都花不了太多时间,就没必要有太多的 task
  • 默认情况下,每个 task 都是一个 jvm 实例,都需要开启和销毁,jvm 的开启和销毁所需要的时间比执行的时间要长,所以配置 jvm 的可重用性可以改善性能
  • mapred.job.reuse.jvm.num.tasks,默认是 1,表示一个 JVM 上最可以顺序执行的 task 数目是 1,也就是说一个 task 启动一个 JVM
  • 如果 input 的文件非常大,可以考虑将 hdfs 上的每个 blocksize 设置大一些,比如 256MB 或者 512MB
  • JVM 重用技术不是指同一个 job 的多个 task 可以同时运行于同一个 JVM 上,而是排队按顺序执行

ReduceTask 的并行度

maptask 的并发数由切片数决定不同,reducetask 数量是可以手动设置的

1
2
// 默认为 1,手动设置为 4
job.setNumReduceTask(4);

如果数据分布不均匀,就有可能在 reduce 阶段产生数据倾斜

注意:ReduceTask 数量并不是任意设置的,需要考虑业务逻辑需求,有些情况需要计算全局汇总结果,就只能有一个 reducetask

尽量不要运行太多的 reducetask。最好 reduce 数量和集群中的 reduce 持平或者比集群中的 reduce slots 小

Shuffle 机制

shuffle

wordcount 的 shuffle 详细过程:

  1. 读取数据:MR 默认使用 TextInputFormat 来获取切片的数量,通过 createRecordReader 方法获取 RecordReader,缺省的 RecordReader 是 LineRecordReader,通过调用 LineRecordReader 的 nextKeyValue 方法获取每行的数据
  2. 溢出:通过 OutputCollector 收集器收集读取的数据,缺省使用的是 Task.CombineOutputCollector,调用其 collect 进行溢出,收集器默认的空间是 100 M,当收集器达到 80% 的时候开始溢出
  3. 分区排序:mapreduce 默认通过 HashPartitioner 进行分区且有序(在内存中结束)
  4. 输出文件:maptask 的最终输出文件分区有序
  5. Reduce拉取文件:从各个分区中拉取相同的 key 到 reducetask 中,合并归并排序,相同的 key 看作一个 group
  6. 写出数据:FileOutputFormat 调用 RecordWriter 写出文件,相同的 key 会写到一个分区

文件太小如何处理

小文件处理方法

自定义分区

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class CustomizeParitioner extends Partitioner<Text, NullWritable> {
private static Map<String, Integer> map = new HashMap<>();

static {
map.put("139", 0);
map.put("186", 1);
map.put("187", 2);
map.put("136", 3);
}

@Override
public int getPartition(Text key, NullWritable value, int numPartitions) {
String key_ = key.toString().subString(0, 3);
return map.get(key_) != null ? map.get(key_) : 4;
}
}

二次排序

在 hadoop 中一般都是按照 key 进行排序的,但有时候还需要按照 value 进行排序

有两种方法:buffer and int memory sort 和 value-to-key conversion

  • buffer and in memory sort:主要是在 reduce() 函数中,将每个 key 对应的 value 值保存下来,进行排序。缺点是会造成 out of memory
  • value-to-key conversion:主要思想是将 key 和 value 拼接成一个组合 key,然后进行排序,这样 reduce() 函数获取结果就实现了按照 key 排序,然后按照 value 排序,但是需要用户自己实现 paritioner,以便只按照 key 进行数据划分
坚持原创技术分享,您的支持将鼓励我继续创作!