例行导入(Routine Load)功能为用户提供了一种自动从指定数据源进行数据导入的功能。当前我们仅支持从 Kafka 系统进行例行导入。
基本原理
Routine Load 的基本原理是:
Client 向 FE 提交一个例行导入作业。
FE 通过 JobScheduler 将一个导入作业拆分成若干个 Task。每个 Task 负责导入指定的一部分数据。Task 被 TaskScheduler 分配到指定的 BE 上执行。
在 BE 上,一个 Task 被视为一个普通的导入任务,通过 Stream Load 的导入机制进行导入。导入完成后,向 FE 汇报。
FE 中的 JobScheduler 根据汇报结果,继续生成后续新的 Task,或者对失败的 Task 进行重试。整个例行导入作业通过不断的产生新的 Task,来完成数据不间断的导入。
使用场景
Routine Load 在数据仓库中主要有两种应用场景:
(1)接口数据导入。由于批处理抽取数据存在大量重复抽取的情况,越来越多的交易系统采用binlog或者直接提供接口更新数据到Kafka的方式来完成接口数据的对接。针对binlog日志或者Kafka消息队列,批处理程序是无法抽取的,所以需要采用流式数据写入。
(2)实时数仓结果数据导入。根据Lambda架构,实时数据通过Kafka对接以后,继续经由Flink加工,加工完的数据继续写回Kafka,然后由Routine Load加载到Doris数据库,即可直接供数据分析应用读取数据。这种应用场景还不太成熟。

使用限制
支持无认证的 Kafka 访问,以及通过 SSL 方式认证的 Kafka 集群。
支持的消息格式为 csv, json 文本格式。csv 每一个 message 为一行,且行尾不包含换行符。
仅支持 Kafka 0.10.0.0(含) 以上版本。
例行导入任务案例
例行导入的操作其实很简单,看再多的说明,不如一个案例来得实际。接下来直接给出两个导入案例。
案例一:数据源源不断的写入,不作更新或者删除操作
CREATE ROUTINE LOAD ods_drp.rtl_ods_drp_cdc_st_entry_detail_et ON ods_drp_cdc_st_entry_detail_etCOLUMNS(ACCOUNT_LINE_ID, update_time,cdc_op,cdc_time=now())PROPERTIES("desired_concurrent_number"="1","max_batch_interval" = "20","max_batch_rows" = "200000","max_batch_size" = "104857600","strict_mode" = "false","strip_outer_array" = "true","format" = "json","json_root" = "$.data","jsonpaths" = "[\"$.ACCOUNT_LINE_ID\",\"$.update_time\",\"$.type\"]")FROM KAFKA("kafka_broker_list" = "192.168.87.107:9092,192.168.87.108:9092,192.168.87.109:9092","kafka_topic" = "drds_hana_ods_st_entry_detail_et","kafka_partitions" = "0","kafka_offsets" = "OFFSET_BEGINNING","property.group.id" = "ods_drp_st_entry_detail_et","property.client.id" = "doris");
案例二:接口数据存在删除和更新操作
首先需要设置目标表为支持批量删除的模式:
ALTER TABLE ods_drp.ods_drp_vip_weixin ENABLE FEATURE "BATCH_DELETE";
然后创建导入任务:
CREATE ROUTINE LOAD ods_drp.rtl_ods_drp_vip_weixin ON ods_drp_vip_weixinWITH MERGECOLUMNS(rec_id, vip_user_id, vip_id, vip_code, tel, vip_source, openid, unionid, appid, brand_code, create_user_name, create_user, create_time, modify_user_name, modify_user, modify_time, version, update_time,CDC_OP),DELETE ON CDC_OP="DELETE"PROPERTIES("desired_concurrent_number"="1","max_batch_interval" = "20","max_batch_rows" = "200000","max_batch_size" = "104857600","strict_mode" = "false","strip_outer_array" = "true","format" = "json","json_root" = "$.data","jsonpaths" = "[\"$.rec_id\",\"$.vip_user_id\",\"$.vip_id\",\"$.vip_code\",\"$.tel\",\"$.vip_source\",\"$.openid\",\"$.unionid\",\"$.appid\",\"$.brand_code\",\"$.create_user_name\",\"$.create_user\",\"$.create_time\",\"$.modify_user_name\",\"$.modify_user\",\"$.modify_time\",\"$.version\",\"$.update_time\",\"$.type\"]")FROM KAFKA("kafka_broker_list" = "192.168.87.107:9092,192.168.87.108:9092,192.168.87.109:9092","kafka_topic" = "drds_hana_ods_vip_weixin","kafka_partitions" = "0","kafka_offsets" = "OFFSET_BEGINNING","property.group.id" = "ods_drp_vip_weixin","property.client.id" = "doris");
其中,前面的PROPERTIES括号里面存放的是加载的配置信息,KAFKA后面的括号里面存放的是KAFKA的配置信息。
例行导入常用操作
创建完 Routine Load 任务以后, Routine Load 会在后台持续运行。为了监控和检查 Routine Load 任务状态,我们需要进入对应的数据库schema下执行命令查看任务。
show routine load; --用于显示所有的例行导入任务状态pause routine load for xxx; --暂停xxx导入任务resume routine load for xxx; --重启xxx导入任务stop routine load for xxx; --停止xxx导入任务,停止以后任务会从队列中消失ALTER ROUTINE LOAD FOR XXPROPERTIES("desired_concurrent_number" = "1")FROM kafka("kafka_partitions" = "0","kafka_offsets" = "OFFSET_BEGINNING","property.group.id" = "xxx_topic","property.client.id" = "doris"); --修改任务参数,从kafka队列最开始重新读取

《数据中台研习社》微信群,请添:laowang5244,备注【进群】
🧐分享、点赞、在看,给个三连击呗!👇




