4 Hadoop之MapReduce总结与优化
4.1 MapReduce开发总结


MapReduce整个工作流程如上图所示,具体包含以下步骤:
1)数据输入接口
默认使用实现类:
(1)默认使用实现类:TextInputFormat:一次读一行文本,然后将该行的起始偏移量作为key,行内容作为value返回
其他常用实现类:
(1)KeyValueInputFormat:每一行均为一条记录,被分隔符分割为key/value,默认分隔符是制表符
(2)NlineInputFormat:按照指定的行数N来划分切片
(3)CombineTextInputFormat:可以把多个小文件合并成一个切片处理,提高处理效率
用户自定义InputFormat
(1)必须实现Writable接口
(2)重写序列化和反序列化方法
(3)如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口
2)逻辑处理接口:Mapper

用户根据业务需求实现:map()、setup()、cleanup()
3)Partitioner分区
默认实现HashPartitioner
逻辑是根据key的哈希值和numReduces来返回一个分区号 :key.hashCode() & Integer.MAXVALUE % numReduces
自定义分区:
如果业务上有特别的需求,可以自定义分区
(1)自定义类继承Partitioner,重写getPartition()方法
(2)在Job驱动中,设置自定义Partitioner,并根据自定义Partitioner的逻辑设置相应数量的ReduceTask
4)Comparable排序
当我们用自定义的对象作为key来输出时,就必须要实现WritableComparable接口,重写其中的compareTo()方法。
MapReduce排序分为:
(1)部分排序:对最终输出的每一个文件进行内部排序。
(2)全排序:对所有数据进行排序,通常只有一个Reduce。
(3)二次排序:排序的条件有两个。
5)Combiner合并
Combiner合并可以提高程序执行效率,减少IO传输。但是使用时必须不能影响原有的业务处理结果。
6)Reduce端分组: GroupingComparator
在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较不相同)的key进入到同-个reduce方法时 ,可以采用分组排序。
7)逻辑处理接口: Reducer
用户根据业务需求实现其中三个方法: reduce() setup0 cleanup ()
8)输出数据接口OutputFormat
默认使用实现类:
默认实现类是TextOutputFormat ,功能逻辑是:将每一 个KV对,向目标文本文件输出一-行。
二进制形式:
将SequenceFileOutputFormat输出作为后续MapReduce任务的输入,这便是一种好的输出格式 ,因为它的格式紧凑,很容易被压缩。
用户自定义OutputFormat:
(1)自定义一个类继承FileOutputFormat。
(2)改写RecordWriter ,具体改写输出数据的方法write()。
(3)要将自定义的输出格式组件设置到job中
4.2 Hadoop优化
4.2.1 MapReduce慢的原因
MapReduce程序效率的瓶颈在于两点:
1)计算机性能
CPU、内存、磁盘健康、网络
2)I/O 操作优化
(1)数据倾斜
(2) Map和Reduce数设置不合理
(3) Map运行时间太长,导致Reduce等待过久
(4)小文件过多
(5)大量的不可分块的超大文件
(6) Spill次数过多
(7) Merge次数过多等。
4.2.2 MapReduce优化方法
MapReduce优化方法主要从六个方面考虑:数据输入、Map阶段、Reduce阶段、IO传输、数据倾斜问题和常用的调优参数。
1)数据输入
(1)合并小文件:在执行MR任务前将小文件进行合并,大量的小文件会产生大量的Map任务,增大Map任务装载次数,而任务的装载比较耗时,从而导致MR运行较慢。
(2)采用CombineTextInputFormat来作为输入,解决输入端大量小文件场景。
2)Map阶段
(1)减少溢写(Spill)次数:通过调整io.sort.mb及sort.pill.percent参数值,增大触发Spill的内存上限,减少Spill次数,从而减少磁盘IO。
(2)减少合并(Merge)次数:通过调整io.sort.factor参数,增大Merge的文件数目,减少Merge的次数,从而缩短MR处理时间。
(3)在Map之后,不影响业务逻辑前提下,先进行Combine处理, 减少IO。
3)Reduce阶段
(1)合理设置Map和Reduce数:两个都不能设置太少,也不能设置太多。太少,会导致Task等待,延长处理时间;太多,会导致Map、Reduce任务间竞争资源,造成处理超时等错误。
(2)设置Map、Reduce共存 :调整slowstart.completedmaps参数,使Map运行到一定程度后, Reduce也开始运行,减少Reduce的等待时间。
(3)规避使用Reduce :因为Reduce在用于连接数据集的时候将会产生大量的网络消耗。
(4)合理设置Reduce端的Buffer :默认情况下,数据达到一个阈值的时候,Buffer中的数据就会写入磁盘,然后Reduce会从磁盘中获得所有的数据。也就是说, Buffer和Reduce是没有直接关联的,中间多次写磁盘->读磁盘的过程,既然有这个弊端,那么就可以通过参数来配置,使得Buffer中的一部分数据可以直接输送到Reduce ,从而减少IO开销: mapreduce.reduce.input buffer.percent ,默认为0.0。当值大于0的时候,会保留指定比例的内存读Buffer中的数据直接拿给Reduce使用。这样一来,设置Buffer需要内存,读取数据需要内存, Reduce计算也要内存,所以要根据作业的运行情况进行调整。
4)IO传输
(1)采用数据压缩的方式,减少网络IO的的时间。安装Snappy和LZO压缩编码器。
(2)使用SequenceFile进制文件。
5)数据倾斜问题
(1)现象:
数据频率倾斜:某一个区域的数据要远大于其他区域
数据大小倾斜:部分记录的大小远远大于平均值
(2)优化
抽样和范围分区:可以通过对原始数据进行抽样得到的结果集来预设分区边界值
自定义分区:基于输出件的业务知识进行自定义分区
combine:使用combine可以大量减少数据倾斜
Mapjoin:尽量避免reduce join
6)常见参数调优
1)资源相关参数
(1)以下参数是在用户自己的MR应用程序中配置就可以生效(mapred-default.xml)
| 配置参数 | 参数说明 |
|---|---|
mapreduce.map.memory.mb | 一个MapTask可使用的资源上限(单位:MB),默认为1024。如果MapTask实际使用的资源量超过该值,则会被强制杀死。 |
| mapreduce.reduce.memory.mb | 一个ReduceTask可使用的资源上限(单位:MB),默认为1024。如果ReduceTask实际使用的资源量超过该值,则会被强制杀死。 |
| mapreduce.map.cpu.vcores | 每个MapTask可使用的最多cpu core数目,默认值: 1 |
| mapreduce.reduce.cpu.vcores | 每个ReduceTask可使用的最多cpu core数目,默认值: 1 |
| mapreduce.reduce.shuffle.parallelcopies | 每个Reduce去Map中取数据的并行数。默认值是5 |
| mapreduce.reduce.shuffle.merge.percent | Buffer中的数据达到多少比例开始写入磁盘。默认值0.66 |
| mapreduce.reduce.shuffle.input.buffer.percent | Buffer大小占Reduce可用内存的比例。默认值0.7 |
| mapreduce.reduce.input.buffer.percent | 指定多少比例的内存用来存放Buffer中的数据,默认值是0.0 |
(2)在YARN启动之前就配置在服务器的配置文件中才能生效(yarn-default.xml)
| 配置参数 | 参数说明 |
|---|---|
| yarn.scheduler.minimum-allocation-mb | 给应用程序Container分配的最小内存,默认值:1024 |
| yarn.scheduler.maximum-allocation-mb | 给应用程序Container分配的最大内存,默认值:8192 |
| yarn.scheduler.minimum-allocation-vcores | 每个Container申请的最小CPU核数,默认值:1 |
| yarn.scheduler.maximum-allocation-vcores | 每个Container申请的最大CPU核数,默认值:32 |
| yarn.nodemanager.resource.memory-mb | 给Containers分配的最大物理内存,默认值:8192 |
(3)Shuffle性能优化的关键参数,应在YARN启动之前就配置好(mapred-default.xml)
| 配置参数 | 参数说明 |
|---|---|
| mapreduce.task.io.sort.mb | Shuffle的环形缓冲区大小,默认100m |
| mapreduce.map.sort.spill.percent | 环形缓冲区溢出的阈值,默认80% |
2)容错相关参数(MapReduce性能优化)
| 配置参数 | 参数说明 |
|---|---|
mapreduce.map.maxattempts | 每个Map Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。 |
| mapreduce.reduce.maxattempts | 每个Reduce Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。 |
| mapreduce.task.timeout | Task超时时间,经常需要设置的一个参数,该参数表达的意思为:如果一个Task在一定时间内没有任何进入,即不会读取新的数据,也没有输出数据,则认为该Task处于Block状态,可能是卡住了,也许永远会卡住,为了防止因为用户程序永远Block住不退出,则强制设置了一个该超时时间(单位毫秒),默认是600000。如果你的程序对每条输入数据的处理时间过长(比如会访问数据库,通过网络拉取数据等),建议将该参数调大,该参数过小常出现的错误提示是:AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.。 |
4.2.3 HDFS小文件优化方法
1)HDFS小文件弊端
HDFS上每个文件都要在NameNode上建立一个索引,这个索引的大小约为150byte,这样当小文件比较多的时候,就会产生很多的索引文件,一方面会大量占用NameNode的内存空间,另一方面就是索引文件过大使得索引速度变慢。
2)HDFS小文件解决方案
小文件的优化无非以下几种方式:
(1)在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS。
(2)在业务处理之前,在HDFS上使用MapReduce程序对小文件进行合并。
(3)在MapReduce处理时,可采用CombineTextInputFormat提高效率。





