暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Apache InLong 实时同步数据到 StarRocks 原理与实践

StarRocks 2024-01-31
803

作为业界首个一站式、全场景海量数据集成框架Apache InLong(应龙) 提供了自动、安全、可靠和高性能的数据传输能力,方便业务快速构建基于流式的数据分析、建模和应用。目前 InLong 正广泛应用于广告、支付、社交、游戏、人工智能等各个行业领域,服务上千个业务,其中高性能场景数据规模超百万亿条/天,高可靠场景数据规模超十万亿条/天。

在运营过程中,业务实时入湖的诉求愈发凸显,本文将详细阐述 InLong 中实时写入 StarRocks 的技术原理,主要对写入过程中的精准一次性保证进行阐述。

StarRocks 数据导入

01


StarRocks 数据导入能力

StarRocks 作为数据湖云原生化、弹性化、实时化的重要产品,为业务在报表推送、实时数据分析、数据湖分析等场景提供了助力。目前 StarRocks 提供四种数据导入能力,分别是 Stream Load, Broker Load, Routine Load,Spark Load。目前 Apache InLong 选用 Stream Load 方式进行实时写入,由于需要事务性保证,使用了带事务能力的 Stream Load 接口 Stream_Load_Transaction_Interface[1],下面介绍下 StarRocks 中的事务写入原理。

02


StarRocks 事务写入原理

StarRocks 的事务写入基于典型的两阶段提交事务实现,客户端使用事务服务主要包含以下几个接口:

  • /api/transaction/begin:开启一个新事务。
  • /api/transaction/prepare:预提交当前事务,临时持久化变更。预提交一个事务后,可以继续提交或者回滚该事务。这种机制下,如果在事务预提交成功以后 StarRocks 发生宕机,仍然可以在系统恢复后继续执行提交。
  • /api/transaction/commit:提交当前事务,持久化变更。
  • /api/transaction/rollback:回滚当前事务,回滚变更。
  • /api/transaction/load:发送数据,可以使用已有的事务,如果没有指定事务label,会随机生成一个 label 进行数据写入

不同阶段对应的 StarRocks 内部流程如下:
  • Begin  + load 阶段:


开始数据导入时,客户端通过 begin transaction 接口开启一个新的事务,提交给 FE leader 中的事务管理模块,事务管理模块充当了两阶段提交中的事务管理者,用来管理事务的原子性、事务的回滚等。每一个事务可以设置一个 label ,StarRocks FE 会检查本次 begin transaction 请求的 label 是否已经存在,如果 label 在系统中不存在,则会为当前label 开启一个新的事务。begin 阶段之后可以使用该 label 对 StarRocks 进行 Stream Load 导入,Stream Load 返回成功的条件是写入数据的副本数量超过了 tablet 副本总数的一半,剩下的一半由 StarRocks 的副本机制保证完整写入;
  • Commit 阶段

FE 接受 commit 信息之后,会将事务状态改为 committed。之后事务管理器会向 BE 节点发送 publish version 消息,BE 收到 publish 中的版本消息后,会将本地的消息版本改为本次事务对应的版本;同时会向 FE 回包代表数据版本已经成功修改,并且将数据的状态改为 VISIBLE。此时数据对用户可见,客户端执行查询的时候会比较版本号,从而解决读写版本冲突;

  • Rollback 阶段

