暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

流式 dbt:RisingWave 流计算的正确打开方式

让我们先来看一则 Reddit 经典提问[1]

“为啥 dbt 如此流行?有没有替代品?”

获赞最多的回答抓住了精髓:

“dbt 这么流行是因为它为 SQL 分析师和数据集市构建者提供了 Git 工作流,使得他们可以跟其他软件工程师一样工作。”

1为何选择 dbt?

dbt (Data Build Tool) 是一个在数据仓库领域内广泛流行的工具。



设想您的团队维护了 200+ 个 SQL 视图,它们逐层清理、转换原始数据,构建了一条极其复杂的 ETL 链路。然而,这数千行 SQL 代码却被放在单独一个文件里,到处充斥着重复和混乱。

这就是 dbt 上场的机会 —— dbt 提供的是一个面向数据转换(ETL 的 T)的开发框架。它的核心基于 Jinja[2] 模板语言,使得 SQL 也可以像函数一样接受参数,并被多处复用。

{% set payment_methods = ["bank_transfer""credit_card""gift_card"] %}

select
    order_id,
    {% for payment_method in payment_methods %}
    sum(case when payment_method = '{{payment_method}}' then amount endas {{payment_method}}_amount,
    {% endfor %}
    sum(amount) as total_amount
from app_data.payments
group by 1

与许多数据工具不同,dbt 几乎不对数据处理带来性能方面的益处,但它能为 SQL 开发上带来的流畅和规范的体验:

  • 模块化dbt 让 SQL 能够以模块的形式被复用,从而减少重复代码。
  • Jinja 宏:dbt 的 Jinja 宏基本等同于许多数据库所提供的 SQL UDF[3],但 UDF 缺乏标准统一,不如规范通用的 Jinja 语法。
  • 文档管理:dbt 内置了将数据模型生成为 Web 网页的工具,使得不同团队相互可见对方所提供的数据产品,并理解其中的上下文,例如数据血缘关系[4]
  • 数据测试:dbt 内置了对模型进行测试的框架,例如验证是否存在重复数据,是否有非预期的值等等。
  • 与数据库集成:dbt 深度集成了主流的数据库,如 Snowflake、BigQuery、Redshift 等,包括为每个系统都单独提供了许多宏扩展,弥补了一些表达式的缺失。
  • 丰富的第三方包:dbt 的社区项目[5]十分丰富,例如包含了各种日期处理函数[6]的库。

2流式 dbt

当 dbt 在数据仓库的批处理场景中大放异彩时,许多用户也在将 RisingWave 纳入实时数仓的技术栈里。在这个场景下,dbt-risingwave[7]适配器成了一个必需品。

我们先来看一个数据仓库的增量批处理作业:

{{config(materialized='incremental') }}

select *, my_slow_function(my_column)
from raw_app_data.events

{% if is_incremental() %}
  -- this filter will only be applied on an incremental run
  where event_time > (select max(event_time) from {{ this }})
{% endif %}

而这是 RisingWave 的实时物化视图版本,显然可维护性更高:

{{config(materialized='materialized_view') }}

select *, my_slow_function(my_column)
from raw_app_data.events;

与其他数据仓库不同,RisingWave 不需要每隔一段周期触发 dbt run
来更新结果,也不需要沿用 Incremental Model[8],写冗长的代码来按时间过滤增量数据(即 if is_incremental()
的部分)。这是因为 RisingWave 的物化视图是自动刷新的,一旦数据到达了就会被快速处理。

另一方面,相比于物化视图,dbt 的用户更多接触到的还是 table,更准确地说,是 CREATE TABLE AS SELECT (CTAS)。这个语句在绝大多数数据仓库系统中,都是对当前时间点的查询结果记录一个静态快照。而在 RisingWave,CTAS 等同于物化视图,所有表结果会随着数据流实时更新。

换句话说,不论是 materialized_view
抑或是 table
,它们都是动态的,不再需要靠一个外部 Cron 脚本定期更新。

3实时看板

dbt 绘制的数据血缘图

最后,让我们实现一个 dbt + Metabase + RisingWave 的实时看板。与 Metabase 的连接配置可以参考 RisingWave 官方文档[9]

exposures:
  - name: jaffle_shop
    type: dashboard
    maturity: high
    url: http://127.0.0.1:3000/dashboard/1-jaffle-shop#refresh=5

    depends_on:
      - ref('customers')
      - ref('orders')

基于已经创建的模型,我们配置一个 dbt exposure,这个功能主要用于让数据工程团队交付他们的最终成果。


需要注意的是,Metabase 最小可配置的自动刷新频率是 1 分钟,而 RisingWave 提供的数据新鲜度则默认在 1 秒(得益于我们的流式架构)。用户可以在 URL 后面加上 #refresh=5
,表示每 5 秒刷新一次。

4总结

dbt 是现代数据栈中的 Git,它原理简单易上手,但所提供的能力却覆盖了数据仓库构建流程的方方面面。随着实时数据仓库的普及,越来越多的用户开始青睐 dbt + RisingWave 的组合。RisingWave 降低了实时数据处理的门槛,而 dbt 则让数据团队的组织协作变得更加方便。

dbt-risingwave适配器已发布到 PyPl[10]。欢迎访问我们的文档[11],了解如何安装配置和使用。如果您对 dbt-risingwave 适配器的功能有任何建议和疑问,欢迎来到我们的 Slack 社区[12]Github Issues[13]进行反馈!

参考资料

[1]

Reddit 经典提问: https://www.reddit.com/r/dataengineering/comments/r8pa3i/why_is_data_build_tool_dbt_is_so_popular_what_are/

[2]

Jinja: https://jinja.palletsprojects.com/

[3]

SQL UDF: https://www.postgresql.org/docs/current/xfunc.html

[4]

数据血缘关系: https://docs.getdbt.com/terms/data-lineage

[5]

社区项目: https://github.com/Hiflylabs/awesome-dbt#packages

[6]

各种日期处理函数: https://github.com/calogica/dbt-date

[7]

dbt-risingwave: https://github.com/risingwavelabs/dbt-risingwave

[8]

Incremental Model: https://docs.getdbt.com/docs/build/incremental-models

[9]

RisingWave 官方文档: https://docs.risingwave.com/docs/current/metabase-integration/

[10]

PyPl: https://pypi.org/project/dbt-risingwave/

[11]

文档: https://docs.risingwave.com/docs/current/use-dbt/

[12]

Slack 社区: https://risingwave.com/slack

[13]

Github Issues: https://github.com/risingwavelabs/risingwave/issues

关于 RisingWave 

RisingWave 是一款基于 Apache 2.0 协议开源的分布式流数据库,致力于为用户提供极致简单、高效的流数据处理与管理能力。RisingWave 采用存算分离架构,实现了高效的复杂查询、瞬时动态扩缩容以及快速故障恢复,并助力用户极大地简化流计算架构,轻松搭建稳定且高效的流计算应用。
RisingWave 始终聆听来自社区的声音,并积极回应用户的反馈。目前,RisingWave 已汇聚了近 150 名开源贡献者和近 3000 名社区成员。全球范围内,已有上百个 RisingWave 集群在生产环境中部署。



往期推荐

技术内幕

如何上手 RisingWave 👉 新手入门教程

深入探索 RisingWave 中的高可用性与容错机制

深入理解 RisingWave 流处理引擎(三):触发机制

深入理解 RisingWave 流处理引擎(二):计算模型

深入理解 RisingWave 流处理引擎(一):总览


用户案例
视源股份(CVTE)IT 流计算应用历程
尘锋 SCRM 如何使用 RisingWave 实时打宽
RisingWave 在超百亿管理规模对冲基金公司中的应用
金融科技公司 Kaito 使用 RisingWave 实现实时智能化
龙腾出行如何通过 RisingWave 实现实时数据分析
RisingWave 助力乾象投资打造实时监控平台

文章转载自RisingWave中文开源社区,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论