{
"name": "my02",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "xx.xx.xx.xx",
"database.port": "3306",
"database.user": "xx",
"database.password": "xx",
"database.server.id": "8023",
"database.server.name": "xx",
"database.whitelist":"xx",
"table.whitelist":"xx.xx",
"database.history.kafka.bootstrap.servers": "xx.xx.xx.xx:9092, xx.xx.xx.xx:9092, xx.xx.xx.xx:9092",
"database.history.kafka.topic": "xx.xx",
"include.schema.changes": "false",
"snapshot.locking.mode": "none",
"snapshot.mode": "schema_only",
"decimal.handling.mode": "string",
"database.serverTimezone":"UTC",
"database.connectionTimeZone":"GMT+8",
"transforms":"unwrap",
"transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.delete.handling.mode":"rewrite",
"converters":"datetime",
"datetime.type":"com.darcytech.debezium.converter.MySqlDateTimeConverter",
"datetime.format.date":"yyyy-MM-dd",
"datetime.format.time":"HH:mm:ss",
"datetime.format.datetime":"yyyy-MM-dd HH:mm:ss",
"datetime.format.timestamp":"yyyy-MM-dd HH:mm:ss",
"datetime.format.timestamp.zone":"UTC+8"
}
}
CREATE TABLE `audit_log` (
`id` int(11) NOT NULL COMMENT "",
`user_id` int(11),
`user_name` varchar(255),
`op` varchar(10)
) ENGINE=OLAP
UNIQUE KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`) BUCKETS 2
PROPERTIES (
"replication_num" = "2"
);
CREATE ROUTINE LOAD archery.auditjob ON audit_log
WITH MERGE
COLUMNS(id, user_id, user_name,action_time,test_col,__deleted),
DELETE ON __deleted ='true'
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "true",
"format" = "json",
"jsonpaths" = "[\"$.id\",\"$.user_id\",\"$.user_name\",\"$.action_time\",\"$.test_col\",\"$.__deleted\"]",
"strip_outer_array" = "false"
)
FROM KAFKA
(
"kafka_broker_list" = "xx.xx.xx.xx:9092, xx.xx.xx.xx:9092, xx.xx.xx.xx:9092",
"kafka_topic" = "xxxxxx",
"property.group.id" = "xxxx",
"kafka_partitions" = "0",
"kafka_offsets" = "0"
);
ALTER ROUTINE LOAD FOR archery.auditjob
PROPERTIES
(
"strip_outer_array" = "false"
);
ALTER ROUTINE LOAD FOR archery.auditjob
PROPERTIES
(
"jsonpaths" = "[\"$.after.id\",\"$.after.user_id\",\"$.after.user_name\",\"$.op\"]"
);
SHOW ROUTINE LOAD FOR auditjob\G
PAUSE ROUTINE LOAD FOR archery.auditjob;
RESUME ROUTINE LOAD FOR archery.auditjob;
STOP ROUTINE LOAD FOR archery.auditjob;「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




