暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Spark SQL 调优分享

2714

 文本介绍了Spark sql的常用参数,以及如何通过这些参数来控制作业的并行度、资源调度、shuffle、执行计划等来提升任务的运行效率,优化存储空间和资源利用率,其中重点关注了资源、数据倾斜、join优化方案。并介绍了Spark 3.0的新特性Adaptive Query Execution。最后通过4个实际的优化案例来进一步介绍常用的优化方法。


01


常用参数和优化方法


1. 资源调优

spark 的资源分配粒度最小是container,即一个executor带的cpu和内存资源。资源的申请和移除都是通过executor 进程的增加和减少达成

1.1 Executor

Executor 的参数影响了cpu和内存资源的分配,通过设置executor的参数,可以调节实际处理的并行度。
资源最高并行度=executor num * executor core * vcore ratio
Task 数量 = partition 数量

参数

描述

spark.executor.cores

每个executor的CPU核数

spark.vcore.boost.ratio

vcore,虚拟核数。提高cpu利用率

1.1.1 静态分配

参数

描述

spark.executor.instances

静态资源下executor数

1.1.2 动态分配

当任务资源最高并发< Task 数量时,会根据参数配置向yarn申请新的executor
当任务资源最高并发 > Task 数量时,会根据参数配置释放的executor

参数

描述

spark.dynamicAllocation.enabled

动态资源开关

spark.dynamicAllocation.initialExecutor

初始化时启用的executor的个数

spark.dynamicAllocation.minExecutors

最少分配的executor的个数

spark.dynamicAllocation.maxExecutors

最大可分配的executor的个数

spark.dynamicAllocation.executorIdleTimeout

executor的空闲回收时间

spark.dynamicAllocation.schedulerBacklogTimeout

启动新executor,未分配的task等待分配的时间

1.1.3 Task 数量

不可切割文件,partition 数量为文件数

参数

描述

spark.sql.files.maxPartitionBytes

读取可切分文件时,每个Partition Split的最大值

spark.sql.shuffle.partitions

Reduce 后partition 数量

区分:application, job,stage, task

    a. Application:初始化一个 SparkContext 即生成一个 Application

    b. Job:一个 Action 算子就会生成一个 Job

    c. Stage: 遇到一个宽依赖划分一个 Stage。

    d. Task: Stage 是一个 TaskSet,将 Stage 划分的结果发送到不同的 Executor 执行即为一个

1.2 统一内存管理

参数

描述

spark.executor.memory

Executor 堆内内存

spark.storage.storageFraction

Storage内存在堆内内存占比

spark.memory.offHeap.enabled

是否开启堆外内存

spark.memory.offHeap.size

堆外内存大小

1.2.1 堆内内存

a. 存储内存:缓存 RDD 和 Broadcast变量

b. 执行内存:Shuffle 数据

c. Other:用户定义的数据结构

d. System Reserved: 保留内存防止OOM,(非序列化对象周期采样估算,JVM回收延迟)

动态内存管理

1.2.2 堆外内存

提高 Shuffle 时排序的效率,优化内存使用,减少垃圾回收对应用的影响。
堆外内存可以被精确地申请和释放,所以相比堆内内存来说降低了管理的难度,也降低了误差

2. Adaptive Query Execution

Spark 3.0 接入了 AQE,可以根据执行过程中的中间数据优化后续执行,从而提高整体执行效率。核心在于两点

    a. 执行计划可动态调整

    b. 调整的依据是中间结果的精确统计信息

参数

描述

spark.sql.adaptive.enabled

Adaptive execution开关

2.1 Reduce task 数量控制

未开启AQE,由spark.sql.shuffle.partition 参数控制,控制所有阶段的reduce个数,
开启后 reduce partition的数量初始切分为maxNumPostShufflePartitions,根据配置的targetPostShuffleInputSize大小,对于小文件map端的partition进行合并。并限制分区个数不少于minNumPostShufflePartitions

参数

描述

spark.sql.adaptive.minNumPostShufflePartitions

reduce个数区间最小值,为了防止分区数过小

spark.sql.adaptive.maxNumPostShufflePartitions

reduce个数区间最大值,同时也是shuffle分区数的初始值

spark.sql.adaptive.shuffle.targetPostShuffleInputSize

动态合并reducer的partition。map端多个partition 合并后数据阈值,小于阈值会合并

2.2 Map join 触发限制

