点击上方“IT那活儿”公众号,关注后了解更多内容,不管IT什么活儿,干就完了!!!
简单来说就是数据分配到各个分区数据量不一致,主要是指shuffle中出现的数据倾斜问题,是由于不同key对应的数据量不同导致不同的task处理的数据量不一致的问题。
例如假设reduce端一共一共需要处理100万的数据,共有3个task来处理,其中第一个和第二个task被各分配了1万数据(处理需要时间1分钟),而第三个task被分配了98万数据(处理需要1小时),那么整个spark作业最终所需要的时间就是1小时,这样看起来明显不合理。1)Spark作业的大部分task都能快速执行,但是有一个或几个task运行的明显比其他task执行的要慢。2)个别task运行过程中会出现OOM错误,多次执行都是某一个task出现的OOM错误,此时可以就有可能出现了数据倾斜。3)检查代码中的shuffle算子,例如reduceByKey、groupByKey、koin等算子,可以根据逻辑来判断此处是否会出现数据倾斜。4)查看日志文件,判断错误出现的代码行,来判断其出现再哪个stagey以及shuffle算子。
1. 预聚合原始数据
1.1 避免shuffle
如果避免了shuffle过程,那么就从根本上避免了数据倾斜产生的可能。假设spark作业数据来源为hive表,多数情况下,spark作业的数据来源也是hive表,这样我们就可以再hive表中提前对数据进行聚合,例如按照key对数据进行分组,将同一key对应的value拼接成一个字符串,那么一个key就对应一条数据了,这样对key对应的value进行处理是就只需进行map操作而无需进行shuffle操作了。当然只是举一个例子,不一定非要按照拼接字符串的操作,实际需求也可能根据key进行累计。1.2 增大每个task的数据量
如果无法对每个key聚合出一条数据,再特定场景下可以考虑扩大key的聚合粒度。假设有100万的数据,当前key的粒度划分是(省,市,年,月),现在我们考虑扩大粒度,将粒度扩大为(省,月),这样key的数量就会减少,key之间对应的数据量差异也会减少,由此可以减轻数据倾斜的问题,这种方法只针对特定场景有效。2. 预处理导致倾斜的key
2.1 过滤
如果允许丢弃某些数据,那么可以考虑过滤掉可能导致数据倾斜的key对应的数据,这样就可避免再spark作业中发生数据倾斜的问题了。2.2 使用随机key
当使用了类似与groupByKey、reduceByKey这样的算子时,可以考虑使用随机key实现双重聚合,如图:
首先通过map算子给每个数据的key添加随机数前缀,将key进行打散,将原来一样的key变成不一样的key,然后进行第一次聚合,这样就可以让原本被一个task处理的数据分散到多个task上进行局部聚合,随后去掉每个key的前缀,再次进行聚合。
此方法对groupByKey、reduceByKey造成的数据倾斜有比较好的效果,仅仅适合聚合类的shuffle的操作,而不适合join类的操作,使用范围较窄。2.3 对倾斜数据单独进行join操作
在spark作业中,如果某个RDD只有一个key,那么在shuffle过程中会默认将此key对应的数据打散,由reduce端不同的task进行处理。所以当有某个key导致数据倾斜时,可以将发生数据倾斜的key对应的RDD数据单独提取出来和其他RDD进行单独join,此时根据spark运行机制,此RDD数据会在shuffle阶段被分散到多个task中进行join操作。
对于RDD中的数据,可以将其转换为一个中间表,或是直接使用countByKey()看一下这个RDD中各个key对应的数据量,如果发现整个RDD就一个key对应的数据量特别多,就可以考虑使用此方法。
当数据量非常大时,可以通过采样的方法获取一部分数据,然后分析数据中可能导致数据倾斜的key,那么就可以将该Key对应的数据单独提取出来。如果一个RDD中导致数据倾斜的key很多,那么该方法就不适合。3. 提高reduce并行度
通过提高reduce并行度也可以减少每个task处理的数据量,当上面的方法效果不明显的话可以考虑该方法。在大部分shuffle算子中,都可以传入一个设置并行度的参数,比如reduceByKey(500),500就代表了shuffle过程中reduce端的并行度,再进行shuffle操作的时候就是分配500个reduce task。再spark sql类似的操作中,可以通过设置spark.sql.shuffle.partitions参数,该参数就代表了shuffle过程中reduce task的并行度,默认值是200,一般是不够用的,可以适当的增大该参数。增加shuffle reduce task数量,可以让原本分给一个task多个key数据量分配给多个task,从而减少每个task处理的数据量。例如假设原本有10个key,每个key对应的数据量都是1万,这10个key本来都会分配给同一个task,那么这个task就要处理10万数据,如果增加reduce并行度,那么可能每个task就只处理1万数据,从而提高整个作业的执行速度。1)第1点和第2点中的方法从根本上避免了数据倾斜的产生,但是该方法并没有改变数据倾斜的问题和本质,该方法更像是减轻数据压力。2)该方法会在一定程度上减轻数据处理压力,但是再某些情况下,特别是极端情况下,例如某个key对应的数据远远大于其他数据,不在一个数量级,那么该方法并不能有效的解决数据倾斜的问题,就要考虑其他方法了。4. 使用map join
一般情况下,join操作都会执行shuffle操作,并且执行的是reduce join操作,也就是将所有相同的key对应的数据汇聚到一个reduce task中,然后再进行join,普通join过程如下:很明显是经过了shuffle过程,就是将相同key对应数据拉取到同一个shuffle read task中再进行join。但是如果join的一个RDD比较小,那么可以采用广播的方式将小RDD全量数据+map算子来实现与join相同的效果,也就是map join,此时不会产生shuffle操作,也就不会产生数据倾斜的。注意:RDD并不能直接进行广播,需要通过collect算子将数据拉取到Driver内存中再进行广播。将小RDD通过collect算子拉取到Driver端内存中,然后对其创建一个broadcast变量 ,然后对另一个RDD执行map算子,再算子函数内,从broadcast变量中获取小RDD全量数据,与当前RDD按照key进行比对,如果连接的key相同,就将两个RDD数据用需要的方式连接起来。这种情况下根本就不会产生shuffle操作,就避免了数据倾斜的问题。此方法适合join操作中有一个RDD数据量较小时。
由于spark广播变量相当于再executor中保存一个副本,如果两个RDD数据都非常大,这种情况下如果强行使用map join就有可能导致OOM。