Problems consuming Kafka with Doris ROUTINE LOAD
Database administrator 陆美芳
2024-06-24

# Background

The Kafka messages come from Debezium capturing changes in the MySQL log. The connector configuration is as follows:

```json
{
  "name": "mysql-connect-source6",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.user": "<database user>",
    "database.hostname": "<database IP>",
    "database.password": "<database password>",
    "database.port": "3306",
    "database.server.id": "119006",
    "schema.history.internal.kafka.bootstrap.servers": "<Kafka host IP>:9092",
    "schema.history.internal.kafka.topic": "schemahistory.mysql_connect_source6",
    "heartbeat.interval.ms": "100",
    "topic.prefix": "mysql_connect_source6",
    "snapshot.mode": "initial",
    "database.include.list": "bill_of_entry_test_0,bill_of_entry_test_1",
    "table.include.list": "bill_of_entry_test_0.customs_declaration,bill_of_entry_test_1.customs_declaration",
    "tombstones.on.delete": "true",
    "decimal.handling.mode": "string",
    "database.serverTimezone": "UTC",
    "database.connectionTimeZone": "GMT+8",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms": "convertTimezone",
    "transforms.convertTimezone.type": "io.debezium.transforms.TimezoneConverter",
    "transforms.convertTimezone.converted.timezone": "Asia/Shanghai"
  }
}
```
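Note that this config sets "transforms" twice. Many JSON parsers, Python's json module included, accept duplicate keys silently and keep only the last value, which here would drop unwrap from the transform chain. That Kafka Connect's REST endpoint behaves the same way is an assumption, though it is consistent with what the poster finds later in the thread. A minimal sketch of a duplicate-key check (find_duplicate_keys is a hypothetical helper, and the config is a trimmed excerpt):

```python
import json


def find_duplicate_keys(raw: str) -> list[str]:
    """Return every key that appears more than once inside a single JSON object."""
    duplicates: list[str] = []

    def hook(pairs):
        seen = set()
        for key, _ in pairs:
            if key in seen:
                duplicates.append(key)
            seen.add(key)
        # Same result json.loads gives by default: the last occurrence wins.
        return dict(pairs)

    json.loads(raw, object_pairs_hook=hook)
    return duplicates


# Trimmed excerpt of the connector "config" object above.
config_excerpt = """
{
  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms": "convertTimezone",
  "transforms.convertTimezone.type": "io.debezium.transforms.TimezoneConverter"
}
"""

print(find_duplicate_keys(config_excerpt))      # ['transforms']
print(json.loads(config_excerpt)["transforms"])  # convertTimezone
```

Listing both transforms in one key ("transforms": "unwrap,convertTimezone") avoids the collision entirely.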

# Doris ROUTINE LOAD creation script

```sql
CREATE ROUTINE LOAD test.rl_job_customs_declaration_0 ON customs_declaration
WITH MERGE
COLUMNS(id,
declaration_no,
unified_number,
declare_date,
import_export_flag,
business_type,
custom_master_code,
entry_exit_customs,
entry_exit_port_code,
record_no,
contract_no,
entry_exit_date,
domestic_credit_code,
domestic_customs_code,
domestic_quarantine_no,
domestic_enterprise_name,
declare_enterprise_name,
transport_type_code,
transport_tools,
voyage_no,
take_voyage_no,
supervise_type_code,
exempt_nature_code,
licence_no,
arrival_country_regions_code,
departure_or_arrival_port_code,
deal_type_code,
trade_country_regions_code,
wrap_count,
wrap_type_code,
container_count,
item_count,
document_type,
customs_type,
relation_type,
creation_method,
tax_paperless,
self_declaration,
water_transit,
self_declaration_pay,
vouch_flag,
overseas_platform,
special_channel,
tax_flag,
foreign_verification_no,
source,
creator,
created_time,
modifier,
modify_time,
status,
status_explain,
data_query_status,
date_dec,
electronic_unified_number,
cus_dec_status,
__deleted),
DELETE ON __deleted ='true'
PROPERTIES(
"format"="json",
"jsonpaths"="[\"$.payload.after.id\",
\"$.payload.after.declaration_no\",
\"$.payload.after.unified_number\",
\"$.payload.after.declare_date\",
\"$.payload.after.import_export_flag\",
\"$.payload.after.business_type\",
\"$.payload.after.custom_master_code\",
\"$.payload.after.entry_exit_customs_code\",
\"$.payload.after.entry_exit_port_code\",
\"$.payload.after.record_no\",
\"$.payload.after.contract_no\",
\"$.payload.after.entry_exit_date\",
\"$.payload.after.domestic_credit_code\",
\"$.payload.after.domestic_customs_code\",
\"$.payload.after.domestic_quarantine_no\",
\"$.payload.after.domestic_enterprise_name\",
\"$.payload.after.declare_enterprise_name\",
\"$.payload.after.transport_type_code\",
\"$.payload.after.transport_tools\",
\"$.payload.after.voyage_no\",
\"$.payload.after.take_voyage_no\",
\"$.payload.after.supervise_type_code\",
\"$.payload.after.exempt_nature_code\",
\"$.payload.after.licence_no\",
\"$.payload.after.arrival_country_regions_code\",
\"$.payload.after.departure_or_arrival_port_code\",
\"$.payload.after.deal_type_code\",
\"$.payload.after.trade_country_regions_code\",
\"$.payload.after.wrap_count\",
\"$.payload.after.wrap_type_code\",
\"$.payload.after.container_count\",
\"$.payload.after.item_count\",
\"$.payload.after.document_type\",
\"$.payload.after.customs_type\",
\"$.payload.after.relation_type\",
\"$.payload.after.creation_method\",
\"$.payload.after.tax_paperless\",
\"$.payload.after.self_declaration\",
\"$.payload.after.water_transit\",
\"$.payload.after.self_declaration_pay\",
\"$.payload.after.vouch_flag\",
\"$.payload.after.overseas_platform\",
\"$.payload.after.special_channel\",
\"$.payload.after.tax_flag\",
\"$.payload.after.foreign_verification_no\",
\"$.payload.after.source\",
\"$.payload.after.creator\",
\"$.payload.after.created_time\",
\"$.payload.after.modifier\",
\"$.payload.after.modify_time\",
\"$.payload.after.status\",
\"$.payload.after.status_explain\",
\"$.payload.after.data_query_status\",
\"$.payload.after.date_dec\",
\"$.payload.after.electronic_unified_number\",
\"$.payload.after.cus_dec_status\",
\"$.op\"]",
"strip_outer_array" = "false"
)
FROM KAFKA(
"kafka_broker_list" = "192.168.0.221:9092",
"kafka_topic" = "mysql_connect_source6.bill_of_entry_test_0.customs_declaration",
"property.group.id" = "customs_declaration_0",
"kafka_partitions" = "0",
"kafka_offsets" = "0"
);
```
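As far as I understand the Doris JSON load documentation, when both COLUMNS and jsonpaths are given they are paired by position, so the names do not have to match. With 57 entries on each side, a quick check still helps. The sketch below uses hypothetical helpers and a hand-trimmed excerpt of the two lists, and prints the pairs whose names differ: entry_exit_customs is fed from entry_exit_customs_code (possibly intentional), and __deleted is fed from $.op, which matters because the DELETE ON condition reads __deleted.

```python
def positional_mapping(columns: list[str], jsonpaths: list[str]) -> list[tuple[str, str]]:
    """Pair each target column with the JSON path at the same position."""
    if len(columns) != len(jsonpaths):
        raise ValueError(f"{len(columns)} columns but {len(jsonpaths)} jsonpaths")
    return list(zip(columns, jsonpaths))


def name_mismatches(columns: list[str], jsonpaths: list[str]) -> list[tuple[str, str]]:
    """Pairs whose column name differs from the last segment of its JSON path."""
    return [
        (column, path)
        for column, path in positional_mapping(columns, jsonpaths)
        if path.rsplit(".", 1)[-1] != column
    ]


# Hand-trimmed excerpt of the COLUMNS list and jsonpaths in the script above.
columns = ["id", "entry_exit_customs", "entry_exit_port_code", "cus_dec_status", "__deleted"]
jsonpaths = [
    "$.payload.after.id",
    "$.payload.after.entry_exit_customs_code",
    "$.payload.after.entry_exit_port_code",
    "$.payload.after.cus_dec_status",
    "$.op",
]

for column, path in name_mismatches(columns, jsonpaths):
    print(f"{column:<20} <- {path}")
```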

# Checking the ROUTINE LOAD status

```text
mysql> SHOW ROUTINE LOAD FOR test.rl_job_customs_declaration_0\G
*************************** 1. row ***************************
Id: 183628
Name: rl_job_customs_declaration_0
CreateTime: 2024-06-24 11:28:52
PauseTime: 2024-06-24 11:29:00
EndTime: NULL
DbName: test
TableName: customs_declaration
IsMultiTable: false
State: PAUSED
DataSourceType: KAFKA
CurrentTaskNum: 0
JobProperties: {"max_batch_rows":"200000","timezone":"Asia/Shanghai","send_batch_parallelism":"1","load_to_single_tablet":"false","current_concurrent_number":"1","delete":"(`__deleted` = 'true')","partial_columns":"false","merge_type":"MERGE","exec_mem_limit":"2147483648","strict_mode":"false","jsonpaths":"[\"$.payload.after.id\",\n\"$.payload.after.declaration_no\",\n\"$.payload.after.unified_number\",\n\"$.payload.after.declare_date\",\n\"$.payload.after.import_export_flag\",\n\"$.payload.after.business_type\",\n\"$.payload.after.custom_master_code\",\n\"$.payload.after.entry_exit_customs_code\",\n\"$.payload.after.entry_exit_port_code\",\n\"$.payload.after.record_no\",\n\"$.payload.after.contract_no\",\n\"$.payload.after.entry_exit_date\",\n\"$.payload.after.domestic_credit_code\",\n\"$.payload.after.domestic_customs_code\",\n\"$.payload.after.domestic_quarantine_no\",\n\"$.payload.after.domestic_enterprise_name\",\n\"$.payload.after.declare_enterprise_name\",\n\"$.payload.after.transport_type_code\",\n\"$.payload.after.transport_tools\",\n\"$.payload.after.voyage_no\",\n\"$.payload.after.take_voyage_no\",\n\"$.payload.after.supervise_type_code\",\n\"$.payload.after.exempt_nature_code\",\n\"$.payload.after.licence_no\",\n\"$.payload.after.arrival_country_regions_code\",\n\"$.payload.after.departure_or_arrival_port_code\",\n\"$.payload.after.deal_type_code\",\n\"$.payload.after.trade_country_regions_code\",\n\"$.payload.after.wrap_count\",\n\"$.payload.after.wrap_type_code\",\n\"$.payload.after.container_count\",\n\"$.payload.after.item_count\",\n\"$.payload.after.document_type\",\n\"$.payload.after.customs_type\",\n\"$.payload.after.relation_type\",\n\"$.payload.after.creation_method\",\n\"$.payload.after.tax_paperless\",\n\"$.payload.after.self_declaration\",\n\"$.payload.after.water_transit\",\n\"$.payload.after.self_declaration_pay\",\n\"$.payload.after.vouch_flag\",\n\"$.payload.after.overseas_platform\",\n\"$.payload.after.special_channel\",\n\"$.payload.after.tax_flag\",\n\"$.payload.after.foreign_verification_no\",\n\"$.payload.after.source\",\n\"$.payload.after.creator\",\n\"$.payload.after.created_time\",\n\"$.payload.after.modifier\",\n\"$.payload.after.modify_time\",\n\"$.payload.after.status\",\n\"$.payload.after.status_explain\",\n\"$.payload.after.data_query_status\",\n\"$.payload.after.date_dec\",\n\"$.payload.after.electronic_unified_number\",\n\"$.payload.after.cus_dec_status\",\n\"$.op\"]","max_batch_interval":"10","max_batch_size":"104857600","fuzzy_parse":"false","partitions":"*","columnToColumnExpr":"id,declaration_no,unified_number,declare_date,import_export_flag,business_type,custom_master_code,entry_exit_customs,entry_exit_port_code,record_no,contract_no,entry_exit_date,domestic_credit_code,domestic_customs_code,domestic_quarantine_no,domestic_enterprise_name,declare_enterprise_name,transport_type_code,transport_tools,voyage_no,take_voyage_no,supervise_type_code,exempt_nature_code,licence_no,arrival_country_regions_code,departure_or_arrival_port_code,deal_type_code,trade_country_regions_code,wrap_count,wrap_type_code,container_count,item_count,document_type,customs_type,relation_type,creation_method,tax_paperless,self_declaration,water_transit,self_declaration_pay,vouch_flag,overseas_platform,special_channel,tax_flag,foreign_verification_no,source,creator,created_time,modifier,modify_time,status,status_explain,data_query_status,date_dec,electronic_unified_number,cus_dec_status,__deleted","whereExpr":"*","desired_concurrent_number":"5","precedingFilter":"*","format":"json","max_error_number":"0","max_filter_ratio":"1.0","json_root":"","strip_outer_array":"false","num_as_string":"false"}
DataSourceProperties: {"topic":"mysql_connect_source6.bill_of_entry_test_0.customs_declaration","currentKafkaPartitions":"0","brokerList":"192.168.0.221:9092"}
CustomProperties: {"group.id":"customs_declaration_0"}
Statistic: {"receivedBytes":104868168,"runningTxns":[],"errorRows":9488,"committedTaskNum":1,"loadedRows":0,"loadRowsRate":0,"abortedTaskNum":0,"errorRowsAfterResumed":9488,"totalRows":9488,"unselectedRows":0,"receivedBytesRate":13197604,"taskExecuteTimeMs":7946}
Progress: {"0":"9487"}
Lag: {"0":2031}
ReasonOfStateChanged: ErrorReason{code=errCode = 102, msg='current error rows is more than max_error_number or the max_filter_ratio is more than the value set'}
ErrorLogUrls: http://192.168.0.129:8040/api/_load_error_log?file=__shard_520/error_log_insert_stmt_aa167426bdef4cda-8f64655d0ef3de94_aa167426bdef4cda_8f64655d0ef3de94
OtherMsg:
User: root
Comment:
1 row in set (0.00 sec)
```

Running SHOW LOAD WARNINGS ON produced a lot of output. Here is one row, picked at random:

```text
Reason: column(__DORIS_DELETE_SIGN__) values is null while columns is not nullable. src line [3006719137197064192 530420210040339743 E20210000593246977 1615564800000 E NULL 5304 5304 470101 XS3-11-202103008 18703 9144030032629079XF 4403160DS3 公司名称脱敏限公司 企业名称脱敏有限公司 2 UN9767998 0MH7VE1MA EE32103122817 0110 101 MEX MEX039 3 HKG 2080 22 2 1 NULL M 0 4 NULL NULL NULL NULL NULL NULL NULL NULL NULL 1 2859727746966257666 1718343993000 2859727746966257666 1718343993000 NULL NULL NULL NULL NULL 10 NULL];
```
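The last value in that src line, the one loaded into __deleted, is NULL. One plausible explanation, assuming the messages still carried the full Debezium envelope wrapped by Kafka Connect's JsonConverter with schemas enabled (which the working $.payload.after.* paths suggest): op sits under payload, so $.op resolves to nothing. The sketch uses a hypothetical, heavily trimmed message and a tiny hand-written resolver, not Doris's JSON reader.

```python
import json
from typing import Any, Optional


def resolve(doc: Any, path: str) -> Optional[Any]:
    """Follow a simple dotted path such as '$.payload.after.id'.

    Returns None when any segment is missing; the assumption here is that a
    missing path becomes NULL in the loaded row, as the src line above suggests.
    """
    node = doc
    for part in path.removeprefix("$.").split("."):
        if not isinstance(node, dict) or part not in node:
            return None
        node = node[part]
    return node


# Hypothetical snapshot record in the JsonConverter layout (schema + payload);
# real messages carry a full schema and many more fields.
message = json.loads("""
{
  "schema": {},
  "payload": {
    "before": null,
    "after": {"id": 3006719137197064192, "cus_dec_status": "10"},
    "op": "r"
  }
}
""")

for path in ["$.payload.after.id", "$.payload.after.cus_dec_status", "$.op", "$.payload.op"]:
    print(f"{path:<32} -> {resolve(message, path)}")
```

Even the corrected path $.payload.op would yield an operation code (c, u, d or r), never the string 'true', so the DELETE ON __deleted = 'true' condition could not match either way.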

# Question

Does anyone know what causes this, and how to fix it?

# Something I still don't understand

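One thing has puzzled me all along. I followed https://www.modb.pro/db/626837, and when I first created the ROUTINE LOAD and looked at the Kafka topic, the captured messages had no before and after: the data sat directly in payload, together with a __deleted field. I remember that messages used to have both before and after, so I tried to find which setting removes them and leaves only payload, but I never found it. If anyone has experience with this and is willing to share, I would be very grateful!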
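The flat shape comes from Debezium's ExtractNewRecordState transform, the unwrap entries in the config above. Below is a toy model of what it does with delete.handling.mode=rewrite, written only for illustration: it is not Debezium's code, and it ignores tombstones, the source block and schemas. The event dicts are hypothetical.

```python
def unwrap(envelope: dict) -> dict:
    """Toy model of ExtractNewRecordState with delete.handling.mode=rewrite.

    Creates, snapshot reads and updates keep the 'after' image; deletes keep the
    'before' image. Either way a string '__deleted' flag is added.
    """
    if envelope["op"] == "d":
        record = dict(envelope["before"])
        record["__deleted"] = "true"
    else:
        record = dict(envelope["after"])
        record["__deleted"] = "false"
    return record


update_event = {
    "before": {"id": 1, "cus_dec_status": "9"},
    "after": {"id": 1, "cus_dec_status": "10"},
    "op": "u",
}
delete_event = {"before": {"id": 1, "cus_dec_status": "10"}, "after": None, "op": "d"}

print(unwrap(update_event))  # {'id': 1, 'cus_dec_status': '10', '__deleted': 'false'}
print(unwrap(delete_event))  # {'id': 1, 'cus_dec_status': '10', '__deleted': 'true'}
```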

 

# Answers (4)
Database administrator 陆美芳

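After two days of research and repeated experiments, I finally understood: everything comes down to date types such as DATE and DATETIME being converted into integers (DATETIME at millisecond precision). Specifically:

MySQL DATETIME: converted by io.debezium.time.Timestamp into a timestamp, i.e. the number of milliseconds from 1970-01-01 00:00:00 to the value, e.g. 1718343993000.

MySQL DATE: converted by io.debezium.time.Date into the number of days from 1970-01-01 to the date, e.g. 18703.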
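Both conversions are plain epoch arithmetic, applied to the sample values above. Debezium documents io.debezium.time.Timestamp as epoch milliseconds carrying no time-zone information, so this sketch builds naive datetimes and applies no zone. Whether an extra +8h shift is needed depends on the connector settings (such as the TimezoneConverter), which I have not verified. As I understand it, Doris's FROM_UNIXTIME renders in the session time zone, so the SQL that follows may differ from this arithmetic by the session offset. The function names are hypothetical.

```python
from datetime import date, datetime, timedelta


def from_debezium_timestamp(millis: int) -> datetime:
    """io.debezium.time.Timestamp: milliseconds since 1970-01-01 00:00:00, no zone attached."""
    return datetime(1970, 1, 1) + timedelta(milliseconds=millis)


def from_debezium_date(days: int) -> date:
    """io.debezium.time.Date: number of days since 1970-01-01."""
    return date(1970, 1, 1) + timedelta(days=days)


print(from_debezium_timestamp(1718343993000))  # 2024-06-14 05:46:33
print(from_debezium_date(18703))               # 2021-03-17
```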

For now I store the target columns using the converted types: a source DATE column becomes INT in Doris, and a source DATETIME column becomes BIGINT. I then convert them with functions at query time, for example:

```sql
SELECT id, declaration_no, unified_number,
       FROM_UNIXTIME(declare_date/1000) declare_date,
       DATE_ADD('1970-01-01', INTERVAL entry_exit_date DAY) AS entry_exit_date,
       FROM_UNIXTIME(created_time/1000) created_time,
       FROM_UNIXTIME(modify_time/1000) modify_time,
       FROM_UNIXTIME(date_dec/1000) date_dec
FROM test.customs_declaration
WHERE id = 4124546945088217088;
```

This works, so I'll use it as a stopgap for now.

Database administrator 陆美芳

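Persistence paid off with a little progress. I found that these settings:

```json
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite"
```

do make the captured messages keep only a simple payload, without before and after. The catch was that I had later added another transform without noticing: the "transforms": "unwrap" entry was followed by a second "transforms": "convertTimezone", so unwrap no longer took effect. Changing it to "transforms": "unwrap,convertTimezone" fixed that.

One problem remains, though: once Debezium pushes the data to Kafka, some data types are converted. For example, the topic viewer in the Kafka UI shows:

```json
"creator": 2859727746966257700,
"created_time": 1718372793000,
"modifier": 2859727746966257700,
"modify_time": 1718372793000,
"status": null,
"status_explain": null,
"data_query_status": null,
"date_dec": null,
"electronic_unified_number": null,
"cus_dec_status": "10",
"__deleted": "false"
```

But once written into Doris, these values are NULL. I still need to keep investigating.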
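A side note on the creator value: the Kafka UI shows 2859727746966257700, while the Doris error row in the question shows 2859727746966257666. That pattern matches a viewer that parses JSON numbers as IEEE-754 doubles, as JavaScript does; that the UI works this way is an assumption. If so, the rounding is a display artifact and the message itself is probably intact, although this does not explain the NULLs. A small demonstration:

```python
import json
from decimal import Decimal

raw = '{"creator": 2859727746966257666}'

exact = json.loads(raw)["creator"]                       # Python parses JSON integers exactly
as_double = json.loads(raw, parse_int=float)["creator"]  # what a double-based viewer would hold
shown = int(Decimal(repr(as_double)))                    # shortest round-trip digits, as JS prints them

print(exact)           # 2859727746966257666
print(int(as_double))  # 2859727746966257664
print(shown)           # 2859727746966257700
```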

吾亦可往

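Based on the information you provided, Doris ran into a problem while consuming the Kafka messages: the ROUTINE LOAD job is PAUSED because of errors. The stated reason is that the number of error rows exceeded max_error_number, or the filter ratio exceeded max_filter_ratio.

You can work through the following steps:

  1. Check the max_error_number and max_filter_ratio settings and make sure their values are reasonable (JobProperties shows max_error_number is 0 here, so a single error row pauses the job).
  2. Check the content of the Kafka messages to make sure there is no malformed or unexpected data.
  3. Check the Doris configuration to make sure the Kafka connection and data-processing settings are correct.
  4. Read the error log for more details about the failure.


If the problem persists, you may need to debug further, or provide more context so that others can help more precisely.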
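Step 1 can be made concrete with the job's own numbers from the Statistic and JobProperties fields in the question (errorRows 9488, totalRows 9488, max_error_number 0, max_filter_ratio 1.0). Below is a simplified reading of the pause message as a hypothetical function; it is not Doris's implementation and ignores that max_error_number is counted over a sampling window.

```python
def should_pause(error_rows: int, total_rows: int,
                 max_error_number: int, max_filter_ratio: float) -> bool:
    """Simplified reading of: 'current error rows is more than max_error_number
    or the max_filter_ratio is more than the value set'."""
    if error_rows > max_error_number:
        return True
    ratio = error_rows / total_rows if total_rows else 0.0
    return ratio > max_filter_ratio


# Figures taken from the SHOW ROUTINE LOAD output in the question.
print(should_pause(error_rows=9488, total_rows=9488,
                   max_error_number=0, max_filter_ratio=1.0))  # True
```

Since every row failed, raising the thresholds would at best keep the job running with loadedRows still at 0; the per-row error itself has to be fixed.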

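Database administrator 陆美芳 (original poster)
2024-06-25
The error was actually caused by creating the ROUTINE LOAD with WITH MERGE; with the default mode instead, it no longer errors. One problem remains: DATETIME columns that Debezium pushes to Kafka arrive as int64, and I don't yet know how to convert that int64 back to DATETIME when writing into Doris.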
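A simplified model of why dropping WITH MERGE made the error disappear: with MERGE and DELETE ON __deleted = 'true', the hidden __DORIS_DELETE_SIGN__ column of each row is derived from that condition, and an SQL comparison with NULL yields NULL, which the non-nullable hidden column rejects (the Reason line in the question). In the default APPEND mode no delete condition is evaluated. The functions are hypothetical, not Doris internals.

```python
from typing import Optional


def sql_equals(a: Optional[str], b: str) -> Optional[bool]:
    """SQL three-valued '=': comparing with NULL (None) yields NULL."""
    return None if a is None else a == b


def delete_sign(merge: bool, deleted_value: Optional[str]) -> Optional[int]:
    """Model of __DORIS_DELETE_SIGN__ for DELETE ON __deleted = 'true'."""
    if not merge:
        return 0  # APPEND (default): no delete condition, every row is an upsert
    result = sql_equals(deleted_value, "true")
    return None if result is None else int(result)


def check_row(merge: bool, deleted_value: Optional[str]) -> str:
    sign = delete_sign(merge, deleted_value)
    if sign is None:
        return "error: __DORIS_DELETE_SIGN__ is null"
    return "delete" if sign else "upsert"


for merge, value in [(True, None), (False, None), (True, "true"), (True, "false")]:
    print(f"merge={merge!s:<5} __deleted={value!r:<8} -> {check_row(merge, value)}")
```

One consequence of the model: in APPEND mode, Debezium delete events are loaded as ordinary rows rather than removing anything. If deletes matter, keeping MERGE and mapping __deleted to a path that actually exists in the message is presumably the better route; this is an inference, not something confirmed in the thread.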
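Database administrator 陆美芳

The MySQL DATETIME problem is finally solved: I asked a Java developer colleague to modify the code of debezium-datetime-converter-master, and now source and target are synchronized with identical results. Very happy!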
