暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Partitionwise Join

手机用户2895 2023-12-31
285

背景

什么是Partitionwise Join

IMCI现有hash join算法为grace join。对于T join U,IMCI会将T和U使用相同hash方式分桶(T1,T2,…,Tn)和(U1,U2,…,Un),然后相同的分桶之间进行Join。
对于分区表而言,目前的做法是将其当作逻辑上的一张表进行处理,这样做忽略了数据本身的分区信息,产生不必要的数据扫描以及数据交换开销(多机场景下),并且由于需要对左表整张表构建hashtable,内存占用也会更高。Partitionwise Join(PWJ)则是在关联条件正好为分区键时,利用数据本身分区信息优化join,由于只需要关联分区之间进行Join,因此可减少以上开销。
下图显示了两种PWJ方式,其中左边为Full Partitionwise Join(FPWJ),它是两张表均为JoinKey上的分区表的情况,则让对应分区之间进行Join即可;右边为Partial Partitionwise Join(PPWJ),表示只有一张表是JoinKey上的分区表(记为RT),可将另一张表按照RT的分区方式进行Repartition,之后再采用FPWJ的方式进行Join。

由于单机上进行Repartition操作需要扫一遍数据进行落盘,带来一定的额外开销,因此单机上的PPWJ暂且不考虑实现,而这个过程在MPP上则比较自然。

Partitionwise Join适用场景

场景一:执行内存不足,需要落盘的场景

由于Partitionwise Join可以单独处理每个分区数据,从而所需要的执行内存更少,避免数据落盘。并且,数据访问locality 更好,cache miss更少(IMCI LRU cache/ L3 cache),从而提升性能。
例如HashJoin,Build HashTable 占用内存M,假设N个分区且串行处理分区,则Partitionwise Hash Join的HashTable只需要占用内存M/N,从而可能避免落盘。

场景二:部分分区理论上无需参与Join

例如:A表与B表分区一致,但其中A表的部分分区没有数据,此时执行A表空分区与B表Join时,可以直接返回空结果(Outer Join 另外考虑),从而避免扫描B表的对应分区数据。

场景三:多机场景,减少数据交换

共享磁盘架构使得colocate join 更加容易,而不是分布式数据那样需要用户提前按照分区将数据分布到相同节点上。

设计目标

该功能主要用于解决对于分区表的Join,如何利用分区信息来实现加速,主要有以下目标:

  1. 实现尽可能简单通用,对现有代码侵入较少。这是因为该功能属于特殊场景下的优化,不适合投入过多的人力去实现该功能。
  2. 尽可能不造成性能回退。这需要较为精准的识别其有用的场景。

方案设计

该功能主要分为算子执行、场景匹配两个大模块,分别解决how与when的问题。

Full Partitionwise Join

场景匹配

首先需要判断是否适合做pwj,目前考虑满足以下前提:

  • hash分区,且分区数一致。想要检查不同类型的分区schema是否完全一致仍然有一定的难度,目前支持的分区种类比较多,其检查方法需要每个单独适配,由于行存还没有pwj功能,因此这个没法直接借鉴。
  • join算子下面是两个ctablepartsscan算子。对于多表Join,仅对执行树最底下的join有效。
  • join predicate 为eq,且两个field正好是分区key。

Join Partition Prune

Join 的partition prune主要是确定哪些分区之间要进行Join。从不同维度可以进行如下方式划分:
从分区表schema角度划分,对于不同分区类型的支持:

  1. Hash/Key分区:要求分区schema完全一致。
  2. Range分区:要求分区schema完全一致(一期)。分区粒度可不一致(二期),例如t1表按着月份进行分区,t2表按照季度进行分区,分区之间仍然具有一定的对应关系。
  3. List分区:要求分区schema完全一致(一期)。分区可不完全一致(二期),同上。
  4. 二级分区:要求分区schema完全一致(一期)。仅要求子分区完全一致(二期),例如(p1s1 U p1s2 U p2s1 U p2s2) Join (p1s1 U p1s2 U p2s1 U p2s2) ==> ((p1s1 U p2s1) Join (p1s1 U p2s1)) U ((p1s2 U p2s2) Join (p1s2 U p2s2))。
  5. columns分区:要求Partition Key为Join Key的子集,且两张表的分区schema完全一致 。

从查询角度上来说,主要有两种类型的prune:

  1. 至少有一张表有自己的分区prune条件。形如select * from t1 join t2 on t1.a = t2.a and t1.a < 10这样的查询。这里t1.a<10会被用作t1表的静态prune condition。由于t1.a=t2.a,因此 t2.a<10也可作为 t2的prune condition。这个功能在https://work.aone.alibaba-inc.com/issue/45425115 中已经实现。
  2. 两张表都没有自己的prune条件。形如select * from t1 join t2 on t1.a=t2.a。区别在于t1和t2都没有prune condition,但t1与t2可能分区schema不完全匹配的情况,对应于上文讨论的2~4这几种分区不完全一致的case。

以下是先有系统对不同场景的支持情况。

Oracle PG
支持一级分区schema完全一致
支持一级分区schema不完全一致
支持二级分区schema完全一致
支持二级分区schema不完全一致

Partition Iterator算子实现

执行器层

