50M# 情况说明
Kafka的消息来着debezium捕获MySQL的日志变化,配置如下:
{
"name": "mysql-connect-source6",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.user": "数据库账号",
"database.hostname": "数据库IP",
"database.password": "数据库密码",
"database.port": "3306",
"database.server.id": "119006",
"schema.history.internal.kafka.bootstrap.servers": "Kafka主机IP:9092",
"schema.history.internal.kafka.topic": "schemahistory.mysql_connect_source6",
"heartbeat.interval.ms": "100",
"topic.prefix": "mysql_connect_source6",
"snapshot.mode": "initial",
"database.include.list": "bill_of_entry_test_0,bill_of_entry_test_1",
"table.include.list": "bill_of_entry_test_0.customs_declaration,bill_of_entry_test_1.customs_declaration",
"tombstones.on.delete": "true",
"decimal.handling.mode": "string",
"database.serverTimezone":"UTC",
"database.connectionTimeZone":"GMT+8",
"transforms":"unwrap",
"transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.delete.handling.mode":"rewrite",
"transforms": "convertTimezone",
"transforms.convertTimezone.type": "io.debezium.transforms.TimezoneConverter",
"transforms.convertTimezone.converted.timezone": "Asia/Shanghai"
}
}
# DORIS的ROUTIME LOAD的创建脚本如下:
CREATE ROUTINE LOAD test.rl_job_customs_declaration_0 ON customs_declaration
WITH MERGE
COLUMNS(id,
declaration_no,
unified_number,
declare_date,
import_export_flag,
business_type,
custom_master_code,
entry_exit_customs,
entry_exit_port_code,
record_no,
contract_no,
entry_exit_date,
domestic_credit_code,
domestic_customs_code,
domestic_quarantine_no,
domestic_enterprise_name,
declare_enterprise_name,
transport_type_code,
transport_tools,
voyage_no,
take_voyage_no,
supervise_type_code,
exempt_nature_code,
licence_no,
arrival_country_regions_code,
departure_or_arrival_port_code,
deal_type_code,
trade_country_regions_code,
wrap_count,
wrap_type_code,
container_count,
item_count,
document_type,
customs_type,
relation_type,
creation_method,
tax_paperless,
self_declaration,
water_transit,
self_declaration_pay,
vouch_flag,
overseas_platform,
special_channel,
tax_flag,
foreign_verification_no,
source,
creator,
created_time,
modifier,
modify_time,
status,
status_explain,
data_query_status,
date_dec,
electronic_unified_number,
cus_dec_status,
__deleted),
DELETE ON __deleted ='true'
PROPERTIES(
"format"="json",
"jsonpaths"="[\"$.payload.after.id\",
\"$.payload.after.declaration_no\",
\"$.payload.after.unified_number\",
\"$.payload.after.declare_date\",
\"$.payload.after.import_export_flag\",
\"$.payload.after.business_type\",
\"$.payload.after.custom_master_code\",
\"$.payload.after.entry_exit_customs_code\",
\"$.payload.after.entry_exit_port_code\",
\"$.payload.after.record_no\",
\"$.payload.after.contract_no\",
\"$.payload.after.entry_exit_date\",
\"$.payload.after.domestic_credit_code\",
\"$.payload.after.domestic_customs_code\",
\"$.payload.after.domestic_quarantine_no\",
\"$.payload.after.domestic_enterprise_name\",
\"$.payload.after.declare_enterprise_name\",
\"$.payload.after.transport_type_code\",
\"$.payload.after.transport_tools\",
\"$.payload.after.voyage_no\",
\"$.payload.after.take_voyage_no\",
\"$.payload.after.supervise_type_code\",
\"$.payload.after.exempt_nature_code\",
\"$.payload.after.licence_no\",
\"$.payload.after.arrival_country_regions_code\",
\"$.payload.after.departure_or_arrival_port_code\",
\"$.payload.after.deal_type_code\",
\"$.payload.after.trade_country_regions_code\",
\"$.payload.after.wrap_count\",
\"$.payload.after.wrap_type_code\",
\"$.payload.after.container_count\",
\"$.payload.after.item_count\",
\"$.payload.after.document_type\",
\"$.payload.after.customs_type\",
\"$.payload.after.relation_type\",
\"$.payload.after.creation_method\",
\"$.payload.after.tax_paperless\",
\"$.payload.after.self_declaration\",
\"$.payload.after.water_transit\",
\"$.payload.after.self_declaration_pay\",
\"$.payload.after.vouch_flag\",
\"$.payload.after.overseas_platform\",
\"$.payload.after.special_channel\",
\"$.payload.after.tax_flag\",
\"$.payload.after.foreign_verification_no\",
\"$.payload.after.source\",
\"$.payload.after.creator\",
\"$.payload.after.created_time\",
\"$.payload.after.modifier\",
\"$.payload.after.modify_time\",
\"$.payload.after.status\",
\"$.payload.after.status_explain\",
\"$.payload.after.data_query_status\",
\"$.payload.after.date_dec\",
\"$.payload.after.electronic_unified_number\",
\"$.payload.after.cus_dec_status\",
\"$.op\"]",
"strip_outer_array" = "false"
)
FROM KAFKA(
"kafka_broker_list" = "192.168.0.221:9092",
"kafka_topic" = "mysql_connect_source6.bill_of_entry_test_0.customs_declaration",
"property.group.id" = "customs_declaration_0",
"kafka_partitions" = "0",
"kafka_offsets" = "0"
);
# 查看ROUTIME LOAD的状态
mysql> SHOW ROUTINE LOAD FOR test.rl_job_customs_declaration_0\G
*************************** 1. row ***************************
Id: 183628
Name: rl_job_customs_declaration_0
CreateTime: 2024-06-24 11:28:52
PauseTime: 2024-06-24 11:29:00
EndTime: NULL
DbName: test
TableName: customs_declaration
IsMultiTable: false
State: PAUSED
DataSourceType: KAFKA
CurrentTaskNum: 0
JobProperties: {"max_batch_rows":"200000","timezone":"Asia/Shanghai","send_batch_parallelism":"1","load_to_single_tablet":"false","current_concurrent_number":"1","delete":"(`__deleted` = 'true')","partial_columns":"false","merge_type":"MERGE","exec_mem_limit":"2147483648","strict_mode":"false","jsonpaths":"[\"$.payload.after.id\",\n\"$.payload.after.declaration_no\",\n\"$.payload.after.unified_number\",\n\"$.payload.after.declare_date\",\n\"$.payload.after.import_export_flag\",\n\"$.payload.after.business_type\",\n\"$.payload.after.custom_master_code\",\n\"$.payload.after.entry_exit_customs_code\",\n\"$.payload.after.entry_exit_port_code\",\n\"$.payload.after.record_no\",\n\"$.payload.after.contract_no\",\n\"$.payload.after.entry_exit_date\",\n\"$.payload.after.domestic_credit_code\",\n\"$.payload.after.domestic_customs_code\",\n\"$.payload.after.domestic_quarantine_no\",\n\"$.payload.after.domestic_enterprise_name\",\n\"$.payload.after.declare_enterprise_name\",\n\"$.payload.after.transport_type_code\",\n\"$.payload.after.transport_tools\",\n\"$.payload.after.voyage_no\",\n\"$.payload.after.take_voyage_no\",\n\"$.payload.after.supervise_type_code\",\n\"$.payload.after.exempt_nature_code\",\n\"$.payload.after.licence_no\",\n\"$.payload.after.arrival_country_regions_code\",\n\"$.payload.after.departure_or_arrival_port_code\",\n\"$.payload.after.deal_type_code\",\n\"$.payload.after.trade_country_regions_code\",\n\"$.payload.after.wrap_count\",\n\"$.payload.after.wrap_type_code\",\n\"$.payload.after.container_count\",\n\"$.payload.after.item_count\",\n\"$.payload.after.document_type\",\n\"$.payload.after.customs_type\",\n\"$.payload.after.relation_type\",\n\"$.payload.after.creation_method\",\n\"$.payload.after.tax_paperless\",\n\"$.payload.after.self_declaration\",\n\"$.payload.after.water_transit\",\n\"$.payload.after.self_declaration_pay\",\n\"$.payload.after.vouch_flag\",\n\"$.payload.after.overseas_platform\",\n\"$.payload.after.special_channel\",\n\"$.payload.after.tax_flag\",\n\"$.payload.after.foreign_verification_no\",\n\"$.payload.after.source\",\n\"$.payload.after.creator\",\n\"$.payload.after.created_time\",\n\"$.payload.after.modifier\",\n\"$.payload.after.modify_time\",\n\"$.payload.after.status\",\n\"$.payload.after.status_explain\",\n\"$.payload.after.data_query_status\",\n\"$.payload.after.date_dec\",\n\"$.payload.after.electronic_unified_number\",\n\"$.payload.after.cus_dec_status\",\n\"$.op\"]","max_batch_interval":"10","max_batch_size":"104857600","fuzzy_parse":"false","partitions":"*","columnToColumnExpr":"id,declaration_no,unified_number,declare_date,import_export_flag,business_type,custom_master_code,entry_exit_customs,entry_exit_port_code,record_no,contract_no,entry_exit_date,domestic_credit_code,domestic_customs_code,domestic_quarantine_no,domestic_enterprise_name,declare_enterprise_name,transport_type_code,transport_tools,voyage_no,take_voyage_no,supervise_type_code,exempt_nature_code,licence_no,arrival_country_regions_code,departure_or_arrival_port_code,deal_type_code,trade_country_regions_code,wrap_count,wrap_type_code,container_count,item_count,document_type,customs_type,relation_type,creation_method,tax_paperless,self_declaration,water_transit,self_declaration_pay,vouch_flag,overseas_platform,special_channel,tax_flag,foreign_verification_no,source,creator,created_time,modifier,modify_time,status,status_explain,data_query_status,date_dec,electronic_unified_number,cus_dec_status,__deleted","whereExpr":"*","desired_concurrent_number":"5","precedingFilter":"*","format":"json","max_error_number":"0","max_filter_ratio":"1.0","json_root":"","strip_outer_array":"false","num_as_string":"false"}
DataSourceProperties: {"topic":"mysql_connect_source6.bill_of_entry_test_0.customs_declaration","currentKafkaPartitions":"0","brokerList":"192.168.0.221:9092"}
CustomProperties: {"group.id":"customs_declaration_0"}
Statistic: {"receivedBytes":104868168,"runningTxns":[],"errorRows":9488,"committedTaskNum":1,"loadedRows":0,"loadRowsRate":0,"abortedTaskNum":0,"errorRowsAfterResumed":9488,"totalRows":9488,"unselectedRows":0,"receivedBytesRate":13197604,"taskExecuteTimeMs":7946}
Progress: {"0":"9487"}
Lag: {"0":2031}
ReasonOfStateChanged: ErrorReason{code=errCode = 102, msg='current error rows is more than max_error_number or the max_filter_ratio is more than the value set'}
ErrorLogUrls: http://192.168.0.129:8040/api/_load_error_log?file=__shard_520/error_log_insert_stmt_aa167426bdef4cda-8f64655d0ef3de94_aa167426bdef4cda_8f64655d0ef3de94
OtherMsg:
User: root
Comment:
1 row in set (0.00 sec)
# SHOW LOAD WARNINGS ON
输出了很多,任取一行,具体如下:
Reason: column(__DORIS_DELETE_SIGN__) values is null while columns is not nullable. src line [3006719137197064192 530420210040339743 E20210000593246977 1615564800000 E NULL 5304 5304 470101 XS3-11-202103008 18703 9144030032629079XF 4403160DS3 公司名称脱敏限公司 企业名称脱敏有限公司 2 UN9767998 0MH7VE1MA EE32103122817 0110 101 MEX MEX039 3 HKG 2080 22 2 1 NULL M 0 4 NULL NULL NULL NULL NULL NULL NULL NULL NULL 1 2859727746966257666 1718343993000 2859727746966257666 1718343993000 NULL NULL NULL NULL NULL 10 NULL];
# 求解
谁指定这个啥问题,怎么解决呢?
# 疑惑
有个很纳闷的问题,一直没搞明白,我从https://www.modb.pro/db/626837 这里参考,第一次创建ROUTIME LOAD的时候,看Kafka的主题,发现捕获的消息格式,数据那部分 没有before和after,直接就是payload里面,然后有__deleted字段,但是我以前记得是都有before和after,也尝试找配置如何去掉before和after,让捕获的消息格式只有payload,但是怎么也没找到是怎么配置的,才只有payload?谁有经验愿意分享,万分感谢!
墨值悬赏

评论

