暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Hadoop 之 MapReduce总结与优化

BigData Scholar 2021-06-29
1368

4 Hadoop之MapReduce总结与优化

4.1 MapReduce开发总结


MapReduce整个工作流程如上图所示,具体包含以下步骤:


1)数据输入接口

默认使用实现类:

(1)默认使用实现类:TextInputFormat:一次读一行文本,然后将该行的起始偏移量作为key,行内容作为value返回

其他常用实现类:

(1)KeyValueInputFormat:每一行均为一条记录,被分隔符分割为key/value,默认分隔符是制表符

(2)NlineInputFormat:按照指定的行数N来划分切片

(3)CombineTextInputFormat:可以把多个小文件合并成一个切片处理,提高处理效率

用户自定义InputFormat

(1)必须实现Writable接口

(2)重写序列化和反序列化方法

(3)如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口


2)逻辑处理接口:Mapper

用户根据业务需求实现:map()、setup()、cleanup()


3)Partitioner分区

默认实现HashPartitioner

逻辑是根据key的哈希值和numReduces来返回一个分区号 :key.hashCode() & Integer.MAXVALUE % numReduces

自定义分区:

如果业务上有特别的需求,可以自定义分区

(1)自定义类继承Partitioner,重写getPartition()方法

(2)在Job驱动中,设置自定义Partitioner,并根据自定义Partitioner的逻辑设置相应数量的ReduceTask


4)Comparable排序

当我们用自定义的对象作为key来输出时,就必须要实现WritableComparable接口,重写其中的compareTo()方法。

MapReduce排序分为:

(1)部分排序:对最终输出的每一个文件进行内部排序。

(2)全排序:对所有数据进行排序,通常只有一个Reduce。

(3)二次排序:排序的条件有两个。


5)Combiner合并

Combiner合并可以提高程序执行效率,减少IO传输。但是使用时必须不能影响原有的业务处理结果。


6)Reduce端分组: GroupingComparator

在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较不相同)的key进入到同-个reduce方法时 ,可以采用分组排序。


7)逻辑处理接口: Reducer

用户根据业务需求实现其中三个方法: reduce() setup0 cleanup ()


8)输出数据接口OutputFormat

默认使用实现类:

默认实现类是TextOutputFormat ,功能逻辑是:将每一 个KV对,向目标文本文件输出一-行。

二进制形式:

将SequenceFileOutputFormat输出作为后续MapReduce任务的输入,这便是一种好的输出格式 ,因为它的格式紧凑,很容易被压缩。

用户自定义OutputFormat:

(1)自定义一个类继承FileOutputFormat。

(2)改写RecordWriter ,具体改写输出数据的方法write()。

(3)要将自定义的输出格式组件设置到job中


4.2 Hadoop优化

4.2.1 MapReduce慢的原因

MapReduce程序效率的瓶颈在于两点:

1)计算机性能

CPU、内存、磁盘健康、网络

2)I/O 操作优化

(1)数据倾斜

(2) Map和Reduce数设置不合理

(3) Map运行时间太长,导致Reduce等待过久

(4)小文件过多

(5)大量的不可分块的超大文件

(6) Spill次数过多

(7) Merge次数过多等。

4.2.2 MapReduce优化方法

MapReduce优化方法主要从六个方面考虑:数据输入、Map阶段、Reduce阶段、IO传输、数据倾斜问题和常用的调优参数。

1)数据输入

(1)合并小文件:在执行MR任务前将小文件进行合并,大量的小文件会产生大量的Map任务,增大Map任务装载次数,而任务的装载比较耗时,从而导致MR运行较慢。

(2)采用CombineTextInputFormat来作为输入,解决输入端大量小文件场景。

2)Map阶段

(1)减少溢写(Spill)次数:通过调整io.sort.mb及sort.pill.percent参数值,增大触发Spill的内存上限,减少Spill次数,从而减少磁盘IO。

(2)减少合并(Merge)次数:通过调整io.sort.factor参数,增大Merge的文件数目,减少Merge的次数,从而缩短MR处理时间。

(3)在Map之后,不影响业务逻辑前提下,先进行Combine处理, 减少IO。

3)Reduce阶段

(1)合理设置Map和Reduce数:两个都不能设置太少,也不能设置太多。太少,会导致Task等待,延长处理时间;太多,会导致Map、Reduce任务间竞争资源,造成处理超时等错误。

(2)设置Map、Reduce共存 :调整slowstart.completedmaps参数,使Map运行到一定程度后, Reduce也开始运行,减少Reduce的等待时间。

(3)规避使用Reduce :因为Reduce在用于连接数据集的时候将会产生大量的网络消耗。

