暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

数仓实战|Routine Load实时同步Kafka数据到Doris

数据中台研习社 2021-08-19
1118

      

       例行导入(Routine Load)功能为用户提供了一种自动从指定数据源进行数据导入的功能。当前我们仅支持从 Kafka 系统进行例行导入。  

基本原理

      Routine Load 的基本原理是:

Client 向 FE 提交一个例行导入作业。

FE 通过 JobScheduler 将一个导入作业拆分成若干个 Task。每个 Task 负责导入指定的一部分数据。Task 被 TaskScheduler 分配到指定的 BE 上执行。

在 BE 上,一个 Task 被视为一个普通的导入任务,通过 Stream Load 的导入机制进行导入。导入完成后,向 FE 汇报。

FE 中的 JobScheduler 根据汇报结果,继续生成后续新的 Task,或者对失败的 Task 进行重试。整个例行导入作业通过不断的产生新的 Task,来完成数据不间断的导入。 

使用场景

     Routine Load 在数据仓库中主要有两种应用场景:

(1)接口数据导入。由于批处理抽取数据存在大量重复抽取的情况,越来越多的交易系统采用binlog或者直接提供接口更新数据到Kafka的方式来完成接口数据的对接。针对binlog日志或者Kafka消息队列,批处理程序是无法抽取的,所以需要采用流式数据写入。

(2)实时数仓结果数据导入。根据Lambda架构,实时数据通过Kafka对接以后,继续经由Flink加工,加工完的数据继续写回Kafka,然后由Routine Load加载到Doris数据库,即可直接供数据分析应用读取数据。这种应用场景还不太成熟。

使用限制

  1. 支持无认证的 Kafka 访问,以及通过 SSL 方式认证的 Kafka 集群。

  2. 支持的消息格式为 csv, json 文本格式。csv 每一个 message 为一行,且行尾不包含换行符。

  3. 仅支持 Kafka 0.10.0.0(含) 以上版本。

例行导入任务案例

     例行导入的操作其实很简单,看再多的说明,不如一个案例来得实际。接下来直接给出两个导入案例。

案例一:数据源源不断的写入,不作更新或者删除操作

    CREATE ROUTINE LOAD ods_drp.rtl_ods_drp_cdc_st_entry_detail_et ON ods_drp_cdc_st_entry_detail_et 
    COLUMNS(ACCOUNT_LINE_ID, update_time,cdc_op,cdc_time=now())
    PROPERTIES
    (
    "desired_concurrent_number"="1",
    "max_batch_interval" = "20",
    "max_batch_rows" = "200000",
    "max_batch_size" = "104857600",
    "strict_mode" = "false",
    "strip_outer_array" = "true",
    "format" = "json",
    "json_root" = "$.data",
    "jsonpaths" = "[\"$.ACCOUNT_LINE_ID\",\"$.update_time\",\"$.type\"]"
    )
    FROM KAFKA
    (
    "kafka_broker_list" = "192.168.87.107:9092,192.168.87.108:9092,192.168.87.109:9092",
    "kafka_topic" = "drds_hana_ods_st_entry_detail_et",
    "kafka_partitions" = "0",
    "kafka_offsets" = "OFFSET_BEGINNING",
    "property.group.id" = "ods_drp_st_entry_detail_et",
    "property.client.id" = "doris"
    );

    案例二:接口数据存在删除和更新操作

         首先需要设置目标表为支持批量删除的模式:

      ALTER TABLE ods_drp.ods_drp_vip_weixin ENABLE FEATURE "BATCH_DELETE";

      然后创建导入任务:

        CREATE ROUTINE LOAD ods_drp.rtl_ods_drp_vip_weixin ON ods_drp_vip_weixin
        WITH MERGE
        COLUMNS(rec_id, vip_user_id, vip_id, vip_code, tel, vip_source, openid, unionid, appid, brand_code, create_user_name, create_user, create_time, modify_user_name, modify_user, modify_time, version, update_time,CDC_OP),
        DELETE ON CDC_OP="DELETE"
        PROPERTIES
        (
        "desired_concurrent_number"="1",
        "max_batch_interval" = "20",
        "max_batch_rows" = "200000",
        "max_batch_size" = "104857600",
        "strict_mode" = "false",
        "strip_outer_array" = "true",
        "format" = "json",
        "json_root" = "$.data",
        "jsonpaths" = "[\"$.rec_id\",\"$.vip_user_id\",\"$.vip_id\",\"$.vip_code\",\"$.tel\",\"$.vip_source\",\"$.openid\",\"$.unionid\",\"$.appid\",\"$.brand_code\",\"$.create_user_name\",\"$.create_user\",\"$.create_time\",\"$.modify_user_name\",\"$.modify_user\",\"$.modify_time\",\"$.version\",\"$.update_time\",\"$.type\"]"
        )
        FROM KAFKA
        (
        "kafka_broker_list" = "192.168.87.107:9092,192.168.87.108:9092,192.168.87.109:9092",
        "kafka_topic" = "drds_hana_ods_vip_weixin",
        "kafka_partitions" = "0",
        "kafka_offsets" = "OFFSET_BEGINNING",
        "property.group.id" = "ods_drp_vip_weixin",
        "property.client.id" = "doris"
        );

               其中,前面的PROPERTIES括号里面存放的是加载的配置信息,KAFKA后面的括号里面存放的是KAFKA的配置信息。


        例行导入常用操作

              创建完 Routine Load 任务以后, Routine Load 会在后台持续运行。为了监控和检查 Routine Load 任务状态,我们需要进入对应的数据库schema下执行命令查看任务。

          show routine load; --用于显示所有的例行导入任务状态
          pause routine load for xxx; --暂停xxx导入任务
          resume routine load for xxx; --重启xxx导入任务
          stop routine load for xxx; --停止xxx导入任务,停止以后任务会从队列中消失
          ALTER ROUTINE LOAD FOR XX
          PROPERTIES
          (
          "desired_concurrent_number" = "1"
          )
          FROM kafka
          (
          "kafka_partitions" = "0",
          "kafka_offsets" = "OFFSET_BEGINNING",
          "property.group.id" = "xxx_topic",
          "property.client.id" = "doris"
          ); --修改任务参数,从kafka队列最开始重新读取


              




          数据中台研习社》微信群,请添:laowang5244,备注【进群】


                                                                   🧐分享、点赞、在看,给个三连击呗!👇


          文章转载自数据中台研习社,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

          评论