未开启AQE 由spark.sql.autoBroadcastJoinThreshold 参数限制,缺点是从application执行一开始就确定,由于数据统计信息的缺失或不准确,或者是过滤条件的影响会导致最开始的估算不准确,导致执行的计划不是最优计划。
开启AQE之后,在join前会根据准确的上游数据来指定执行计划。

参数

描述

spark.sql.adaptive.join.enabled

自动转换sortMergeJoin到broadcast join 开关

spark.sql.adaptiveBroadcastJoinThreshold

将sortMergeJoin转换成broadcast join 阈值

2.3 数据倾斜优化

可解决 Join 时数据倾斜问题,根据预先的配置在作业运行过程中自动检测是否出现倾斜,并对倾斜的partition进行拆分由多个task来进行处理,最后通过union进行结果合并。
满足倍数限制且满足(大小限制 或者 条数限制 )才会被当做倾斜的partition处理

参数

描述

spark.sql.adaptive.skewedJoin.enabled

倾斜处理开关

spark.sql.adaptive.skewedPartitionFactor

倾斜的partition/同一stage的中位数 不能小于该值

spark.sql.adaptive.skewedPartitionSizeThreshold

倾斜的partition大小不能小于该值

spark.sql.adaptive.skewedPartitionRowCountThreshold

倾斜的partition条数不能小于该值

spark.sql.adaptive.skewedPartitionMaxSplits

被判定为数据倾斜后最多会被拆分成的份数

3. Join调优

3.1 多种join的差别

3.1.1 Broadcast Join

适用:大表join极小表
限制:
    1. 对广播的表大小有比较大的限制。见 Map join 触发限制
    2. 对driver和executor的内存、带宽等资源增加压力



3.1.2 Shuffle Hash Join

适用:大表join小表
限制:
    1. spark.sql.join.preferSortMergeJoin = false(默认是true)
    1. 每个分区的平均大小不超过BroadcastJoinThreshold限制
    1. 大表大小 > 3*小表大小

3.1.3 Sort Merge Join

适用:大表join大表

3.1.4 Bucket join

优点:

    a. 根据bucket key(或超集)进行 join、group by, 不会进行shuffle操作

    b. 可兼容历史数据,根据修改时间的后一天作为bucket起始分区,之前的分区还是被视为非bucket

    c. 可修改分桶数量

    d. 提升压缩比

    e. 提升过滤速度

限制:

    a. 两张表必须分bucket

    b. 两张表的bucket是倍数关系

    c. Bucket key要与join\group key相等或者是子集

    d. 不支持full join

    e. 在写入bucket表的执行计划的最后一个节点按照分桶key join/group by 才能消除shuffle操作

3.2 优化场景

3.2.1 大表join小表

采用map join。可以适当增加broadcast的存储限制,加大executor和driver内存
采用hash join。

3.2.2 大表join大表

a. 提前过滤:
    i. 过滤掉不需要参与join的数据。
    ii. 如果A表left join B表。主表A只有少量的key在B表中,可以提前把A表的key聚合,广播到B表。预先过滤B表无用数据
b. 采用Bucket join

4. 数据倾斜调优

表现

部分task执行非常慢,或者报OOM错误

解决方案

    a. spark 3.0 可以开启AE的数据倾斜优化

    b. 判断这些key是不是可以提前过滤掉

    c. 小表改成map join去除shuffle

    d. 增加reduce并行度,减少shuffle后每个task的数据量,适用于有较多 key 对应的数据量都比较大

    的情况,一定情况下可以缓解数据倾斜,但是浪费资源

    e. 将聚合分为两步。(1) 将key加前缀打散后分散到多个task上做聚合 (2) 去除随机值聚合

    f. 将数据拆成两份,找到倾斜的key,对这些key单独加盐按照4处理之后,在union其他非倾斜数据的聚合结果

5. 其他调优

5.1 本地化调优

5.1.1 本地化级别

名称

解析

PROCESS_LOCAL

进程本地化,task 和数据在同一个Executor 中

NODE_LOCAL

节点本地化,task 和数据在同一个节点中,但是 task 和数据不在同一个 Executor中,数据需要在进程间进行传输。

RACK_LOCAL

机架本地化,task 和数据在同一个机架的两个节点上,数据需要通过网络在节点之间进行传输。

NO_PREF

对于 task 来说,从哪里获取都一样,没有好坏之分。

ANY

task 和数据可以在集群的任何地方,而且不在一个机架中,性能最差。

参数

描述

spark.locality.wait

本地化等待时长

Driver 会对每一个 stage 的 task 进行分配,从最优节点开始分配,超过等待阈值,会自动降级到下个本地化级别。对于shuffle大的任务,为减少网络传输,可适当调节等待时长。

