暂无图片
暂无图片
2
暂无图片
暂无图片
暂无图片

Apache Cloudberry 向量化实践(四):从 Filter+Take 到 O(1) 调度

HashData 2025-07-16
228

Apache Cloudberry™ (Incubating) 是 Apache 软件基金会孵化项目,由 Greenplum 和 PostgreSQL 衍生而来,作为领先的开源 MPP 数据库,可用于建设企业级数据仓库,并适用于大规模分析和 AI/ML 工作负载。

GitHub:  https://github.com/apache/cloudberry

文章作者:赵熙,Apache Cloudberry 贡献者;整理:酷克数据

在构建现代向量化数据仓库系统的过程中,数据重分布(redistribution)是一个绕不开的关键步骤。它既是 MPP 查询执行的协作桥梁,也是吞吐性能的敏感点。表面上,这是一件“非常工程”的事情:把数据根据分布 key 分到对应节点,再做下一步处理。但深入底层大家会发现,重分布不仅决定着 CPU 和网络资源的使用效率,还深藏着复杂度陷阱,尤其是在 Filter 与 Take 算子协同作用时。

本文将拆解 Cloudberry 在重分布路径中遇到的一个典型性能瓶颈,并分享我们如何通过调度机制的结构性重构,将其从 O(N²) 优化为 O(1)。

向量化重分布流程回顾

在 Cloudberry 的向量化执行模型中,重分布流程可简化为三步:

  1. Hash:根据分布 key 对数据进行 hash 分桶。
  2. Filter:为每个目标节点构建布尔筛选向量,挑出应发往该节点的行。
  3. Take:根据筛选结果,从原始 RecordBatch 中抽取数据构造目标批次。

这个流程可以看作是:

for each target_node:
    mask = filter(records, where node_id == target_node)
    result[target_node] = take(records, mask)

当节点数较少时,这套逻辑运行顺畅。但当我们将系统扩展到几十上百个节点时,问题就暴露出来了:执行耗时呈指数级上升,网络负载加剧,CPU 使用率暴涨。

Filter+Take 的复杂度陷阱

核心问题在于该段逻辑的复杂度:

for each node:
    mask = filter(...)
    output = take(input, mask)

这在每一个节点维度都进行了一次 filter 和 take。对于一个拥有 N 个节点、M 行数据的批次,最坏情况下就是 执行了 N 次 filter + take 操作,且每次遍历 M 行数据,其时间复杂度达到 O(N × M)。

进一步细化:

  • Filter 构建一个布尔掩码,代价为 O(M);
  • Take 则是按掩码重新构建向量,需要 O(K);
  • 并且每个节点都执行一遍(哪怕命中数据很少),构成冗余。

在集群节点增多、批次数据量增大时,这种“线性乘积”放大效应愈发明显。

实例指标(某查询下)

节点数
RecordBatch 行数
平均重分布耗时
8
10 万
63ms
64
10 万
427ms
128
10 万
1152ms(瓶颈)

大家可以看到,节点数扩大 16 倍,耗时扩大了 18 倍以上,远远超过线性增长,典型的复杂度爆发。

火焰图:性能崩塌点直观可见

我们使用 perf
 和火焰图分析 CPU 调用栈,发现热点集中于:

  • FilterExec::EvaluateBatch
  • TakeExec::Materialize
  • Arrow 相关函数如 AllocateBitmap
    MakeArray

这些函数堆栈高度在火焰图中连续升高,形成“平台状火焰墙”,说明系统在反复执行 filter 和 take,并频繁进行内存构建操作。

同时,我们将节点数量与耗时绘制为曲线图,可以清晰看到:

  • 起初性能线性增长;
  • 节点数超过 32 后,出现拐点;
  • 节点数越多,增长斜率越陡,说明瓶颈效应在放大。

这意味着:原有机制不适合在大规模场景下运行,必须重构。

分组调度机制重构:O(1) 的启发式路径

我们提出的解决方案核心思想是:从“逐节点处理”转向“全局排序+切分”

新流程如下:

  1. 每行预计算目标节点编号 → segment_id
  2. 对整个 RecordBatch 按 segment_id
     排序
  3. 记录每个 segment 的 (offset, length)
  4. 直接 slice
     排序后数据构造子批次

相当于一次排序后就完成了所有的分发分段操作,避免了对每个节点都跑一遍 filter+take。

复杂度对比:

操作
原实现
优化后
filter + take
O(N × M)
sort
O(M log M)(一次排序)
slice
O(1)(常量开销,仅定位边界)
总复杂度
O(NM)
O(M log M + 1) ≈ O(M log M)

局部性优化:提升 cache 亲和性

除了算法复杂度,我们还关注了内存访问局部性:

  1. 排序后的数据是分段连续存储的,每个节点对应的数据紧凑集中,有利于 cache 命中。
  2. 在后续算子(如 Shuffle、Encode)中处理这些分段数据时,可实现零 copy、无缓存碎片。
  3. 进一步支持分段并发处理时,每个线程只需要处理一个连续段,调度简单、内存局部性强。 这类结构性优化为系统带来的不仅是“更快”,还有“更稳”和“更可控”。

多维基准测试对比

我们设计了多维度对比测试,用以验证优化效果:

维度
原方案耗时
优化后耗时
性能提升
节点数 64,数据量 100 万行
1123ms
337ms
3.3×
节点数 128,数据量 50 万行
1430ms
362ms
3.9×
节点数 32,并发度 10
758ms
221ms
3.4×

此外,内存使用量也减少了约 20%,线程切换次数减少 40%,整体系统负载更加均衡。

👇🏻️扫码加入 Apache Cloudberry 交流群👇🏻️


文章转载自HashData,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论