暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

技术分享|Doris的分布join操作

原创 大数据模型 2023-09-04
1292

多表关联查询

多维查询和即席查询底座下免不了多表查询,复杂的业务分析洞察免不了多表查询,多表查询是什么?简单的多表查询下面是星型,星型大多数情况是一个大表和多个小表关联,普通的多表查询是雪花模型,即两个事实表关联多个小表,复杂多表查询是星座模型 内含多个大事实表关联。

Clickhouse早已经放弃雪花模型和星座模型,目前仅支持星型模型,一般复杂一点的SQL操作都用了,TPC-H和TPC-DS基准测试都无法支持。

Doris比Clickhouse支持更多的SQL操作,内部JOIN操作有4种模式 Shuffle join、Bucket Shuffle join 、Broadcast join 、Colocate Join

内部JOIN原理

多表查询与单表查询是不同的问题,单表是一元计算,而多表是两元计算。两元计算要提高性能,一个方法提高硬件配置CPU、内存、网络, 例如CPU绑核多线程并发,扩大网络为10000提高带宽传输 , 或者软硬件件结合的方式 ,例如向量化处理,首先CPU实现单指令多数据集,再实现较多的算子向量化。

还有一个策略办法计算贴 近存储,这是 Doris的秘决,通过计算贴 近存储去理解Doris的运算方式 。假如A表要G与B表进行关联,有下面4种策略。

  1. 预先干预,两个表的数据已经在同一个节点,不需要跨网络传输,直接本地计算。Colocate Join
  2. 计算前,把其中一个表的数据全量放到另外一个表上面,避够太多无谓的网络传输。Broadcast join
  3. 计算前,一个表的数据保持不动,另一个表把数据分散打到固定表上面,减少网络传输。Bucket Shuffle join
  4. 计算时, 两个表各自把数据打散后,网络上东奔西走按照特征合并,再向上汇总。 Shuffle join

Shuffle join

image.png

shuffle join是最传统的内部join方式,最早可以追溯到单机运行的 nestd loop join、merge join、hash join。shuffle join就是基于hash join的基础上发展起来的,不同的是shuffle必须有网络传输。

  1. 首先,AB两表根据连接操作的键(关联列),通过hash将输入数据集按键进行分组,这将导致将具有相同键值的记录划分到同一个分区中。
  2. 然后,将每个分区的数据通过网络传输到合适的计算节点,以便将具有相同键值的记录聚集在一起。
  3. 在目标计算节点上,对具有相同键值的记录进行合并操作。
  4. 最后,合并的结果可以存储在一个新的数据集中,或者用于进一步的计算或输出。

Bucket Shuffle

image.png
Bucket Shuffle join是Shuffle的升级,保持一个表不动,并把表的数据粒度最细,并且数据有序摆放。另外一个表【一般是较少的表】,小表主动做HASH分发,去匹配大表

  1. 首先,AB两表根据连接操作的键(关联列),较小表通过hash将输入数据集按键进行分组,这将使具有相同键值的记录划分到同一个分区中。
  2. 小表分区分区的数据通过网络传输到大表已经整理好的的数据上面,相同键值的记录聚集在一起。
  3. 目标计算节点上,对具有相同键值的记录进行合并操作。
  4. 合并的结果可以存储在一个新的数据集中,或者用于进一步的计算或输出。

Broadcast join

image.png

hadoop时代就用的分布式JOIN好方法,流传至今,Broadcast join依然是一个工程好方法。

Broadcast join是一个对内存要求很高的方法,因为它会提前把小表全放到目标计算点的内存里面。

  1. 首先,AB两表根据连接操作的键(关联列),小表发到所有的计算节点上面,保存到目标计算节点的内存。
  2. 目标计算节点上,两表的数据都在同一个位置了,根据相同的键值进行合并操作。
  3. 合并的结果可以存储在一个新的数据集中,或者用于进一步的计算或输出。

Colocate Join

image.png
Colocate join其实是预计算的方式,必须要对业务有所洞悉,笔者看了Doris的建表,感觉还算友好。

  1. 首先,AB两表根据连接操作的键(关联列),大表小表已经同在一个节点。
  2. 计算节点上,根据相同的键值进行合并操作。
  3. 合并的结果向上汇总,直接输出结果。

附几段实战

Doris默认会使用broadcast join的方式

例如以下例子,优先会使用table12表的user_id列广播到每一个计算节点上面。

select sum(table12.cost) from table12 join table11 where table12.user_id = 10000;


显示braodcast  join的方式
select sum(table12.cost) from table12 join [broadcast] table11 where table12.user_id = 10000;


显示 shuffle join的方式
select sum(table12.cost) from table12 join [shuffle] table11 where table12.user_id = 10000;

设置bucket shuffle join的方式

 set enable_bucket_shuffle_join = true;


定义colocate,预先干预数据存储分布
CREATE TABLE  tbl2  (
    `k1` datetime NOT NULL COMMENT "k1",
    `k2` int(11) NOT NULL COMMENT "k2",
    `v1` double SUM NOT NULL COMMENT "v1"
) ENGINE=OLAP
AGGREGATE KEY(`k1`, `k2`)
DISTRIBUTED BY HASH(`k2`) BUCKETS 8
PROPERTIES (
    "colocate_with" = "group1"
);



CREATE TABLE IF NOT EXISTS tbl1  (
    `k1` date NOT NULL COMMENT "k1",
    `k2` int(11) NOT NULL COMMENT "k2",
    `v1` int(11) SUM NOT NULL COMMENT "v1"
) ENGINE=OLAP
AGGREGATE KEY(`k1`, `k2`)
PARTITION BY RANGE(`k1`)
(
    PARTITION p1 VALUES LESS THAN ('2023-02-28'),
    PARTITION p2 VALUES LESS THAN ('2023-03-31')
)
DISTRIBUTED BY HASH(`k2`) BUCKETS 8
PROPERTIES (
    "colocate_with" = "group1"
);





「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

文章被以下合辑收录

评论