供稿 : 甜橙金融大数据研发 kindred
背景
很多朋友在使用 distributed 表作 join 的时候 , 可能都会发现一个问题 , 即使设置了 distributed_product_mode='local' , join 右侧的表还是无法被改写成对应的 local 表从而加速 join 的性能。
非 local 表查询
# 创建MergeTree表
CREATE TABLE default.local_t
(
`id` Int64,
`age` Int32,
`name` String,
`createdat` DateTime
)
ENGINE = MergeTree
ORDER BY id;
# 创建Distributed表
CREATE TABLE default.all_t
(
`id` Int64,
`age` Int32,
`name` String,
`createdat` DateTime
)
ENGINE = Distributed('twoshard_onerepl', 'default', 'local_t', id);
# 插入约1000万数据并执行以下查询
select a.age,sum(b.age)
from all_t a local
inner join all_t b
on a.id=b.id
group by a.age;
查看设置
distributed_product_mode='local'

查询效果
花费 10.445 秒,通过 explain syntax 可以看出来右表未被改写成 local 表

local 表查询
# 把右侧的表改成嵌入一个子查询
select a.age,sum(b.age)
from all_t a local
inner join (
select id,age
from all_t t) b
on a.id=b.id
group by a.age;
查询效果 花费 3.029 秒,通过 explain syntax 查看, join 右侧的表被改写成了 local 表:

源码分析
本例中代码版本为 21.3.9.83-lts
涉及相关类说明
ASTTablesInSelectQuery:代表from子句部分, 包括from表, join, array join等部分。
ASTTablesInSelectQueryElement: 这是ASTTablesInSelectQuery的子元素, 它有三个属性table_join, table_expression, array_join, table_join对应的类是ASTTableJoin, table_expression对应的类是ASTTableExpression, array_join在此不涉及。
ASTTableExpression: 形式是指表,table function或子查询。如果是表的形式则对应database_and_table_name属性,如果是table_function形式则对应table_function属性,如果是子查询形式则对应subquery属性。
ASTTableJoin: 是指join相关, 包括join方式, on子句, using子句等。
测试用例分析
# ASTTablesInSelectQuery代表以下部分:
from all_t a local
inner join (
select id,age
from all_t t) b
on a.id=b.id
# 它有两个ASTTablesInSelectQueryElement元素 :
# 第一个元素ASTTableExpression不为空,代表了all_t a,ASTTableJoin的属性是空的。
# 另一个元素的ASTTableExpression和ASTTableJoin都不为空,ASTTableExpression就是子查询b部分,ASTTableJoin就是 inner join ... on a.id=b.id部分。
测试用例 AST 结构
整个语句的 AST 结构可以通过 explain AST 命令展示出来

visit 函数
针对 In 或 join 分布式表改写成 local 表是在 Interpreters/InJoinSubqueriesPreprocessor 类里进行的,从 visit 函数开始入手
有了之前对 AST 结构的了解对这个函数的代码就比较容易理解了。这里提几个点:
1、如果 from 的表不是 ordinary table,语句不会被改写。
2、如果 from 的表不是 distributed 表,或者 shard 数少于 2 个,语句也不会被改写。
3、真正改写操作在 NonGlobalSubqueryVisitor 类里实现。NonGlobalSubqueryVisitor 类分析
NonGlobalSubqueryVisitor 类,在同一个 cpp 文件里。这个类和其它的 SQL 处理 Visitor 类一样,是基于 visitor 模式对 SQL 进行解析或别的操作。采用的是自顶向底方式遍历 AST。主要处理 in/notin 和 join 模块。主要分析 join 模块,代码如下:

代码中可以看到,只有在 ASTTablesInSelectQueryElement 中 table_join 和 table_expression 都不为空的时候才会进行处理。也就是说不会处理 from 表,而只处理 join 右侧的数据集(table, table function, subquery), 并且如果刻意指定了 global join 的话也不会进行改写。当 join 的右侧是 subquery 的话使用 NonGlobalTableVisitor 类开始对 subquery 进一步处理。NonGlobalTableVisitor 类分析
NonGlobalTableVisitor 类,这个 visitor 对应的 matcher 是 OneTypeMatcher,会处理上述 subquery 中所有的 ASTTableExpression 节点

如果 TableExpression 是 ordinary table,则会调用 renameIfNeeded 函数进行改写renameIfNeeded 函数分析
renameIfNeeded 函数 , 这里只分析 distributed_product_mode=local 的情况 , 代码如下 :

代码中可以看出:
1、subquery 中的 distributed 表需要添加 alias。
2、subquery 中的 distributed 表被改写成了对应的 local 表
测试用例分析结论
通过分析可以看出,问题就出在 NonGlobalSubqueryVisitor 类的 visit 函数里。
在对 join 右侧的 TableExpression 进行处理的时候,只考虑到了 subquery 的情况,而没有考虑 ordinary table 的情况。
源码改写
修改代码如下:
逻辑很简单,再增加一个处理 ordinary table 的分支即可。由于 NonGlobalTableVisitor 本身就处理 TableExpression,直接把 table_expression 传入由它处理就可以了。
验证
代码修改完编译,启动服务,设置 distributed_product_mode = 'local', 不带子查询能正常改写 distributed 表,执行耗时 3 秒左右,与之前测试走 local join 耗时差不多:








