
Practice notes: capturing MySQL data with Debezium, pushing it to Kafka, and consuming it directly in Doris

Original post by 数据库管理员陆美芳, 2024-06-27

1. MySQL source table structure

CREATE TABLE `customs_declaration` (
  `id` bigint NOT NULL COMMENT '报关单ID',
  `import_export_flag` varchar(1) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '进出口标志;I:进口;E:出口',
  `unified_number` varchar(18) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '统一编号',
  `business_type` varchar(3) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '业务类型',
  `declaration_no` varchar(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '申报单号',
  `custom_master_code` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '申报地海关',
  `entry_exit_customs_code` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '出入境关别代码',
  `entry_exit_port_code` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '进境口岸/离境口岸代码',
  `record_no` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '备案号',
  `contract_no` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '合同协议号',
  `entry_exit_date` date DEFAULT NULL COMMENT '出入境日期',
  `declare_date` datetime DEFAULT NULL COMMENT '申报日期',
  `domestic_credit_code` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '境内收发货人-社会信用代码',
  `domestic_customs_code` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '境内收发货人-海关代码',
  `domestic_quarantine_no` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '境内收发货人-检验检疫编码',
  `domestic_enterprise_name` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '境内收发货人-企业名称',
  `declare_enterprise_name` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '申报单位--企业名称',
  `transport_type_code` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '运输方式',
  `transport_tools` varchar(150) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '运输工具名称',
  `voyage_no` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '航次号',
  `take_voyage_no` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '提运单号',
  `supervise_type_code` varchar(4) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '监管方式',
  `exempt_nature_code` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '征免性质',
  `licence_no` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '许可证号',
  `arrival_country_regions_code` varchar(3) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '运抵国 启运国',
  `departure_or_arrival_port_code` varchar(6) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '经停港/指运港',
  `deal_type_code` tinyint DEFAULT NULL COMMENT '成交方式',
  `trade_country_regions_code` varchar(3) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '贸易国别(地区)',
  `wrap_count` int DEFAULT NULL COMMENT '件数',
  `wrap_type_code` varchar(2) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '包装种类',
  `container_count` int DEFAULT NULL COMMENT '集装箱数',
  `item_count` int DEFAULT NULL COMMENT '商品项数',
  `document_type` varchar(4) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '单据类型',
  `customs_type` varchar(1) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '报关单类型',
  `relation_type` tinyint DEFAULT NULL COMMENT '报关/转关关系标志',
  `creation_method` int DEFAULT NULL COMMENT '报关单创建方式',
  `tax_paperless` tinyint DEFAULT NULL COMMENT '税单无纸化标志',
  `self_declaration` tinyint DEFAULT NULL COMMENT '自主报税标志',
  `water_transit` tinyint DEFAULT NULL COMMENT '水运中转标志',
  `self_declaration_pay` tinyint DEFAULT NULL COMMENT '自报自缴标志',
  `vouch_flag` tinyint DEFAULT NULL COMMENT '担保验放标志',
  `overseas_platform` tinyint DEFAULT NULL COMMENT '跨境电商海外平台标志',
  `special_channel` tinyint DEFAULT NULL COMMENT '特殊通道标志',
  `tax_flag` tinyint DEFAULT NULL COMMENT '税收征管标记',
  `foreign_verification_no` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '外汇核销单号',
  `source` int DEFAULT NULL COMMENT '报关单来源',
  `creator` bigint DEFAULT NULL COMMENT '创建人ID',
  `created_time` datetime DEFAULT NULL COMMENT '创建时间',
  `modifier` bigint DEFAULT NULL COMMENT '修改人ID',
  `modify_time` datetime DEFAULT NULL COMMENT '修改时间',
  `status` varchar(6) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '最后一次回执状态仅作冗余,以回执表为准',
  `status_explain` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '最后一次回执说明仅作冗余,以回执表为准',
  `data_query_status` varchar(500) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '电子口岸-数据状态',
  `date_dec` datetime DEFAULT NULL COMMENT '电子口岸-发送时间',
  `electronic_unified_number` varchar(18) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '电子口岸统一编号',
  `cus_dec_status` varchar(6) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '报关单状态(单一拉回来的状态)',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_declaration_no_customs_declaration` (`declaration_no`) USING BTREE,
  UNIQUE KEY `uk_unified_number_customs_declaration` (`unified_number`) USING BTREE,
  KEY `idx_entry_exit_date_customs_declaration` (`entry_exit_date`),
  KEY `idx_domestic_credit_code_customs_declaration` (`domestic_credit_code`),
  KEY `idx_domestic_enterprise_name_customs_declaration` (`domestic_enterprise_name`),
  KEY `idx_created_time_customs_declaration` (`created_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT='报关单';

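2. Custom converter for MySQL date types

Download the source code of the custom converter.
Package the source code as a jar: debezium-extension-2.6.0.Final.jar
Then copy the jar into the connector directory inside the container:
docker cp debezium-extension-2.6.0.Final.jar debezium-connect:/kafka/connect/debezium-connector-mysql/
Finally, restart the container so that the new jar is loaded:
docker-compose restart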
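
For context, here is a rough sketch of why the converter is useful. Without a converter, and with the connector's default time precision settings, Debezium emits a MySQL DATE as an integer number of days since 1970-01-01 and a DATETIME with up to millisecond precision as epoch milliseconds. The Python below decodes those default encodings for two values that appear later in this article. The function names are illustrative only; they are not part of Debezium or of the converter jar.

```python
from datetime import date, datetime, timedelta

EPOCH_DATE = date(1970, 1, 1)
EPOCH_DATETIME = datetime(1970, 1, 1)

def format_debezium_date(days_since_epoch: int) -> str:
    """Render a default-encoded DATE (days since 1970-01-01) as yyyy-MM-dd."""
    return (EPOCH_DATE + timedelta(days=days_since_epoch)).isoformat()

def format_debezium_datetime(epoch_millis: int) -> str:
    """Render a default-encoded DATETIME (epoch milliseconds) as yyyy-MM-dd HH:mm:ss.

    DATETIME carries no time zone, so the value is read back as plain wall-clock time.
    """
    moment = EPOCH_DATETIME + timedelta(milliseconds=epoch_millis)
    return moment.strftime("%Y-%m-%d %H:%M:%S")

print(format_debezium_date(18703))              # 2021-03-17
print(format_debezium_datetime(1718372793000))  # 2024-06-14 13:46:33
```

With the converter in place, the strings arrive in Kafka already formatted, so the consumer does not need this decoding step.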

3. Register the MySQL source connector

3.1 Edit the JSON file

[root@192 source-connect]# cat mysql-connect-source-5.json
{
  "name": "mysql-connect-source-5",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.user": "<database user>",
    "database.hostname": "<database IP>",
    "database.password": "<database password>",
    "database.port": "<database port>",
    "database.server.id": "119009005",
    "schema.history.internal.kafka.bootstrap.servers": "<Kafka IP>:9092",
    "schema.history.internal.kafka.topic": "schemahistory.mysql_connect_source-5",
    "heartbeat.interval.ms": "100",
    "topic.prefix": "mysql_connect_source-5",
    "snapshot.mode": "initial",
    "database.include.list": "bill_of_entry_test_0,bill_of_entry_test_1",
    "table.include.list": "bill_of_entry_test_0.customs_declaration,bill_of_entry_test_1.customs_declaration",
    "tombstones.on.delete": "true",
    "decimal.handling.mode": "string",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "converters":"datetime",
    "datetime.type":"com.darcytech.debezium.converter.MySqlDateTimeConverter",
    "datetime.format.date":"yyyy-MM-dd",
    "datetime.format.time":"HH:mm:ss",
    "datetime.format.datetime":"yyyy-MM-dd HH:mm:ss",
    "datetime.format.timestamp":"yyyy-MM-dd HH:mm:ss",
    "datetime.format.timestamp.zone":"UTC+8"
  }
}

3.2 Run the registration command

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" DEBEZIUM_HOST:8083/connectors/ -d @mysql-connect-source-5.json

Replace DEBEZIUM_HOST with the actual IP of the Debezium deployment.
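
The same registration can be scripted. The sketch below builds the POST request to the Kafka Connect REST endpoint using only the Python standard library. The address 127.0.0.1:8083 and the shortened connector definition are placeholders for illustration, and the actual send is left commented out so the snippet runs without a live service.

```python
import json
import urllib.request

def build_register_request(connect_url: str, definition: dict) -> urllib.request.Request:
    """Build the POST /connectors request used to register a connector."""
    body = json.dumps(definition).encode("utf-8")
    return urllib.request.Request(
        url=connect_url.rstrip("/") + "/connectors",
        data=body,
        method="POST",
        headers={"Accept": "application/json", "Content-Type": "application/json"},
    )

# Placeholder definition; in practice load the full mysql-connect-source-5.json file.
definition = {
    "name": "mysql-connect-source-5",
    "config": {"connector.class": "io.debezium.connector.mysql.MySqlConnector"},
}
request = build_register_request("http://127.0.0.1:8083", definition)
print(request.get_method(), request.full_url)

# To actually send the request:
# with urllib.request.urlopen(request) as response:
#     print(response.status, response.read().decode("utf-8"))
```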

3.3 Check the connector status

[root@192 source-connect]# curl -s DEBEZIUM_HOST:8083/connectors/mysql-connect-source-5/status | jq
{
  "name": "mysql-connect-source-5",
  "connector": {
    "state": "RUNNING",
    "worker_id": "175.66.30.2:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "175.66.30.2:8083"
    }
  ],
  "type": "source"
}

If the connector state and every task state show RUNNING, as above, the connector is working normally.
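
This check is easy to automate. A minimal sketch, assuming the status JSON has the shape shown above; the function name is my own and not part of any Kafka Connect client:

```python
import json

def connector_is_healthy(status: dict) -> bool:
    """Return True only if the connector and every one of its tasks are RUNNING."""
    if status.get("connector", {}).get("state") != "RUNNING":
        return False
    tasks = status.get("tasks", [])
    # A connector without tasks is not moving any data, so treat it as unhealthy.
    return bool(tasks) and all(task.get("state") == "RUNNING" for task in tasks)

sample = json.loads("""
{
  "name": "mysql-connect-source-5",
  "connector": {"state": "RUNNING", "worker_id": "175.66.30.2:8083"},
  "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "175.66.30.2:8083"}],
  "type": "source"
}
""")
print(connector_is_healthy(sample))  # True
```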

3.4 Inspect the Kafka message format

{
   "schema": {
      "type": "struct",
      "fields": [
         {
            "type": "int64",
            "optional": false,
            "field": "id"
         },
         {
            "type": "string",
            "optional": true,
            "field": "import_export_flag"
         },
         {
            "type": "string",
            "optional": true,
            "field": "unified_number"
         },
         {
            "type": "string",
            "optional": true,
            "field": "business_type"
         },
         {
            "type": "string",
            "optional": true,
            "field": "declaration_no"
         },
         {
            "type": "string",
            "optional": true,
            "field": "custom_master_code"
         },
         {
            "type": "string",
            "optional": true,
            "field": "entry_exit_customs_code"
         },
         {
            "type": "string",
            "optional": true,
            "field": "entry_exit_port_code"
         },
         {
            "type": "string",
            "optional": true,
            "field": "record_no"
         },
         {
            "type": "string",
            "optional": true,
            "field": "contract_no"
         },
         {
            "type": "string",
            "optional": true,
            "name": "com.darcytech.debezium.date.string",
            "field": "entry_exit_date"
         },
         {
            "type": "string",
            "optional": true,
            "name": "com.darcytech.debezium.datetime.string",
            "field": "declare_date"
         },
         {
            "type": "string",
            "optional": true,
            "field": "domestic_credit_code"
         },
         {
            "type": "string",
            "optional": true,
            "field": "domestic_customs_code"
         },
         {
            "type": "string",
            "optional": true,
            "field": "domestic_quarantine_no"
         },
         {
            "type": "string",
            "optional": true,
            "field": "domestic_enterprise_name"
         },
         {
            "type": "string",
            "optional": true,
            "field": "declare_enterprise_name"
         },
         {
            "type": "string",
            "optional": true,
            "field": "transport_type_code"
         },
         {
            "type": "string",
            "optional": true,
            "field": "transport_tools"
         },
         {
            "type": "string",
            "optional": true,
            "field": "voyage_no"
         },
         {
            "type": "string",
            "optional": true,
            "field": "take_voyage_no"
         },
         {
            "type": "string",
            "optional": true,
            "field": "supervise_type_code"
         },
         {
            "type": "string",
            "optional": true,
            "field": "exempt_nature_code"
         },
         {
            "type": "string",
            "optional": true,
            "field": "licence_no"
         },
         {
            "type": "string",
            "optional": true,
            "field": "arrival_country_regions_code"
         },
         {
            "type": "string",
            "optional": true,
            "field": "departure_or_arrival_port_code"
         },
         {
            "type": "int16",
            "optional": true,
            "field": "deal_type_code"
         },
         {
            "type": "string",
            "optional": true,
            "field": "trade_country_regions_code"
         },
         {
            "type": "int32",
            "optional": true,
            "field": "wrap_count"
         },
         {
            "type": "string",
            "optional": true,
            "field": "wrap_type_code"
         },
         {
            "type": "int32",
            "optional": true,
            "field": "container_count"
         },
         {
            "type": "int32",
            "optional": true,
            "field": "item_count"
         },
         {
            "type": "string",
            "optional": true,
            "field": "document_type"
         },
         {
            "type": "string",
            "optional": true,
            "field": "customs_type"
         },
         {
            "type": "int16",
            "optional": true,
            "field": "relation_type"
         },
         {
            "type": "int32",
            "optional": true,
            "field": "creation_method"
         },
         {
            "type": "int16",
            "optional": true,
            "field": "tax_paperless"
         },
         {
            "type": "int16",
            "optional": true,
            "field": "self_declaration"
         },
         {
            "type": "int16",
            "optional": true,
            "field": "water_transit"
         },
         {
            "type": "int16",
            "optional": true,
            "field": "self_declaration_pay"
         },
         {
            "type": "int16",
            "optional": true,
            "field": "vouch_flag"
         },
         {
            "type": "int16",
            "optional": true,
            "field": "overseas_platform"
         },
         {
            "type": "int16",
            "optional": true,
            "field": "special_channel"
         },
         {
            "type": "int16",
            "optional": true,
            "field": "tax_flag"
         },
         {
            "type": "string",
            "optional": true,
            "field": "foreign_verification_no"
         },
         {
            "type": "int32",
            "optional": true,
            "field": "source"
         },
         {
            "type": "int64",
            "optional": true,
            "field": "creator"
         },
         {
            "type": "string",
            "optional": true,
            "name": "com.darcytech.debezium.datetime.string",
            "field": "created_time"
         },
         {
            "type": "int64",
            "optional": true,
            "field": "modifier"
         },
         {
            "type": "string",
            "optional": true,
            "name": "com.darcytech.debezium.datetime.string",
            "field": "modify_time"
         },
         {
            "type": "string",
            "optional": true,
            "field": "status"
         },
         {
            "type": "string",
            "optional": true,
            "field": "status_explain"
         },
         {
            "type": "string",
            "optional": true,
            "field": "data_query_status"
         },
         {
            "type": "string",
            "optional": true,
            "name": "com.darcytech.debezium.datetime.string",
            "field": "date_dec"
         },
         {
            "type": "string",
            "optional": true,
            "field": "electronic_unified_number"
         },
         {
            "type": "string",
            "optional": true,
            "field": "cus_dec_status"
         },
         {
            "type": "string",
            "optional": true,
            "field": "__deleted"
         }
      ],
      "optional": false,
      "name": "mysql_connect_source-5.bill_of_entry_test_0.customs_declaration.Value"
   },
   "payload": {
      "id": 3006719137197064000,
      "import_export_flag": "E",
      "unified_number": "E20210000593246977",
      "business_type": null,
      "declaration_no": "530420210040339743",
      "custom_master_code": "5304",
      "entry_exit_customs_code": "5304",
      "entry_exit_port_code": "470101",
      "record_no": "",
      "contract_no": "XS3-11-202103008",
      "entry_exit_date": "2021-03-17",
      "declare_date": "2021-03-13 00:00:00",
      "domestic_credit_code": "数据已脱敏",
      "domestic_customs_code": "数据已脱敏",
      "domestic_quarantine_no": "",
      "domestic_enterprise_name": "数据已脱敏",
      "declare_enterprise_name": "数据已脱敏",
      "transport_type_code": "2",
      "transport_tools": "UN9767998",
      "voyage_no": "0MH7VE1MA",
      "take_voyage_no": "EE32103122817",
      "supervise_type_code": "0110",
      "exempt_nature_code": "101",
      "licence_no": "",
      "arrival_country_regions_code": "MEX",
      "departure_or_arrival_port_code": "MEX039",
      "deal_type_code": 3,
      "trade_country_regions_code": "HKG",
      "wrap_count": 2080,
      "wrap_type_code": "22",
      "container_count": 2,
      "item_count": 1,
      "document_type": null,
      "customs_type": "M",
      "relation_type": 0,
      "creation_method": 4,
      "tax_paperless": null,
      "self_declaration": null,
      "water_transit": null,
      "self_declaration_pay": null,
      "vouch_flag": null,
      "overseas_platform": null,
      "special_channel": null,
      "tax_flag": null,
      "foreign_verification_no": null,
      "source": 1,
      "creator": 2859727746966257700,
      "created_time": "2024-06-14 13:46:33",
      "modifier": 2859727746966257700,
      "modify_time": "2024-06-14 13:46:33",
      "status": null,
      "status_explain": null,
      "data_query_status": null,
      "date_dec": null,
      "electronic_unified_number": null,
      "cus_dec_status": "10",
      "__deleted": "false"
   }
}

Looking at this message, the data part is not in Debezium's default layout. By default the payload is split into a before part and an after part, and an op field identifies the type of operation. Here I simplified it with a transform, which is what the following lines do:
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite",
In addition, the date fields have been converted to strings, for example "created_time": "2024-06-14 13:46:33". That is the effect of the custom converter, configured by these lines:
"converters":"datetime",
"datetime.type":"com.darcytech.debezium.converter.MySqlDateTimeConverter",
"datetime.format.date":"yyyy-MM-dd",
"datetime.format.time":"HH:mm:ss",
"datetime.format.datetime":"yyyy-MM-dd HH:mm:ss",
"datetime.format.timestamp":"yyyy-MM-dd HH:mm:ss",
"datetime.format.timestamp.zone":"UTC+8"
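
To make the effect of the unwrap transform concrete, here is a simplified Python model of it. It is not Debezium code. It assumes the default envelope with before, after and op fields, and it assumes that in rewrite mode a delete event keeps the previous row values and gets `__deleted` set to "true", which is my reading of the Debezium documentation.

```python
from typing import Optional

def unwrap(envelope: Optional[dict]) -> Optional[dict]:
    """Flatten a change event roughly the way the unwrap settings above do."""
    if envelope is None:
        # A tombstone (null value) is passed through because drop.tombstones is false.
        return None
    if envelope["op"] == "d":
        # Delete: keep the last known row values and flag the row as deleted.
        row = dict(envelope["before"])
        row["__deleted"] = "true"
    else:
        # Create (c), update (u) or snapshot read (r): keep only the new row values.
        row = dict(envelope["after"])
        row["__deleted"] = "false"
    return row

update = {"op": "u", "before": {"id": 1, "status": "10"}, "after": {"id": 1, "status": "20"}}
delete = {"op": "d", "before": {"id": 1, "status": "20"}, "after": None}
print(unwrap(update))  # {'id': 1, 'status': '20', '__deleted': 'false'}
print(unwrap(delete))  # {'id': 1, 'status': '20', '__deleted': 'true'}
```

The flat shape is what lets a downstream consumer read every column from one fixed path under payload.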

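4. Doris target side

4.1 Installing Doris

For installation, follow the single-node quick-start guide on the official Doris website.

4.2 Create the Doris target table

Once Doris is deployed, create the test database and then create the table in it. This experiment uses the Unique Key model. The script is as follows:

CREATE DATABASE IF NOT EXISTS test;
USE test;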
CREATE TABLE IF NOT EXISTS `customs_declaration` (  
  `id` BIGINT NOT NULL COMMENT '报关单ID',
  `declaration_no` varchar(36)  COMMENT '申报单号',
  `unified_number` varchar(18)  COMMENT '统一编号',
  `declare_date` datetime COMMENT '申报日期',
  `import_export_flag` varchar(1)  COMMENT '进出口标志;I:进口;E:出口',  
  `business_type` varchar(3)  COMMENT '业务类型',
  `custom_master_code` varchar(100)  COMMENT '申报地海关',
  `entry_exit_customs_code` varchar(50)  COMMENT '出入境关别代码',
  `entry_exit_port_code` varchar(50)  COMMENT '进境口岸/离境口岸代码',
  `record_no` varchar(50)  COMMENT '备案号',
  `contract_no` varchar(50)  COMMENT '合同协议号',
  `entry_exit_date` date  COMMENT '出入境日期',
  `domestic_credit_code` varchar(50)  COMMENT '境内收发货人-社会信用代码',
  `domestic_customs_code` varchar(50)  COMMENT '境内收发货人-海关代码',
  `domestic_quarantine_no` varchar(50)  COMMENT '境内收发货人-检验检疫编码',
  `domestic_enterprise_name` varchar(200)  COMMENT '境内收发货人-企业名称',
  `declare_enterprise_name` varchar(200)  COMMENT '申报单位--企业名称',
  `transport_type_code` varchar(50)  COMMENT '运输方式',
  `transport_tools` varchar(150)  COMMENT '运输工具名称',
  `voyage_no` varchar(50)  COMMENT '航次号',
  `take_voyage_no` varchar(32)  COMMENT '提运单号',
  `supervise_type_code` varchar(4)  COMMENT '监管方式',
  `exempt_nature_code` varchar(50)  COMMENT '征免性质',
  `licence_no` varchar(20)  COMMENT '许可证号',
  `arrival_country_regions_code` varchar(3)  COMMENT '运抵国 启运国',
  `departure_or_arrival_port_code` varchar(6)  COMMENT '经停港/指运港',
  `deal_type_code` tinyint  COMMENT '成交方式',
  `trade_country_regions_code` varchar(3)  COMMENT '贸易国别(地区)',
  `wrap_count` int  COMMENT '件数',
  `wrap_type_code` varchar(10)  COMMENT '包装种类',
  `container_count` int  COMMENT '集装箱数',
  `item_count` int  COMMENT '商品项数',
  `document_type` varchar(4)  COMMENT '单据类型',
  `customs_type` varchar(1)  COMMENT '报关单类型',
  `relation_type` tinyint  COMMENT '报关/转关关系标志',
  `creation_method` int  COMMENT '报关单创建方式',
  `tax_paperless` tinyint  COMMENT '税单无纸化标志',
  `self_declaration` tinyint  COMMENT '自主报税标志',
  `water_transit` tinyint  COMMENT '水运中转标志',
  `self_declaration_pay` tinyint  COMMENT '自报自缴标志',
  `vouch_flag` tinyint  COMMENT '担保验放标志',
  `overseas_platform` tinyint  COMMENT '跨境电商海外平台标志',
  `special_channel` tinyint  COMMENT '特殊通道标志',
  `tax_flag` tinyint  COMMENT '税收征管标记',
  `foreign_verification_no` varchar(32)  COMMENT '外汇核销单号',
  `source` int  COMMENT '报关单来源',
  `creator` bigint  COMMENT '创建人ID',
  `created_time` datetime  COMMENT '创建时间',
  `modifier` bigint  COMMENT '修改人ID',
  `modify_time` datetime  COMMENT '修改时间',
  `status` varchar(6)  COMMENT '最后一次回执状态仅作冗余,以回执表为准',
  `status_explain` varchar(100)  COMMENT '最后一次回执说明仅作冗余,以回执表为准',
  `data_query_status` varchar(500)  COMMENT '电子口岸-数据状态',
  `date_dec` datetime  COMMENT '电子口岸-发送时间',
  `electronic_unified_number` varchar(18)  COMMENT '电子口岸统一编号',
  `cus_dec_status` varchar(6)  COMMENT '报关单状态(单一拉回来的状态)',
  `__deleted` varchar(10) COMMENT '数据操作标识'
)
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
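
As a mental model of how this table behaves once change rows arrive, here is a toy Python simulation. It treats the table as a dictionary keyed by id: a later row with the same key replaces the earlier one (Unique Key model), and a row whose `__deleted` value is "true" removes the key, which is the behaviour the load job in 4.3 requests with DELETE ON. It is a sketch only and ignores how Doris orders rows inside a single load batch.

```python
def apply_rows(table: dict, rows: list) -> dict:
    """Apply flattened change rows, in order, to a table modelled as {id: row}."""
    for row in rows:
        if row["__deleted"] == "true":
            # Delete marker: remove the row with this key if it exists.
            table.pop(row["id"], None)
        else:
            # Unique Key model: the newest row for a key replaces the old one.
            table[row["id"]] = row
    return table

rows = [
    {"id": 1, "cus_dec_status": "10", "__deleted": "false"},  # insert id 1
    {"id": 2, "cus_dec_status": "10", "__deleted": "false"},  # insert id 2
    {"id": 1, "cus_dec_status": "20", "__deleted": "false"},  # update id 1
    {"id": 2, "cus_dec_status": "10", "__deleted": "true"},   # delete id 2
]
table = apply_rows({}, rows)
print(table)  # only id 1 remains, with cus_dec_status "20"
```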

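4.3 Create the Doris routine load

According to the official Doris documentation, Doris offers several ways to load data. Routine load can consume Kafka data continuously, so this experiment uses it. Debezium writes one topic per source table, named topic.prefix.database.table, so the job below covers only bill_of_entry_test_1; the bill_of_entry_test_0 topic needs a job of its own. The script is as follows: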

CREATE ROUTINE LOAD test.rl_job_customs_declaration_1 ON customs_declaration
WITH MERGE
COLUMNS(id,
declaration_no,
unified_number,
declare_date,
import_export_flag,
business_type,
custom_master_code,
entry_exit_customs_code,
entry_exit_port_code,
record_no,
contract_no,
entry_exit_date,
domestic_credit_code,
domestic_customs_code,
domestic_quarantine_no,
domestic_enterprise_name,
declare_enterprise_name,
transport_type_code,
transport_tools,
voyage_no,
take_voyage_no,
supervise_type_code,
exempt_nature_code,
licence_no,
arrival_country_regions_code,
departure_or_arrival_port_code,
deal_type_code,
trade_country_regions_code,
wrap_count,
wrap_type_code,
container_count,
item_count,
document_type,
customs_type,
relation_type,
creation_method,
tax_paperless,
self_declaration,
water_transit,
self_declaration_pay,
vouch_flag,
overseas_platform,
special_channel,
tax_flag,
foreign_verification_no,
source,
creator,
created_time,
modifier,
modify_time,
status,
status_explain,
data_query_status,
date_dec,
electronic_unified_number,
cus_dec_status,
__deleted),
DELETE ON __deleted ='true'
PROPERTIES(
    "format"="json",
    "jsonpaths"="[\"$.payload.id\",
\"$.payload.declaration_no\",
\"$.payload.unified_number\",
\"$.payload.declare_date\",
\"$.payload.import_export_flag\",
\"$.payload.business_type\",
\"$.payload.custom_master_code\",
\"$.payload.entry_exit_customs_code\",
\"$.payload.entry_exit_port_code\",
\"$.payload.record_no\",
\"$.payload.contract_no\",
\"$.payload.entry_exit_date\",
\"$.payload.domestic_credit_code\",
\"$.payload.domestic_customs_code\",
\"$.payload.domestic_quarantine_no\",
\"$.payload.domestic_enterprise_name\",
\"$.payload.declare_enterprise_name\",
\"$.payload.transport_type_code\",
\"$.payload.transport_tools\",
\"$.payload.voyage_no\",
\"$.payload.take_voyage_no\",
\"$.payload.supervise_type_code\",
\"$.payload.exempt_nature_code\",
\"$.payload.licence_no\",
\"$.payload.arrival_country_regions_code\",
\"$.payload.departure_or_arrival_port_code\",
\"$.payload.deal_type_code\",
\"$.payload.trade_country_regions_code\",
\"$.payload.wrap_count\",
\"$.payload.wrap_type_code\",
\"$.payload.container_count\",
\"$.payload.item_count\",
\"$.payload.document_type\",
\"$.payload.customs_type\",
\"$.payload.relation_type\",
\"$.payload.creation_method\",
\"$.payload.tax_paperless\",
\"$.payload.self_declaration\",
\"$.payload.water_transit\",
\"$.payload.self_declaration_pay\",
\"$.payload.vouch_flag\",
\"$.payload.overseas_platform\",
\"$.payload.special_channel\",
\"$.payload.tax_flag\",
\"$.payload.foreign_verification_no\",
\"$.payload.source\",
\"$.payload.creator\",
\"$.payload.created_time\",
\"$.payload.modifier\",
\"$.payload.modify_time\",
\"$.payload.status\",
\"$.payload.status_explain\",
\"$.payload.data_query_status\",
\"$.payload.date_dec\",
\"$.payload.electronic_unified_number\",
\"$.payload.cus_dec_status\",
\"$.payload.__deleted\"]"
)
FROM KAFKA(
    "kafka_broker_list" = "<Kafka IP>:9092",
    "kafka_topic" = "mysql_connect_source-5.bill_of_entry_test_1.customs_declaration",
    "property.group.id" = "customs_declaration_0",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
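
With 57 columns, the COLUMNS list and the jsonpaths list are easy to get out of step, and as far as I understand the Doris documentation the two are matched by position. One way to reduce that risk is to generate both from a single list. The sketch below does that for a shortened column list; the helper names are hypothetical and the output still has to be pasted into the CREATE ROUTINE LOAD statement by hand.

```python
import json

def build_columns_and_jsonpaths(columns, root="$.payload"):
    """Return the COLUMNS clause and the jsonpaths JSON text from one column list."""
    columns_clause = "COLUMNS(" + ", ".join(columns) + ")"
    jsonpaths = json.dumps([root + "." + name for name in columns])
    return columns_clause, jsonpaths

def escape_for_double_quoted_property(value):
    """Escape double quotes so the text fits inside a "..." property value."""
    return value.replace('"', '\\"')

# Shortened list for illustration; use the full column list of the target table.
columns = ["id", "declaration_no", "entry_exit_customs_code", "__deleted"]
columns_clause, jsonpaths = build_columns_and_jsonpaths(columns)
print(columns_clause)
print('"jsonpaths"="' + escape_for_double_quoted_property(jsonpaths) + '"')
```

Because both outputs come from the same list, a column name can no longer differ between COLUMNS and the table without also showing up in the generated path.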

4.4 Check the status of the ROUTINE LOAD job

mysql> SHOW ROUTINE LOAD FOR test.rl_job_customs_declaration_1\G
*************************** 1. row ***************************
                  Id: 189269
                Name: rl_job_customs_declaration_1
          CreateTime: 2024-06-27 14:36:37
           PauseTime: NULL
             EndTime: NULL
              DbName: test
           TableName: customs_declaration
        IsMultiTable: false
               State: RUNNING
      DataSourceType: KAFKA
      CurrentTaskNum: 1
       JobProperties: {"max_batch_rows":"200000","timezone":"Asia/Shanghai","send_batch_parallelism":"1","load_to_single_tablet":"false","current_concurrent_number":"1","delete":"(`__deleted` = 'true')","partial_columns":"false","merge_type":"MERGE","exec_mem_limit":"2147483648","strict_mode":"false","jsonpaths":"[\"$.payload.id\",\n\"$.payload.declaration_no\",\n\"$.payload.unified_number\",\n\"$.payload.declare_date\",\n\"$.payload.import_export_flag\",\n\"$.payload.business_type\",\n\"$.payload.custom_master_code\",\n\"$.payload.entry_exit_customs_code\",\n\"$.payload.entry_exit_port_code\",\n\"$.payload.record_no\",\n\"$.payload.contract_no\",\n\"$.payload.entry_exit_date\",\n\"$.payload.domestic_credit_code\",\n\"$.payload.domestic_customs_code\",\n\"$.payload.domestic_quarantine_no\",\n\"$.payload.domestic_enterprise_name\",\n\"$.payload.declare_enterprise_name\",\n\"$.payload.transport_type_code\",\n\"$.payload.transport_tools\",\n\"$.payload.voyage_no\",\n\"$.payload.take_voyage_no\",\n\"$.payload.supervise_type_code\",\n\"$.payload.exempt_nature_code\",\n\"$.payload.licence_no\",\n\"$.payload.arrival_country_regions_code\",\n\"$.payload.departure_or_arrival_port_code\",\n\"$.payload.deal_type_code\",\n\"$.payload.trade_country_regions_code\",\n\"$.payload.wrap_count\",\n\"$.payload.wrap_type_code\",\n\"$.payload.container_count\",\n\"$.payload.item_count\",\n\"$.payload.document_type\",\n\"$.payload.customs_type\",\n\"$.payload.relation_type\",\n\"$.payload.creation_method\",\n\"$.payload.tax_paperless\",\n\"$.payload.self_declaration\",\n\"$.payload.water_transit\",\n\"$.payload.self_declaration_pay\",\n\"$.payload.vouch_flag\",\n\"$.payload.overseas_platform\",\n\"$.payload.special_channel\",\n\"$.payload.tax_flag\",\n\"$.payload.foreign_verification_no\",\n\"$.payload.source\",\n\"$.payload.creator\",\n\"$.payload.created_time\",\n\"$.payload.modifier\",\n\"$.payload.modify_time\",\n\"$.payload.status\",\n\"$.payload.status_explain\",\n\"$.payload.data_q
uery_status\",\n\"$.payload.date_dec\",\n\"$.payload.electronic_unified_number\",\n\"$.payload.cus_dec_status\",\n\"$.payload.__deleted\"]","max_batch_interval":"10","max_batch_size":"104857600","fuzzy_parse":"false","partitions":"*","columnToColumnExpr":"id,declaration_no,unified_number,declare_date,import_export_flag,business_type,custom_master_code,entry_exit_customs,entry_exit_port_code,record_no,contract_no,entry_exit_date,domestic_credit_code,domestic_customs_code,domestic_quarantine_no,domestic_enterprise_name,declare_enterprise_name,transport_type_code,transport_tools,voyage_no,take_voyage_no,supervise_type_code,exempt_nature_code,licence_no,arrival_country_regions_code,departure_or_arrival_port_code,deal_type_code,trade_country_regions_code,wrap_count,wrap_type_code,container_count,item_count,document_type,customs_type,relation_type,creation_method,tax_paperless,self_declaration,water_transit,self_declaration_pay,vouch_flag,overseas_platform,special_channel,tax_flag,foreign_verification_no,source,creator,created_time,modifier,modify_time,status,status_explain,data_query_status,date_dec,electronic_unified_number,cus_dec_status,__deleted","whereExpr":"*","desired_concurrent_number":"5","precedingFilter":"*","format":"json","max_error_number":"0","max_filter_ratio":"1.0","json_root":"","strip_outer_array":"false","num_as_string":"false"}
DataSourceProperties: {"topic":"mysql_connect_source-5.bill_of_entry_test_1.customs_declaration","currentKafkaPartitions":"0","brokerList":"192.168.0.221:9092"}
    CustomProperties: {"kafka_default_offsets":"OFFSET_BEGINNING","group.id":"customs_declaration_0"}
           Statistic: {"receivedBytes":60668262,"runningTxns":[],"errorRows":0,"committedTaskNum":1,"loadedRows":11571,"loadRowsRate":1395,"abortedTaskNum":0,"errorRowsAfterResumed":0,"totalRows":11571,"unselectedRows":0,"receivedBytesRate":7315598,"taskExecuteTimeMs":8293}
            Progress: {"0":"11570"}
                 Lag: {"0":0}
ReasonOfStateChanged: 
        ErrorLogUrls: 
            OtherMsg: 
                User: root
             Comment: 

ErrorLogUrls is empty and errorRows in Statistic is 0, so no rows were rejected.
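
The Statistic and Lag fields are JSON text, so the same check can be done programmatically. A minimal sketch, assuming the field values have already been fetched as strings (for example through any MySQL client library); the function name and the returned keys are my own:

```python
import json

def routine_load_summary(state: str, statistic_json: str, lag_json: str) -> dict:
    """Summarise the State, Statistic and Lag fields of SHOW ROUTINE LOAD."""
    statistic = json.loads(statistic_json)
    lag = json.loads(lag_json)
    return {
        "running": state == "RUNNING",
        "error_rows": statistic["errorRows"],
        "loaded_rows": statistic["loadedRows"],
        # Largest lag over all Kafka partitions; 0 when no partition is reported.
        "max_lag": max(lag.values(), default=0),
    }

summary = routine_load_summary(
    "RUNNING",
    '{"errorRows":0,"loadedRows":11571,"totalRows":11571}',
    '{"0":0}',
)
print(summary)
```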

4.5 Check the data in the target table

mysql> select count(*) from test.customs_declaration;
+----------+
| count(*) |
+----------+
|    23085 |
+----------+
1 row in set (0.01 sec)

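5. Summary

I first came across Debezium in April and studied it on and off. Over the last two weeks I looked at it together with Doris's ability to consume Kafka, and I finally saw some results. Along the way I came to understand Debezium's features and the meaning of its configuration parameters much better. Overall, Debezium works well: as a Kafka Connect connector it synchronises data in real time, and it can serve as an off-site backup solution or as part of a data warehouse implementation. I have written up what I learned to share it, and I hope we can exchange ideas and learn together.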

Last modified: 2024-06-27 18:29:29