暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Paimon Deletion Vectors:近实时更新与极速查询

2973

01

01

技术背景


实时大数据分析是企业决策的关键,可以让企业获得实时反馈,及时调整策略。Apache Flink 流计算加上 OLAP 系统的组合可以让部分数据实时流动、实时更新、实时查询可见。但是,维护流批两套架构带来的成本、流批割裂、数据封闭等问题,还有大量离线数据无法迁移到实时分析的链路中。

Apache Paimon 是一个 Lake Format,可以使用 Apache Flink 和 Apache Spark 构建实时湖仓架构,进行流批一体的计算。Paimon 创新性地将 Lake Format 和 LSM (Log Structured Merge Tree) 结构相结合,为数据湖带来实时流式更新。Paimon 的主键表支持大规模更新的写入,具有非常高的更新性能,且提供丰富生态的查询,使得数据能以低成本的、开放的、流批一体的方式提供实时化的分析。


02

业务用例


业务上有一张 orders 表,它需要从上游数据库同步到湖中。

    CREATE TABLE orders (
    order_id BIGINT,
    order_name STRING,
    order_user_id BIGINT,
    order_shop_id BIGINT,
    order_product_id BIGINT,
    order_fee DECIMAL(20, 2),
    order_create_time TIMESTAMP(3),
    order_update_time TIMESTAMP(3),
    order_state INT,
    PRIMARY KEY (order_id) NOT ENFORCED
    )

    入湖后,可以进行批的 ETL 调度,也可以进行分析查询,大致架构如下:


    Batch ETL 往往对读取性能的要求没有那么高,一般整体作业在分钟级完成即可。但是分析需求要求数据秒级返回,因为分析面向人,分析人员不应该等太久。

    接下来,让我们来看看 Paimon 的底层设计是如何满足上述架构。

    03

    Paimon 主键表


    当你定义主键后,即为 Paimon 的主键表,主键表能实时写入更新数据,并且能被实时查询。

    Paimon 的基本结构如下:


    主键表的文件结构大致如上,表或者分区内包含多个 Bucket,而每一个 Bucket 都是一个单独的 LSM 树形结构,包含多个文件。
    LSM 的写入流程大概为:Flink Checkpoint 攒一批数据,Flush 为 L0 的文件,按需触发 Compaction 来合并数据:

    • MOR (Merge On Read): 合并数据默认是半异步的 (当 L0 文件太多会反压写),你也可以设置成完全异步 (不反压写)。
    • COW (Copy On Write):合并数据也可以设置为同步,也就是在写入时完成合并。

    04

    Merge-On-Read


    当模式为 MOR 时,读取需要合并所有文件,因为所有文件都是有序的,进行多路归并,里面会有主键的比较计算。


    这里明显有一个问题,单棵 LSM 树在读取时只能有单个线程去读取,所以读取并发受限制,如果 Bucket 内数据量太大,会导致读取性能较差。所以为了读取性能,有分析查询需求表,推荐设置 Bucket 内数据量在 200MB - 1GB 之间,否者读取会在10秒以上返回。但是 Bucket 太小,会有较多的小文件读写,给文件系统造成压力。

    另外,由于有合并过程,对于非主键的列也不能做基于 Filter 的 Data Skipping,否者新数据被过滤掉,导致得出错误的老数据。

    由于写入时不需要强行合并数据,写入性能是最高的。但是 LSM 下读取,由于多路归并,存在性能问题:

    1. 单 LSM 单线程,并发受限。
    2. 非主键的列不能做过滤下推。
    3. 多路归并需要一定性能消耗。

    我们可以大致定义它的性能成绩:

    • 写入:100分,非常好
    • 读取:10分,较差


    05

    Copy-On-Write


    一个直观的想法是能不能在写的时候直接合并好数据。
      ALTER TABLE orders SET
      ('full-compaction.delta-commits' = '1');


      设置 Full Compaction 的间隔为1,这意味着每次写入都会进行全量的合并,所有的数据都会被合并到最高层里,在读取时,此时并不需要合并,读取性能是最高的。但是每次写入都需要全量合并,写放大非常严重。

      我们可以大致定义它的性能成绩:

      • 写入:10分,非常差
      • 读取:100分,非常好

      看起来,MOR 和 COW 是两个极端,虽然 Paimon 的 MOR 在大部分情况下都够用了,比如 Batch ETL,但是在一些需要高性能查询分析的场景,是存在一些不足的。

      有没有一种模式能让读取和写入折中,都有比较好的性能?



      06

      Deletion Vectors


      在向量化计算中,有一种结构是 Select Vectors,代表哪一行需要选中,哪一行被删掉了。它用作在向量化计算里的过滤,过滤是最基本的 SQL 算子,而 Select Vectors 的方式结合向量化计算,甚至 SIMD (Single Instruction Multiple Data) 时,对性能的影响并不大。

      同样到 Paimon 中,我们能否在写入时产出这样一个类似的 Vectors,代表文件里面哪些数据被删掉了,这样在读取时直接过滤掉不需要的行,这样就相当于合并完成了,也不会影响读的性能。


      一个简单的例子如下:


      删除数据直接标记到 Delete 文件即可,Upsert 更新数据其实就是先删除再新增的方式。

      我们来看这种模式的读取和写入:
      1. 读取性能好:并发随意、可过滤下推、不需要合并,只是多了 Deletion Vectors 的过滤,代价小。
      2. 写入性能中:写入时,需要去查询并标记对应同主键的数据,修改历史文件的 Deletion Vectors。

      写入时查询并标记?具体好像有点难?这在 Paimon 里并不难,因为 Paimon 已经是 LSM 的组织,而 LSM 最初应用最多的就是点查,这意味着可以利用 Paimon LSM 点查的能力,快速找到需要删除的文件及其删除的行号。


      数据在写入时会去 Lookup LSM Tree,产出对应的 Deletion File,这样在读取时就直接过滤掉被删除的数据。熟悉 Paimon 的同学,应该可以想到,这个 Lookup 机制和 Changelog-Producer Lookup 是同一套底层机制,这也是 Paimon 定位实时数据湖长久来积累的能力。

      每一个 bucket 会产出对应的一个 Deletion File,文件的结构如下:


      对应每个文件,将它的 Deletion Vector 通过 Bitmap 的方式保存,一个 Bucket 一个 Deletion 文件可以尽可能避免Deletion 文件造成小文件太多的问题。

      针对每个 Bitmap,选用了 RoaringBitmap 结构。在 Paimon 前,Apache Iceberg 和 Delta 已选用此结构作为批量删除的查询加速:

      1. RoaringBitmap 是一种压缩的位图,可以大幅减少存储空间。
      2. RoaringBitmap 有多语言支持,C++ 引擎也可以方便的读取它。

      有了 Deletion Vectors 模式后,我们可以大致定义此模式的性能成绩:

      • 写入:60分,还可以
      • 读取:90分,非常好



      07

      性能测试


      测试环境


      • 集群 EMR 5.16.0:工作节点: 4台,24 CPU,96 GiB
      • Flink 1.15 & Spark 3.3.1
      • Trino 422:最新 Paimon-Trino 版本已经特殊优化过 ORC 的读取
      • Paimon 0.8:deletion-vectors.enabled 此配置可以开启 Deletion Vectors (简称 DV) 模式,默认关闭


      数据规模


      如上 orders 表结构,Datagen 生成 5 亿条,主键范围 1 ~ 10亿,bucket = 8,写入完毕后单 bucket 大概 40+ 个文件,5 GB 大小(使用 Flink SQL 写)。


      写入性能


      • 不开启 DV:455 秒,单并发每秒写入 13  万条
      • 开启 DV:    937 秒,单并发每秒写入 6.6 万条

      写入性能慢了一倍,我们会在后续版本持续优化。


      查询性能


      不开启 DV:

      Trino
      Spark
      查询0列
      COUNT(*)
      9.66s
      11.47s
      查询1列
      product_id
      11.96s
      12.61s
      查询2列
      product_id + shop_id
      13.31s
      13.33s
      查询3列
      product_id + shop_id + user_id
      15.23s
      13.83s
      可以看到,Trino 和 Spark 并没有什么区别,因为需要合并时,它们都共享相同的 Reader 实现。

      开启 DV:

      Trino
      Spark
      查询0列
      COUNT(*)
      1.29s
      3.71s
      查询1列
      product_id
      1.85s
      3.18s
      查询2列
      product_id + shop_id
      1.88s
      2.92s
      查询3列
      product_id + shop_id + user_id
      1.92s
      3.62s

      当开启 DV 时,Spark 的查询性能大幅提升,而 Trino 的提升幅度更大,为什么呢?因为 Trino 在不需要合并时会使用 Trino 的 ORC Reader 以及 Trino 的列存结构,这给了它更原生的读取性能。


      08

      总结


      Paimon 的 Deletion Vectors 模式可以让你在不损失太大写入性能的同时,获得极大的读取性能提升,你可以根据业务来决定哪些表值得此模式。后续,StarRocks 读取 Paimon 的 Deletion Vectors 优化也在路上,相信会有更强悍的性能体验。

      Paimon 持续增强实时数据湖的能力,以数据湖格式来支撑分钟级大数据全场景的支持,包括:批计算、流计算、OLAP 计算,更多精彩请:

      1. 关注 Apache Paimon 微信公众号,了解更多咨询。
      2. 点赞项目:https://github.com/apache/incubator-paimon/
      3. 加入钉钉 Paimon 用户交流群。



      文章转载自锋哥聊DORIS数仓,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

      评论