(4)合理设置Reduce端的Buffer :默认情况下,数据达到一个阈值的时候,Buffer中的数据就会写入磁盘,然后Reduce会从磁盘中获得所有的数据。也就是说, Buffer和Reduce是没有直接关联的,中间多次写磁盘->读磁盘的过程,既然有这个弊端,那么就可以通过参数来配置,使得Buffer中的一部分数据可以直接输送到Reduce ,从而减少IO开销: mapreduce.reduce.input buffer.percent ,默认为0.0。当值大于0的时候,会保留指定比例的内存读Buffer中的数据直接拿给Reduce使用。这样一来,设置Buffer需要内存,读取数据需要内存, Reduce计算也要内存,所以要根据作业的运行情况进行调整。

4)IO传输

(1)采用数据压缩的方式,减少网络IO的的时间。安装Snappy和LZO压缩编码器。

(2)使用SequenceFile进制文件。

5)数据倾斜问题

(1)现象:

  • 数据频率倾斜:某一个区域的数据要远大于其他区域

  • 数据大小倾斜:部分记录的大小远远大于平均值

(2)优化

  • 抽样和范围分区:可以通过对原始数据进行抽样得到的结果集来预设分区边界值

  • 自定义分区:基于输出件的业务知识进行自定义分区

  • combine:使用combine可以大量减少数据倾斜

  • Mapjoin:尽量避免reduce join

6)常见参数调优

1)资源相关参数

(1)以下参数是在用户自己的MR应用程序中配置就可以生效(mapred-default.xml)

配置参数参数说明

mapreduce.map.memory.mb

一个MapTask可使用的资源上限(单位:MB),默认为1024。如果MapTask实际使用的资源量超过该值,则会被强制杀死。
mapreduce.reduce.memory.mb一个ReduceTask可使用的资源上限(单位:MB),默认为1024。如果ReduceTask实际使用的资源量超过该值,则会被强制杀死。
mapreduce.map.cpu.vcores每个MapTask可使用的最多cpu core数目,默认值: 1
mapreduce.reduce.cpu.vcores每个ReduceTask可使用的最多cpu core数目,默认值: 1
mapreduce.reduce.shuffle.parallelcopies每个Reduce去Map中取数据的并行数。默认值是5
mapreduce.reduce.shuffle.merge.percentBuffer中的数据达到多少比例开始写入磁盘。默认值0.66
mapreduce.reduce.shuffle.input.buffer.percentBuffer大小占Reduce可用内存的比例。默认值0.7
mapreduce.reduce.input.buffer.percent指定多少比例的内存用来存放Buffer中的数据,默认值是0.0

(2)在YARN启动之前就配置在服务器的配置文件中才能生效(yarn-default.xml)

配置参数参数说明
yarn.scheduler.minimum-allocation-mb给应用程序Container分配的最小内存,默认值:1024
yarn.scheduler.maximum-allocation-mb给应用程序Container分配的最大内存,默认值:8192
yarn.scheduler.minimum-allocation-vcores每个Container申请的最小CPU核数,默认值:1
yarn.scheduler.maximum-allocation-vcores每个Container申请的最大CPU核数,默认值:32
yarn.nodemanager.resource.memory-mb给Containers分配的最大物理内存,默认值:8192

(3)Shuffle性能优化的关键参数,应在YARN启动之前就配置好(mapred-default.xml)

配置参数参数说明
mapreduce.task.io.sort.mbShuffle的环形缓冲区大小,默认100m
mapreduce.map.sort.spill.percent环形缓冲区溢出的阈值,默认80%

2)容错相关参数(MapReduce性能优化)

配置参数参数说明

mapreduce.map.maxattempts

每个Map Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。
mapreduce.reduce.maxattempts每个Reduce Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。
mapreduce.task.timeoutTask超时时间,经常需要设置的一个参数,该参数表达的意思为:如果一个Task在一定时间内没有任何进入,即不会读取新的数据,也没有输出数据,则认为该Task处于Block状态,可能是卡住了,也许永远会卡住,为了防止因为用户程序永远Block住不退出,则强制设置了一个该超时时间(单位毫秒),默认是600000。如果你的程序对每条输入数据的处理时间过长(比如会访问数据库,通过网络拉取数据等),建议将该参数调大,该参数过小常出现的错误提示是:AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.。

4.2.3  HDFS小文件优化方法

1)HDFS小文件弊端

HDFS上每个文件都要在NameNode上建立一个索引,这个索引的大小约为150byte,这样当小文件比较多的时候,就会产生很多的索引文件,一方面会大量占用NameNode的内存空间,另一方面就是索引文件过大使得索引速度变慢。

2)HDFS小文件解决方案

小文件的优化无非以下几种方式:

(1)在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS。

(2)在业务处理之前,在HDFS上使用MapReduce程序对小文件进行合并。

(3)在MapReduce处理时,可采用CombineTextInputFormat提高效率。



文章转载自BigData Scholar,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论