
● LAS Spark基本原理
● LAS Spark性能更高
● LAS Spark功能更多
● LAS Spark未来规划


/ 整体架构




/ 基本概念
● 一个SQL是如何执行的?

● Spark 基本组件有哪些?

● 数据是如何组织存储的?


/ 如何算得更少?/
●Partition Skipping:仅读取必要的分区。例如下图中的分区过滤条件date = ‘20230101’,经过Partition Skipping,实际只需要读红色部分的数据文件。
● File Skipping:仅读取必要的文件。经过Partition Skipping后,对于需要读取的文件,可基于文件级别的索引等信息进一步过滤出必要的文件。
● RowGroup Skipping:仅读取必要的数据块。经过前两步的Data Skipping得到文件集合,但依然没有必要读取这些文件内的所有数据。由于Parquet文件是基于 RowGroup的方式分块存储的,并且Parquet Footer中存储了每个RowGroup的 min/max等索引信息,因此可以结合Data Filter进一步过滤出必要的RowGroup。例如下图中的过滤条件a=10,RowGroup2中的a列min/max为[11, 99],因此 RowGroup2不可能存在a=10的记录,最终只需要读取RowGroup1即可。

1. Range Partition

2. LocalSort

3. 合并小文件

● MergeFile:主要适用分区数据量均匀的场景,即每个分区的总数据量差异不大,且分区内部均有小文件。这种场景主要是因为Spark任务的最后一个stage并行度较大导致,如下左图,InsertInto之前的最后一个Operator的并行度为7,则最终也会产出7个文件。
解决这种问题的思路也比较简单,直接在Operator和InsertInto算子之间增加一个 Exchange算子,做一次整体Shuffle,将7个并行度调整为2个并行度,最终产出2个文件。
● FragPartitionCompaction:主要适用分区数据量不均匀的场景,即每个分区的总数据量有一定差异,仅部分分区内存在小文件。对于这种场景,如果依然使用 MergeFile增加整体Shuffle的方式,则无法为每个分区都产出合适的文件大小,虽然也可以解决小文件问题,但部分分区文件则会过大,同时还会引入比较大的性能损耗。
为此我们提出了FragPartitionCompaction,主要思路是在InsertInto算子执行完成之后,会加一个Operator算子去检测产出的分区中是否存在小文件,然后仅对存在小文件的分区进行文件合并。如下右图,检测到event=B和event=C分区存在小文件,仅会对这两个分区中的文件做合并,event=A分区不会做任何操作。

4. Prewhere

5. Dynamic BloomFilterJoin

/ 如何智能计算?/
● 数据整体重分布,引入额外计算成本以及网络开销。
● 数据倾斜,出现长尾Task,拖慢整个任务执行。
● 并行度设置困难,任务并发不够,任务整体执行慢,容易引起OOM;任务并发度过大,Driver压力较大,导致任务失败。

1. Bucket


2. AQE Skewed Join


● 提高数据倾斜的识别能力
● 提高倾斜数据的切分均匀程度
● 支持更多的场景,提高AQE SkewedJoin的覆盖范围,包括 JoinWithAggOrWin、MultipleSkewedJoin、SkewedJoinWithUnion、MultipleJoinWithAggOrWin。
● 支持引入Shuffle的强制优化


3. AQE ShuffleHashJoin
● 小表数据量必须小于指定大小,默认10MB*并行度
● 大表数据量必须大于等于小表数据量的3倍

/ 如何算得更快?/
1. 语言层面

2. 贴近硬件

3.算子层面

● ShuffleHashJoin
● PushedOrderLimit
select a, b, c, agg_f0, agg_f1, agg_f2from tgroup by a, b, corder by c, b, [agg_f0]...limit 100-- 限制条件: order by 的前缀字段需要是 group by 字段的子集
/ 如何预先计算
1. 物化列
● 增加大量无效IO:即使只查询一个子列,也需要将整个嵌套类型列的数据读入内存(实测存在80%+的无效IO);
● 增加额外计算:每次查询涉及嵌套类型列时,都需要在内存中对嵌套类型列进行解析;
●嵌套类型不支持filter push down:对于where people.age>10,虽然people.age是原子类型,但Spark还是会先将people列所有数据读入内存,提取出age,最后再进行filter过滤出age > 10的数据;
● 无法进行向量化读取:只支持原子类型的向量化读取(实测使用向量化读取会有30%~50%的性能提升)。
为解决此类问题,我们引入物化列的解决方案,写入时将高频子列物化下来,并在查询时,由Spark引擎负责自动将用户的查询rewrite为读取物化列,从而提升整体查询速度。
例如下图中的物化列读取流程,我们在base_table表中增加一个age列(物化列),并且绑定一个表达式people.age(物化表达式),写入过程中便将people.age提前写入到age列中,查询时,Spark引擎会将执行计划中的people.age改写成age,从而直接读取age列的值。
以此思路,我们拓展物化列到更多场景,覆盖几乎所有表达式。通过复用重复表达式的方式,平均性能可提升60%。

2. 物化视图



/ 自研 UIMeta

/ 深度融合数据湖



产品介绍






文章转载自字节跳动数据平台,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