分区间串行/并行:普通的hash join,将底层分区表当作逻辑上的一个大表进行处理,由于现有hash join算子也能支持并发,因此性能也不差。另一种方案是采用现有集中系统(Oracle/PG)的PWJ方案,分区之间单独处理。如果串行处理,则可以减少内存的占用;如果并行处理,总体预期对Imci hash join性能影响不大,并且需要解决分区数据倾斜、分区数与并发数整除的问题。

原始HashJoin(简化)

单个分区串行处理
**采用串行处理减少内存占用:**由于join内部是多个build、probe的过程,而build和probe之间需要有屏障,如果分区数有1024个,那么就会有2047个屏障,对性能影响有限。在内存充足时,这种方式可能不会带来收益,但是在内存不足时,预期带来明显的性能提升(见附录)。
从实现层面上,要实现各个分区可以单独处理,并且串行处理,目前在IMCI上要实现这样的功能,有以下几种实现方案:

实现方案 优势 劣势
方案一:专门为每种Join 实现一个对应的PWJ算子。例如对于HashJoin,其相应的PWJ在DoOpen阶段不会Build所有的子分区的HashTable,而是在DoFetch阶段逐个分区进行Build 和Probe。 可以针对不同Join算子做一些专门的优化 可扩展性差,灵活性很差。且实现代价比较高,不符合设计目标
方案二:实现一个类似于Oracle的PX PARTITION 算子(见附录),这是一个Iterator算子,用来驱动不同分区之间的Join 执行计划比较简单优雅,只需要在执行计划中指定Iterator要访问哪些分区即可,而Oracle有个PARTITION BY REFERENCE的功能使得保证两张表分区完全一致对用户来说非常简单(MySQL没有)
  1. Partition Iterator的实现没有参考,需要我们自己设计。
  2. 每个Partition使用相同的Join算子
    |
    | 方案三:通过实现一个类似于PG的Append操作(见附录)的算子,来驱动分区之间的Join并返回结果集给上层算子,它本质上就是一个Union All算子。 | 灵活度高:不需要分区之间一一对应,且不同分区可使用不同的Join算子。 | 1.当前IMCI的UnionAll算子不能完全按预期处理分区之间的Join,导致部分场景下内存占用与并行处理分区一样。
    2.执行计划树会变得非常复杂,各个分区也需要分别构建自己的一套算子,这个开销也比较大。
    |

本方案主要采用Partition Iterator算子的实现。Partition Iterator算子主要用于驱动Join算子按照分区逐个进行Join,同时保证上一个分区Join完成后,内存可以释放。

Open函数行为:

  • 将后序遍历 改成先序遍历,因为必须在scan算子open之前给它指定分区信息

DoOpen阶段行为:

  • 在自己的DoOpen函数中完成part_id的初始化
  • 在context中完成scan算子的分区信息初始化

DoFetch阶段行为:

  • 调用Join算子的Fetch函数,如果发现Fetch返回结果为空,则由一个Fetch线程释放内存,并调用Join算子的Close以及Open操作(算子重用,只是重新build hashtable),其它Fetch线程等待Open结束时唤醒
  • 被唤醒的Fetch线程继续调用Join算子的Fetch函数,直到所有分区访问完了。
  • 需要注意上层有Limit算子的情况,需要根据Finish信息停止Fetch。

Close函数行为:

  • 正常的close函数行为即可

DoClose阶段行为:

  • 正常的DoClose阶段

PartitionIterator算子的block属性应该和其下面的Join算子block属性一致,即正常情况为非block算子

优化器层

优化器层需要考虑何时、以及如何将PartitionIterator算子加入执行计划中。
IMCI 优化器的优化流程主要分为两个部分:

  1. Rewriting phase:这个部分主要 rule-based 的改写,不考虑改写前后的代价,i.e., unconditionally rewrite,因此这个 phase 的规则主要包含:
    1. 非常简单、几乎总是有益的改写,比如 filter pushdown;
    2. 配合执行器的改写,比如子查询的改写(IMCI 执行器不支持子查询的执行,所以就在优化器里面改写掉了);

在这个改写阶段,总是将 logical plan 从一个形态转换成另外一个等价的形态,不会同时存在多个等价的形态;

  1. Cost-based optimize phase:这个部分是更复杂的 cascade-style 的 top-down 优化流程,会利用“转换规则”从已有的执行计划中转换出一个或者多个 plan,并对每个 plan 计算代价;在转换过程中,会保留多种等价的执行计划,同时根据 cost 做计划树的 prunning;由于是自上而下的优化流程,因此可以从上往下传递“required property”,比如 partition、order,又比如 row id(parent relation 要求 child relation 输出 ROW ID 作为中间结果,而不是 DATA);
  2. 多机执行计划的生成,作为 cost-based optimize phase 的其中一部分(其中一种转换规则)。


在最终执行计划之上加入PartitionIterator算子。理论上,在任何一个阶段都可以将PartitionIterator算子加入到执行计划中,然而考虑到cost-based optimize阶段会有比较多的transform rules,如果在这之前加入,会影响pattern的匹配,因此比较适合在apply transform rules之后加入。而如果在apply implement rules阶段加入,则扩展性会降低,需要修改每个join算子的implement rule 才能适配其他join类型。因此,决定在最终执行计划上加入PartitionIterator算子,主要是以下两种pattern。
单机版

多机版

主要是MPP在分区时需要感知分区表的分区schema,然后按规则指定不同机器处理哪些分区,单机上的执行计划没有什么变化,可能需要处理下数据倾斜的问题,保障每个worker执行时间接近。

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论