如果写入过程或者 commit 过程失败,则事务 abort,清理事务的任务在 BE 节点异步执行,将数据导入生成的数据标记为不可用,这些数据之后会从 BE 上被删除。状态为 committed 的事务(commit 成功但 publish version 超时的事务)不能被回滚。

    03


    总结

    • StarRocks 可以通过给数据设置版本控制来解决读写冲突

    • StarRocks 通过引入 FE 中的事务管理实现了两阶段导入,保证了导入的原子性

    InLong 实时写入 StarRocks 原理

    01


    基本原理

    Apache InLong 实时写入 StarRocks 如图所示,实时写入通过 Flink 实时任务来实现,Flink 任务写入侧具体执行逻辑如下:

    • 根据 Flink 并行度配置生成多个 Task 执行写入

    • 每一个 Task 基于 StarRocks 提供的 Stream Load 机制进行写入,每一个 Flink checkpoint 周期使用相同的 StarRocks 事务 label;

    • Flink 开始做 checkpoint 时,状态中存入当前写入的 table 以及对应的事务 label;

    • 写入算子收到 checkpoint 完成的消息时,将所有的 table 对应的事务进行 commit,此时数据对用户可见

    02


    详细流程

    1 任务写入数据


    在写入数据时,首先不会直接将数据写入到 StarRocks 中,而是将每个 table 对应的数据进行缓存。单个表数据达到一定大小之后才会调用一次刷新 flush 操作,flush 操作包括以下流程:

    • 启动一个事务,每个 checkpoint 内会使用同一个事务 label,调用 api/transaction/begin
    • 使用该 label 进行数据写入,调用 api/transaction/load 实际写入数据

    这种写入流程保证了:

    • 每次写入使用相同的事务 label,提交时可以提交一整个 checkpoint 内的所有数据,单个 checkpoint 只会提交一次,重复提交 StarRocks 不会生效;
    • 每次写入都是批次写入,缓解 StarRocks 写入压力

    2 任务保存检查点

    任务保存检查点的时候会进行以下流程:
    • 对目前内存中保存的所有表的数据都进行 flush, 确保内存中所有的数据已经导入到 StarRocks, 当前数据在 StarRocks 中不可见

    • 对所有的表对应的导入事务,进行 prepare 调用,如果 prepare 失败,则表示当前 StarRocks 不支持该事务的提交,调用 abort 接口,并失败重试

    • 对于 prepare 成功的事务,保存在当前 Flink 状态信息中

    3 任务如何确认保存点成功

    当 Flink Task 收到检查点已经完成的确认信息后,对 checkpoint 过程中保存的事务信息进行 commit,如果 commit 失败,则重启任务。commit 成功的事务会在 checkpoint 中删除
    4 任务如何初始化
    当任务启动时,Task 拿到上一个保存点的状态信息,恢复暂时未 commit 的事务信息,对 checkpoint id 小于等于当前 checkpoint 号的事务进行提交

    03


    Exactly Once 保证

    要保证流式写入的 exactly once 语义等同于需要保证数据的不重复以及不丢失。

    1 数据不重复保证

    基于 Flink 的流式任务产生数据重复的原因主要是 Flink 从某一个 checkpoint 启动时,重复提交之前已经提交过的数据。InLong 实时写入中,状态中会记录本 checkpoint 下 prepare 成功的事务 id ,故障恢复时,会提交该事务 id, 如果该事务 id 在之前的流程中被提交过,StarRocks 会返回报错信息表示该事务 id 已经提交过,该次提交会被忽略,通过这种机制保证了数据的不重复。

    2 数据不丢失保证

    假设在数据写入过程中,有部分数据写入失败,Flink checkpoint 机制会保证任务重启后从上一个保存点启动,Source 端会从上次保存消费位置开始消费,这样能够保证数据的不丢失,之前写入失败的数据会在重启后继续执行写入。

    性能测试

    01


    测试步骤

    使用 Iceberg -> StarRocks 链路进行压测
    • 使用压测程序以 1000 qps 的写入速度生成数据到 Iceberg 测试表中

    • 生成到 1 亿数据时,启动实时同步任务同步到 StarRocks 表

    • StarRocks 测试表清空

    • 测试在不同并行度的情况下任务追上最新数据的时间

    02


    Iceberg -> StarRocks 测试结果


    并行度数据量同步时间同步吞吐
    场景一11 亿60 分钟27 Mb/s
    场景二31 亿10 分钟162 Mb/s
    场景三401 亿2 分钟800 Mb/s
    本测试结果可能受到 Iceberg 读取速度、Iceberg 压缩方式、单条数据字段数影响,实际使用速度与测试可能有出入。

    结语

    欢迎大家使用 InLong 进行 StarRocks 实时同步,使用文档可以参考实时同步 MySQL->StarRocks[2] ,使用过程中有任何问题都可以关注“Apache InLong”公众号,进入微信交流群讨论交流!
    参考资料
    [1]

    Stream_Load_Transaction_Interface: https://docs.starrocks.io/docs/loading/Stream_Load_transaction_interface/

    [2]

    实时同步 MySQL->StarRocks: https://inlong.apache.org/zh-CN/docs/next/quick_start/data_sync/mysql_starrocks_example/


    关于 StarRocks 

    Linux 基金会项目 StarRocks 是数据分析新范式的开创者、新标准的领导者。面世三年来,StarRocks 一直专注打造世界顶级的新一代极速全场景 MPP 数据库,帮助企业构建极速统一的湖仓分析新范式,是实现数字化转型和降本增效的关键基础设施。
    StarRocks 持续突破既有框架,以技术创新全面驱动用户业务发展。当前全球超过 330 家市值 70 亿元以上的头部企业都在基于 StarRocks 构建新一代数据分析能力,包括腾讯、携程、平安银行、中原银行、中信建投、招商证券、大润发、百草味、顺丰、京东物流、TCL、OPPO 等,并与全球云计算领导者亚马逊云、阿里云、腾讯云等达成战略合作伙伴。
    拥抱开源,StarRocks 全球开源社区飞速成长。目前,已有超过 300 位贡献者,社群用户近万人,吸引几十家国内外行业头部企业参与共建。项目在 GitHub 星数已超 7200 个,成为年度开源热力值增速第一的项目,市场渗透率跻身中国前十名。


    金融:中信建投中原银行 | 申万宏源 | 平安银行 | 中欧财富


    互联网:微信|小红书|网易邮箱|滴滴|美团餐饮SaaS | B站|携程 | 同程旅行|36058同城|芒果TV得物 |贝壳|汽车之家


    游戏:腾讯游戏波克城市欢聚集团37手游 | 游族网络


    新经济:蔚来汽车理想汽车顺丰京东物流跨越速运 | 大润发华润万家TCL |万物新生 | 百草味 | 多点 DMALL 酷开科技


    StarRocks 技术内幕:极速湖仓神器:物化视图存算分离,兼顾降本与增效   |实时更新与极速查询如何兼得Query Cache,一招搞定高并发资源隔离大数据自动管理查询原理浅析

    文章转载自StarRocks,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

    评论