1、MySQL源表结构
-- Source-side table (MySQL). The Debezium connector captures customs_declaration
-- from both bill_of_entry_test_0 and bill_of_entry_test_1 (see table.include.list).
-- String columns carry no per-column CHARACTER SET / COLLATE: they inherit the
-- table default declared on the last line (utf8mb4 / utf8mb4_bin).
CREATE TABLE `customs_declaration` (
    `id` BIGINT NOT NULL COMMENT '报关单ID',
    `import_export_flag` VARCHAR(1) DEFAULT NULL COMMENT '进出口标志;I:进口;E:出口',
    `unified_number` VARCHAR(18) DEFAULT NULL COMMENT '统一编号',
    `business_type` VARCHAR(3) DEFAULT NULL COMMENT '业务类型',
    `declaration_no` VARCHAR(36) DEFAULT NULL COMMENT '申报单号',
    `custom_master_code` VARCHAR(100) DEFAULT NULL COMMENT '申报地海关',
    `entry_exit_customs_code` VARCHAR(50) DEFAULT NULL COMMENT '出入境关别代码',
    `entry_exit_port_code` VARCHAR(50) DEFAULT NULL COMMENT '进境口岸/离境口岸代码',
    `record_no` VARCHAR(50) DEFAULT NULL COMMENT '备案号',
    `contract_no` VARCHAR(50) DEFAULT NULL COMMENT '合同协议号',
    `entry_exit_date` DATE DEFAULT NULL COMMENT '出入境日期',
    `declare_date` DATETIME DEFAULT NULL COMMENT '申报日期',
    `domestic_credit_code` VARCHAR(50) DEFAULT NULL COMMENT '境内收发货人-社会信用代码',
    `domestic_customs_code` VARCHAR(50) DEFAULT NULL COMMENT '境内收发货人-海关代码',
    `domestic_quarantine_no` VARCHAR(50) DEFAULT NULL COMMENT '境内收发货人-检验检疫编码',
    `domestic_enterprise_name` VARCHAR(200) DEFAULT NULL COMMENT '境内收发货人-企业名称',
    `declare_enterprise_name` VARCHAR(200) DEFAULT NULL COMMENT '申报单位--企业名称',
    `transport_type_code` VARCHAR(50) DEFAULT NULL COMMENT '运输方式',
    `transport_tools` VARCHAR(150) DEFAULT NULL COMMENT '运输工具名称',
    `voyage_no` VARCHAR(50) DEFAULT NULL COMMENT '航次号',
    `take_voyage_no` VARCHAR(32) DEFAULT NULL COMMENT '提运单号',
    `supervise_type_code` VARCHAR(4) DEFAULT NULL COMMENT '监管方式',
    `exempt_nature_code` VARCHAR(50) DEFAULT NULL COMMENT '征免性质',
    `licence_no` VARCHAR(20) DEFAULT NULL COMMENT '许可证号',
    `arrival_country_regions_code` VARCHAR(3) DEFAULT NULL COMMENT '运抵国 启运国',
    `departure_or_arrival_port_code` VARCHAR(6) DEFAULT NULL COMMENT '经停港/指运港',
    `deal_type_code` TINYINT DEFAULT NULL COMMENT '成交方式',
    `trade_country_regions_code` VARCHAR(3) DEFAULT NULL COMMENT '贸易国别(地区)',
    `wrap_count` INT DEFAULT NULL COMMENT '件数',
    `wrap_type_code` VARCHAR(2) DEFAULT NULL COMMENT '包装种类',
    `container_count` INT DEFAULT NULL COMMENT '集装箱数',
    `item_count` INT DEFAULT NULL COMMENT '商品项数',
    `document_type` VARCHAR(4) DEFAULT NULL COMMENT '单据类型',
    `customs_type` VARCHAR(1) DEFAULT NULL COMMENT '报关单类型',
    `relation_type` TINYINT DEFAULT NULL COMMENT '报关/转关关系标志',
    `creation_method` INT DEFAULT NULL COMMENT '报关单创建方式',
    `tax_paperless` TINYINT DEFAULT NULL COMMENT '税单无纸化标志',
    `self_declaration` TINYINT DEFAULT NULL COMMENT '自主报税标志',
    `water_transit` TINYINT DEFAULT NULL COMMENT '水运中转标志',
    `self_declaration_pay` TINYINT DEFAULT NULL COMMENT '自报自缴标志',
    `vouch_flag` TINYINT DEFAULT NULL COMMENT '担保验放标志',
    `overseas_platform` TINYINT DEFAULT NULL COMMENT '跨境电商海外平台标志',
    `special_channel` TINYINT DEFAULT NULL COMMENT '特殊通道标志',
    `tax_flag` TINYINT DEFAULT NULL COMMENT '税收征管标记',
    `foreign_verification_no` VARCHAR(32) DEFAULT NULL COMMENT '外汇核销单号',
    `source` INT DEFAULT NULL COMMENT '报关单来源',
    `creator` BIGINT DEFAULT NULL COMMENT '创建人ID',
    `created_time` DATETIME DEFAULT NULL COMMENT '创建时间',
    `modifier` BIGINT DEFAULT NULL COMMENT '修改人ID',
    `modify_time` DATETIME DEFAULT NULL COMMENT '修改时间',
    `status` VARCHAR(6) DEFAULT NULL COMMENT '最后一次回执状态仅作冗余,以回执表为准',
    `status_explain` VARCHAR(100) DEFAULT NULL COMMENT '最后一次回执说明仅作冗余,以回执表为准',
    `data_query_status` VARCHAR(500) DEFAULT NULL COMMENT '电子口岸-数据状态',
    `date_dec` DATETIME DEFAULT NULL COMMENT '电子口岸-发送时间',
    `electronic_unified_number` VARCHAR(18) DEFAULT NULL COMMENT '电子口岸统一编号',
    `cus_dec_status` VARCHAR(6) DEFAULT NULL COMMENT '报关单状态(单一拉回来的状态)',
    PRIMARY KEY (`id`),
    UNIQUE KEY `uk_declaration_no_customs_declaration` (`declaration_no`) USING BTREE,
    UNIQUE KEY `uk_unified_number_customs_declaration` (`unified_number`) USING BTREE,
    KEY `idx_entry_exit_date_customs_declaration` (`entry_exit_date`),
    KEY `idx_domestic_credit_code_customs_declaration` (`domestic_credit_code`),
    KEY `idx_domestic_enterprise_name_customs_declaration` (`domestic_enterprise_name`),
    KEY `idx_created_time_customs_declaration` (`created_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT='报关单';
2、MySQL的日期类型自定义转换器
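下载自定义转换器源码(原文此处为下载链接,链接已丢失;转换器类名为 com.darcytech.debezium.converter.MySqlDateTimeConverter,即下文连接器配置中 datetime.type 的值)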
下载后把源码打包成 jar包:debezium-extension-2.6.0.Final.jar
然后上传jar包到容器目录中:
docker cp debezium-extension-2.6.0.Final.jar debezium-connect:/kafka/connect/debezium-connector-mysql/
最后重启容器:
docker-compose restart
3、注册MySQL的源连接器
3.1、编辑json文件
[root@192 source-connect]# cat mysql-connect-source-5.json
{
"name": "mysql-connect-source-5",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.user": "数据库账号",
"database.hostname": "数据库IP",
"database.password": "数据库密码",
"database.port": "数据库端口",
"database.server.id": "119009005",
"schema.history.internal.kafka.bootstrap.servers": "KafkaIP地址:9092",
"schema.history.internal.kafka.topic": "schemahistory.mysql_connect_source-5",
"heartbeat.interval.ms": "100",
"topic.prefix": "mysql_connect_source-5",
"snapshot.mode": "initial",
"database.include.list": "bill_of_entry_test_0,bill_of_entry_test_1",
"table.include.list": "bill_of_entry_test_0.customs_declaration,bill_of_entry_test_1.customs_declaration",
"tombstones.on.delete": "true",
"decimal.handling.mode": "string",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite",
"converters":"datetime",
"datetime.type":"com.darcytech.debezium.converter.MySqlDateTimeConverter",
"datetime.format.date":"yyyy-MM-dd",
"datetime.format.time":"HH:mm:ss",
"datetime.format.datetime":"yyyy-MM-dd HH:mm:ss",
"datetime.format.timestamp":"yyyy-MM-dd HH:mm:ss",
"datetime.format.timestamp.zone":"UTC+8"
}
}
3.2、执行注册命令
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" debezium的部署IP:8083/connectors/ -d @mysql-connect-source-5.json
请注意修改【debezium的部署IP】为实际的IP。
3.3、查看连接器状态
[root@192 source-connect]# curl -s debezium的部署IP:8083/connectors/mysql-connect-source-5/status | jq
{
"name": "mysql-connect-source-5",
"connector": {
"state": "RUNNING",
"worker_id": "175.66.30.2:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "175.66.30.2:8083"
}
],
"type": "source"
}
如上,connector 和 tasks 的状态都是 RUNNING,就表示连接器运行正常。
3.4、观察Kafka的消息格式
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int64",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": true,
"field": "import_export_flag"
},
{
"type": "string",
"optional": true,
"field": "unified_number"
},
{
"type": "string",
"optional": true,
"field": "business_type"
},
{
"type": "string",
"optional": true,
"field": "declaration_no"
},
{
"type": "string",
"optional": true,
"field": "custom_master_code"
},
{
"type": "string",
"optional": true,
"field": "entry_exit_customs_code"
},
{
"type": "string",
"optional": true,
"field": "entry_exit_port_code"
},
{
"type": "string",
"optional": true,
"field": "record_no"
},
{
"type": "string",
"optional": true,
"field": "contract_no"
},
{
"type": "string",
"optional": true,
"name": "com.darcytech.debezium.date.string",
"field": "entry_exit_date"
},
{
"type": "string",
"optional": true,
"name": "com.darcytech.debezium.datetime.string",
"field": "declare_date"
},
{
"type": "string",
"optional": true,
"field": "domestic_credit_code"
},
{
"type": "string",
"optional": true,
"field": "domestic_customs_code"
},
{
"type": "string",
"optional": true,
"field": "domestic_quarantine_no"
},
{
"type": "string",
"optional": true,
"field": "domestic_enterprise_name"
},
{
"type": "string",
"optional": true,
"field": "declare_enterprise_name"
},
{
"type": "string",
"optional": true,
"field": "transport_type_code"
},
{
"type": "string",
"optional": true,
"field": "transport_tools"
},
{
"type": "string",
"optional": true,
"field": "voyage_no"
},
{
"type": "string",
"optional": true,
"field": "take_voyage_no"
},
{
"type": "string",
"optional": true,
"field": "supervise_type_code"
},
{
"type": "string",
"optional": true,
"field": "exempt_nature_code"
},
{
"type": "string",
"optional": true,
"field": "licence_no"
},
{
"type": "string",
"optional": true,
"field": "arrival_country_regions_code"
},
{
"type": "string",
"optional": true,
"field": "departure_or_arrival_port_code"
},
{
"type": "int16",
"optional": true,
"field": "deal_type_code"
},
{
"type": "string",
"optional": true,
"field": "trade_country_regions_code"
},
{
"type": "int32",
"optional": true,
"field": "wrap_count"
},
{
"type": "string",
"optional": true,
"field": "wrap_type_code"
},
{
"type": "int32",
"optional": true,
"field": "container_count"
},
{
"type": "int32",
"optional": true,
"field": "item_count"
},
{
"type": "string",
"optional": true,
"field": "document_type"
},
{
"type": "string",
"optional": true,
"field": "customs_type"
},
{
"type": "int16",
"optional": true,
"field": "relation_type"
},
{
"type": "int32",
"optional": true,
"field": "creation_method"
},
{
"type": "int16",
"optional": true,
"field": "tax_paperless"
},
{
"type": "int16",
"optional": true,
"field": "self_declaration"
},
{
"type": "int16",
"optional": true,
"field": "water_transit"
},
{
"type": "int16",
"optional": true,
"field": "self_declaration_pay"
},
{
"type": "int16",
"optional": true,
"field": "vouch_flag"
},
{
"type": "int16",
"optional": true,
"field": "overseas_platform"
},
{
"type": "int16",
"optional": true,
"field": "special_channel"
},
{
"type": "int16",
"optional": true,
"field": "tax_flag"
},
{
"type": "string",
"optional": true,
"field": "foreign_verification_no"
},
{
"type": "int32",
"optional": true,
"field": "source"
},
{
"type": "int64",
"optional": true,
"field": "creator"
},
{
"type": "string",
"optional": true,
"name": "com.darcytech.debezium.datetime.string",
"field": "created_time"
},
{
"type": "int64",
"optional": true,
"field": "modifier"
},
{
"type": "string",
"optional": true,
"name": "com.darcytech.debezium.datetime.string",
"field": "modify_time"
},
{
"type": "string",
"optional": true,
"field": "status"
},
{
"type": "string",
"optional": true,
"field": "status_explain"
},
{
"type": "string",
"optional": true,
"field": "data_query_status"
},
{
"type": "string",
"optional": true,
"name": "com.darcytech.debezium.datetime.string",
"field": "date_dec"
},
{
"type": "string",
"optional": true,
"field": "electronic_unified_number"
},
{
"type": "string",
"optional": true,
"field": "cus_dec_status"
},
{
"type": "string",
"optional": true,
"field": "__deleted"
}
],
"optional": false,
"name": "mysql_connect_source-5.bill_of_entry_test_0.customs_declaration.Value"
},
"payload": {
"id": 3006719137197064000,
"import_export_flag": "E",
"unified_number": "E20210000593246977",
"business_type": null,
"declaration_no": "530420210040339743",
"custom_master_code": "5304",
"entry_exit_customs_code": "5304",
"entry_exit_port_code": "470101",
"record_no": "",
"contract_no": "XS3-11-202103008",
"entry_exit_date": "2021-03-17",
"declare_date": "2021-03-13 00:00:00",
"domestic_credit_code": "数据已脱敏",
"domestic_customs_code": "数据已脱敏",
"domestic_quarantine_no": "",
"domestic_enterprise_name": "数据已脱敏",
"declare_enterprise_name": "数据已脱敏",
"transport_type_code": "2",
"transport_tools": "UN9767998",
"voyage_no": "0MH7VE1MA",
"take_voyage_no": "EE32103122817",
"supervise_type_code": "0110",
"exempt_nature_code": "101",
"licence_no": "",
"arrival_country_regions_code": "MEX",
"departure_or_arrival_port_code": "MEX039",
"deal_type_code": 3,
"trade_country_regions_code": "HKG",
"wrap_count": 2080,
"wrap_type_code": "22",
"container_count": 2,
"item_count": 1,
"document_type": null,
"customs_type": "M",
"relation_type": 0,
"creation_method": 4,
"tax_paperless": null,
"self_declaration": null,
"water_transit": null,
"self_declaration_pay": null,
"vouch_flag": null,
"overseas_platform": null,
"special_channel": null,
"tax_flag": null,
"foreign_verification_no": null,
"source": 1,
"creator": 2859727746966257700,
"created_time": "2024-06-14 13:46:33",
"modifier": 2859727746966257700,
"modify_time": "2024-06-14 13:46:33",
"status": null,
"status_explain": null,
"data_query_status": null,
"date_dec": null,
"electronic_unified_number": null,
"cus_dec_status": "10",
"__deleted": "false"
}
}
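观察这个消息格式,会发现数据部分不是 Debezium 的默认格式。默认格式的 payload 分为 before 和 after 两部分,并用 op 字段标识数据操作类型。这里的 payload 直接就是变更后的一行数据,是因为使用了 ExtractNewRecordState 转换。其中 delete.handling.mode=rewrite 会给每条消息加上 __deleted 字段(删除事件为 "true",其余为 "false"),Doris 端正是靠这个字段识别删除操作。相关配置就是下面这几行: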
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite",
另外,日期字段也被转换成了字符串,比如 "created_time": "2024-06-14 13:46:33",这是因为使用了自定义转换器,也就是下面这几行:
"converters":"datetime",
"datetime.type":"com.darcytech.debezium.converter.MySqlDateTimeConverter",
"datetime.format.date":"yyyy-MM-dd",
"datetime.format.time":"HH:mm:ss",
"datetime.format.datetime":"yyyy-MM-dd HH:mm:ss",
"datetime.format.timestamp":"yyyy-MM-dd HH:mm:ss",
"datetime.format.timestamp.zone":"UTC+8"
4、Doris目标端
4.1、Doris的安装
安装直接参考官网快速体验单机安装方式
4.2、创建Doris目标表
Doris部署好后,就可以创建test库,然后在test库下创建表,本次实验采用的是唯一模型建表,脚本如下:
-- Target table in Doris, Unique Key model on `id`: a newly loaded row replaces
-- the stored row with the same id, and the routine load's MERGE / DELETE ON
-- clause removes rows whose __deleted flag is 'true'.
--
-- String lengths: Doris VARCHAR(n) is measured in BYTES, while the MySQL source
-- varchar(n) is measured in utf8mb4 CHARACTERS (up to 4 bytes each). Each string
-- column is therefore declared at 4x its MySQL length, so any value the source
-- accepts (e.g. a 200-character Chinese enterprise name) also passes Doris'
-- length check at load time. Doris VARCHAR is variable-length, so the larger
-- limit does not pad stored values.
-- NOTE: IF NOT EXISTS leaves an already-created table untouched; widen an
-- existing table with ALTER TABLE ... MODIFY COLUMN instead.
USE test;
CREATE TABLE IF NOT EXISTS `customs_declaration` (
`id` BIGINT NOT NULL COMMENT '报关单ID',
`declaration_no` varchar(144) COMMENT '申报单号',
`unified_number` varchar(72) COMMENT '统一编号',
`declare_date` datetime COMMENT '申报日期',
`import_export_flag` varchar(4) COMMENT '进出口标志;I:进口;E:出口',
`business_type` varchar(12) COMMENT '业务类型',
`custom_master_code` varchar(400) COMMENT '申报地海关',
`entry_exit_customs_code` varchar(200) COMMENT '出入境关别代码',
`entry_exit_port_code` varchar(200) COMMENT '进境口岸/离境口岸代码',
`record_no` varchar(200) COMMENT '备案号',
`contract_no` varchar(200) COMMENT '合同协议号',
`entry_exit_date` date COMMENT '出入境日期',
`domestic_credit_code` varchar(200) COMMENT '境内收发货人-社会信用代码',
`domestic_customs_code` varchar(200) COMMENT '境内收发货人-海关代码',
`domestic_quarantine_no` varchar(200) COMMENT '境内收发货人-检验检疫编码',
`domestic_enterprise_name` varchar(800) COMMENT '境内收发货人-企业名称',
`declare_enterprise_name` varchar(800) COMMENT '申报单位--企业名称',
`transport_type_code` varchar(200) COMMENT '运输方式',
`transport_tools` varchar(600) COMMENT '运输工具名称',
`voyage_no` varchar(200) COMMENT '航次号',
`take_voyage_no` varchar(128) COMMENT '提运单号',
`supervise_type_code` varchar(16) COMMENT '监管方式',
`exempt_nature_code` varchar(200) COMMENT '征免性质',
`licence_no` varchar(80) COMMENT '许可证号',
`arrival_country_regions_code` varchar(12) COMMENT '运抵国 启运国',
`departure_or_arrival_port_code` varchar(24) COMMENT '经停港/指运港',
`deal_type_code` tinyint COMMENT '成交方式',
`trade_country_regions_code` varchar(12) COMMENT '贸易国别(地区)',
`wrap_count` int COMMENT '件数',
`wrap_type_code` varchar(10) COMMENT '包装种类',
`container_count` int COMMENT '集装箱数',
`item_count` int COMMENT '商品项数',
`document_type` varchar(16) COMMENT '单据类型',
`customs_type` varchar(4) COMMENT '报关单类型',
`relation_type` tinyint COMMENT '报关/转关关系标志',
`creation_method` int COMMENT '报关单创建方式',
`tax_paperless` tinyint COMMENT '税单无纸化标志',
`self_declaration` tinyint COMMENT '自主报税标志',
`water_transit` tinyint COMMENT '水运中转标志',
`self_declaration_pay` tinyint COMMENT '自报自缴标志',
`vouch_flag` tinyint COMMENT '担保验放标志',
`overseas_platform` tinyint COMMENT '跨境电商海外平台标志',
`special_channel` tinyint COMMENT '特殊通道标志',
`tax_flag` tinyint COMMENT '税收征管标记',
`foreign_verification_no` varchar(128) COMMENT '外汇核销单号',
`source` int COMMENT '报关单来源',
`creator` bigint COMMENT '创建人ID',
`created_time` datetime COMMENT '创建时间',
`modifier` bigint COMMENT '修改人ID',
`modify_time` datetime COMMENT '修改时间',
`status` varchar(24) COMMENT '最后一次回执状态仅作冗余,以回执表为准',
`status_explain` varchar(400) COMMENT '最后一次回执说明仅作冗余,以回执表为准',
`data_query_status` varchar(2000) COMMENT '电子口岸-数据状态',
`date_dec` datetime COMMENT '电子口岸-发送时间',
`electronic_unified_number` varchar(72) COMMENT '电子口岸统一编号',
`cus_dec_status` varchar(24) COMMENT '报关单状态(单一拉回来的状态)',
`__deleted` varchar(10) COMMENT '数据操作标识'
)
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
4.3、创建Doris的routine load
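根据 Doris 官方文档,Doris 提供多种数据导入方式,其中 Routine Load 可以持续消费 Kafka 中的数据,本实验就采用这种方式。注意:Debezium 为每个源库的表单独写一个 topic,命名规则为 <topic.prefix>.<库名>.<表名>,例如 mysql_connect_source-5.bill_of_entry_test_1.customs_declaration。而一个 Routine Load 作业的 kafka_topic 只指定一个 topic,因此两个分库各需要建一个作业。下面以 bill_of_entry_test_1 为例,bill_of_entry_test_0 的作业只需修改作业名和 kafka_topic。脚本如下: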
-- Routine load: continuously consume the Debezium topic of bill_of_entry_test_1
-- into test.customs_declaration.
-- COLUMNS is mapped positionally onto "jsonpaths": the N-th column name receives
-- the value extracted by the N-th JSON path. Keep both lists in the same order,
-- and make every name an existing target column. A name that is not a table
-- column is treated as a temporary column, so the intended target column is
-- left unfilled.
-- WITH MERGE + DELETE ON: a record whose __deleted is 'true' (Debezium's rewrite
-- of a delete event) deletes the row with that key instead of upserting it.
-- kafka_topic names a single topic, so bill_of_entry_test_0 needs its own job
-- reading mysql_connect_source-5.bill_of_entry_test_0.customs_declaration.
CREATE ROUTINE LOAD test.rl_job_customs_declaration_1 ON customs_declaration
WITH MERGE
COLUMNS(id,
declaration_no,
unified_number,
declare_date,
import_export_flag,
business_type,
custom_master_code,
entry_exit_customs_code,
entry_exit_port_code,
record_no,
contract_no,
entry_exit_date,
domestic_credit_code,
domestic_customs_code,
domestic_quarantine_no,
domestic_enterprise_name,
declare_enterprise_name,
transport_type_code,
transport_tools,
voyage_no,
take_voyage_no,
supervise_type_code,
exempt_nature_code,
licence_no,
arrival_country_regions_code,
departure_or_arrival_port_code,
deal_type_code,
trade_country_regions_code,
wrap_count,
wrap_type_code,
container_count,
item_count,
document_type,
customs_type,
relation_type,
creation_method,
tax_paperless,
self_declaration,
water_transit,
self_declaration_pay,
vouch_flag,
overseas_platform,
special_channel,
tax_flag,
foreign_verification_no,
source,
creator,
created_time,
modifier,
modify_time,
status,
status_explain,
data_query_status,
date_dec,
electronic_unified_number,
cus_dec_status,
__deleted),
DELETE ON __deleted ='true'
PROPERTIES(
"format"="json",
"jsonpaths"="[\"$.payload.id\",
\"$.payload.declaration_no\",
\"$.payload.unified_number\",
\"$.payload.declare_date\",
\"$.payload.import_export_flag\",
\"$.payload.business_type\",
\"$.payload.custom_master_code\",
\"$.payload.entry_exit_customs_code\",
\"$.payload.entry_exit_port_code\",
\"$.payload.record_no\",
\"$.payload.contract_no\",
\"$.payload.entry_exit_date\",
\"$.payload.domestic_credit_code\",
\"$.payload.domestic_customs_code\",
\"$.payload.domestic_quarantine_no\",
\"$.payload.domestic_enterprise_name\",
\"$.payload.declare_enterprise_name\",
\"$.payload.transport_type_code\",
\"$.payload.transport_tools\",
\"$.payload.voyage_no\",
\"$.payload.take_voyage_no\",
\"$.payload.supervise_type_code\",
\"$.payload.exempt_nature_code\",
\"$.payload.licence_no\",
\"$.payload.arrival_country_regions_code\",
\"$.payload.departure_or_arrival_port_code\",
\"$.payload.deal_type_code\",
\"$.payload.trade_country_regions_code\",
\"$.payload.wrap_count\",
\"$.payload.wrap_type_code\",
\"$.payload.container_count\",
\"$.payload.item_count\",
\"$.payload.document_type\",
\"$.payload.customs_type\",
\"$.payload.relation_type\",
\"$.payload.creation_method\",
\"$.payload.tax_paperless\",
\"$.payload.self_declaration\",
\"$.payload.water_transit\",
\"$.payload.self_declaration_pay\",
\"$.payload.vouch_flag\",
\"$.payload.overseas_platform\",
\"$.payload.special_channel\",
\"$.payload.tax_flag\",
\"$.payload.foreign_verification_no\",
\"$.payload.source\",
\"$.payload.creator\",
\"$.payload.created_time\",
\"$.payload.modifier\",
\"$.payload.modify_time\",
\"$.payload.status\",
\"$.payload.status_explain\",
\"$.payload.data_query_status\",
\"$.payload.date_dec\",
\"$.payload.electronic_unified_number\",
\"$.payload.cus_dec_status\",
\"$.payload.__deleted\"]"
)
FROM KAFKA(
"kafka_broker_list" = "KafkaIP地址:9092",
"kafka_topic" = "mysql_connect_source-5.bill_of_entry_test_1.customs_declaration",
"property.group.id" = "customs_declaration_0",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
4.4、查看ROUTINE LOAD任务的状态
mysql> SHOW ROUTINE LOAD FOR test.rl_job_customs_declaration_1\G
*************************** 1. row ***************************
Id: 189269
Name: rl_job_customs_declaration_1
CreateTime: 2024-06-27 14:36:37
PauseTime: NULL
EndTime: NULL
DbName: test
TableName: customs_declaration
IsMultiTable: false
State: RUNNING
DataSourceType: KAFKA
CurrentTaskNum: 1
JobProperties: {"max_batch_rows":"200000","timezone":"Asia/Shanghai","send_batch_parallelism":"1","load_to_single_tablet":"false","current_concurrent_number":"1","delete":"(`__deleted` = 'true')","partial_columns":"false","merge_type":"MERGE","exec_mem_limit":"2147483648","strict_mode":"false","jsonpaths":"[\"$.payload.id\",\n\"$.payload.declaration_no\",\n\"$.payload.unified_number\",\n\"$.payload.declare_date\",\n\"$.payload.import_export_flag\",\n\"$.payload.business_type\",\n\"$.payload.custom_master_code\",\n\"$.payload.entry_exit_customs_code\",\n\"$.payload.entry_exit_port_code\",\n\"$.payload.record_no\",\n\"$.payload.contract_no\",\n\"$.payload.entry_exit_date\",\n\"$.payload.domestic_credit_code\",\n\"$.payload.domestic_customs_code\",\n\"$.payload.domestic_quarantine_no\",\n\"$.payload.domestic_enterprise_name\",\n\"$.payload.declare_enterprise_name\",\n\"$.payload.transport_type_code\",\n\"$.payload.transport_tools\",\n\"$.payload.voyage_no\",\n\"$.payload.take_voyage_no\",\n\"$.payload.supervise_type_code\",\n\"$.payload.exempt_nature_code\",\n\"$.payload.licence_no\",\n\"$.payload.arrival_country_regions_code\",\n\"$.payload.departure_or_arrival_port_code\",\n\"$.payload.deal_type_code\",\n\"$.payload.trade_country_regions_code\",\n\"$.payload.wrap_count\",\n\"$.payload.wrap_type_code\",\n\"$.payload.container_count\",\n\"$.payload.item_count\",\n\"$.payload.document_type\",\n\"$.payload.customs_type\",\n\"$.payload.relation_type\",\n\"$.payload.creation_method\",\n\"$.payload.tax_paperless\",\n\"$.payload.self_declaration\",\n\"$.payload.water_transit\",\n\"$.payload.self_declaration_pay\",\n\"$.payload.vouch_flag\",\n\"$.payload.overseas_platform\",\n\"$.payload.special_channel\",\n\"$.payload.tax_flag\",\n\"$.payload.foreign_verification_no\",\n\"$.payload.source\",\n\"$.payload.creator\",\n\"$.payload.created_time\",\n\"$.payload.modifier\",\n\"$.payload.modify_time\",\n\"$.payload.status\",\n\"$.payload.status_explain\",\n\"$.payload.data_query_st
atus\",\n\"$.payload.date_dec\",\n\"$.payload.electronic_unified_number\",\n\"$.payload.cus_dec_status\",\n\"$.payload.__deleted\"]","max_batch_interval":"10","max_batch_size":"104857600","fuzzy_parse":"false","partitions":"*","columnToColumnExpr":"id,declaration_no,unified_number,declare_date,import_export_flag,business_type,custom_master_code,entry_exit_customs,entry_exit_port_code,record_no,contract_no,entry_exit_date,domestic_credit_code,domestic_customs_code,domestic_quarantine_no,domestic_enterprise_name,declare_enterprise_name,transport_type_code,transport_tools,voyage_no,take_voyage_no,supervise_type_code,exempt_nature_code,licence_no,arrival_country_regions_code,departure_or_arrival_port_code,deal_type_code,trade_country_regions_code,wrap_count,wrap_type_code,container_count,item_count,document_type,customs_type,relation_type,creation_method,tax_paperless,self_declaration,water_transit,self_declaration_pay,vouch_flag,overseas_platform,special_channel,tax_flag,foreign_verification_no,source,creator,created_time,modifier,modify_time,status,status_explain,data_query_status,date_dec,electronic_unified_number,cus_dec_status,__deleted","whereExpr":"*","desired_concurrent_number":"5","precedingFilter":"*","format":"json","max_error_number":"0","max_filter_ratio":"1.0","json_root":"","strip_outer_array":"false","num_as_string":"false"}
DataSourceProperties: {"topic":"mysql_connect_source-5.bill_of_entry_test_1.customs_declaration","currentKafkaPartitions":"0","brokerList":"192.168.0.221:9092"}
CustomProperties: {"kafka_default_offsets":"OFFSET_BEGINNING","group.id":"customs_declaration_0"}
Statistic: {"receivedBytes":60668262,"runningTxns":[],"errorRows":0,"committedTaskNum":1,"loadedRows":11571,"loadRowsRate":1395,"abortedTaskNum":0,"errorRowsAfterResumed":0,"totalRows":11571,"unselectedRows":0,"receivedBytesRate":7315598,"taskExecuteTimeMs":8293}
Progress: {"0":"11570"}
Lag: {"0":0}
ReasonOfStateChanged:
ErrorLogUrls:
OtherMsg:
User: root
Comment:
ErrorLogUrls 为空,且 Statistic 中 errorRows 为 0,说明导入过程中没有出错的行。
4.5、查看目标表数据
mysql> select count(*) from test.customs_declaration;
+----------+
| count(*) |
+----------+
| 23085 |
+----------+
1 row in set (0.01 sec)
5、实践总结
从4月份开始接触debezium,断断续续的研究,近两周开始结合Doris消费Kafka的功能研究,总算看到了一点成效,也更深入理解debezium的功能和各个配置参数的含义,总体感觉debezium确实挺不错的,作为Kafka的connect,实现数据实时同步,可以作为异地备份的方案,也可以作为数仓的实施方案,挺好的,把研究经验进行总结并分享给大家,希望和大家一起交流学习,共同成长!