5.2 Shuffle调优

参数

描述

spark.shuffle.io.maxRetries

reduce 端拉取数据重试次数

spark.shuffle.io.retryWait

reduce 端拉取数据时间间隔

spark.shuffle.hdfs.enabled

map 端开启双写

spark.shuffle.hdfs.replication

写hdfs的副本数

spark.shuffle.file.buffer

map 端拉取数据缓冲区大小

spark.reducer.maxSizeInFlight

reduce 端拉取数据缓冲区大小

    a. GC导致shuffle 文件拉取失败

执行 GC 会影响 Executor 内所有的工作,可以适当调节大maxRetries和retryWait。

    b. 长任务,shuffle量大的任务,FetchFailedException 报错

建议开启spark.shuffle.hdfs.enabled,默认shuffle中map端只存储数据到本地磁盘上,当IO和CPU满载时,会导致reducer从map端拉取数据失败。
开启hdfs.enabled后会将mapper存储到本地shuffle数据多存储一份到HDFS上,当reducer获取不到shuffle数据时,会从HDFS拉取
可适当调大 spark.shuffle.hdfs.replication

    c. Map 端处理大量数据

调节shuffle.file.buffer,来调大map端缓冲区大小,避免频繁的磁盘IO操作。

    d. Reduce 端处理大量数据

如果内存资源较为充足,适当增加reducer.maxSizeInFlight,来增加拉取数据缓冲区的大小,可以减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能

02


实际优化案例

1. 大文件优化案

目的:限定统一的cpu和内存资源的情况下,优化大文件的处理速度并减小存储压力
限定条件:

spark.dynamicAllocation.enabled = false;
spark.executor.instances = 1;
spark.executor.cores = 3;

实验结果:

 同样的资源,从耗时上来看实验5相较于1,性能有10倍的提升,并且存储也降低了12%

总结:

    a. 采取合适的文件压缩格式,可提高任务执行效率同时减少存储压力。

    b. 在内存充足的情况下可以适当调节vcore

    c. 调节vcore如果内存资源不够,会导致GC压力过大,反而执行效率会退化

2. Bucket join 优化案例

目的:优化运行速度,并减小存储。表5.6T,右表635M,两表根据多个字段join,ABjoin的压力在于A表的shuffle耗时. 

限定条件:

spark.sql.adaptive.maxNumPostShufflePartitions=10000;
spark.executor.memory=10g;
spark.dynamicAllocation.maxExecutors=1000;

实验结果:

对比1和4,从普通join转为bucket join,耗时优化了37%,大表存储优化了48%
对比4和5,如果减少了大表的一次group操作,耗时优化了50%

总结:

    a. 对大表分bucket可以进一步提高压缩率,但是对小表分bucket由于会增加大量小文件,未最大化压缩收益,反而可能会减低压缩率

    b. 要设置合理的bucket数量,提高任务运行的并行度

    c. 在开始调优之前,先分析sql是否可以优化。优化sql带来的性能提升效率可能大于调参优化

3 通过增加资源提高运行效率案例

目的:提高运行速度
原始参数和运行状态:

spark.sql.adaptive.maxNumPostShufflePartitions=3000;
spark.dynamicAllocation.maxExecutors=250;

优化结果运行耗时减少了近65%

spark.sql.adaptive.maxNumPostShufflePartitions=10000;
spark.dynamicAllocation.maxExecutors=1000;

总结:

    a. 合理的调节Reduce 的partition数量可以提升运行效率

    b. 在资源满足要求以及任务数多的情况下可以增加cpu资源来提升运行并行度

4 减少资源浪费率案例

原始参数和运行状态:

spark.shuffle.hdfs.enabled=true;
spark.sql.adaptive.maxNumPostShufflePartitions=3000;
spark.executor.memory=10g

优化结果:总cpu浪费下降3倍

优化后参数和运行状态

spark.shuffle.hdfs.enabled=true;

spark.executor.memory=8g;

spark.dynamicAllocation.executorIdleTimeout=30;

spark.dynamicAllocation.maxExecutors=250;

spark.sql.adaptive.maxNumPostShufflePartitions=500;

总结:

    a. Partition 的数据不是越多越好,太多的小文件也会影响运行效率并且浪费资源

    b. 可以适当减少executor空闲回收时间,加快资源释放

    c. 可以通过限制maxExecutors数量来控制最高并行度,减少无效的executor申请,提高资源使用率


文章转载自来自火星的飞船,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论