暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

debezium2kafka2doris

原创 Temi 2023-05-05
387
{
    "name": "my02",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "xx.xx.xx.xx",
        "database.port": "3306",
        "database.user": "xx",
        "database.password": "xx",
        "database.server.id": "8023",
        "database.server.name": "xx",
        "database.whitelist": "xx",
        "table.whitelist": "xx.xx",
        "database.history.kafka.bootstrap.servers": "xx.xx.xx.xx:9092, xx.xx.xx.xx:9092, xx.xx.xx.xx:9092",
        "database.history.kafka.topic": "xx.xx",
        "include.schema.changes": "false",
        "snapshot.locking.mode": "none",
        "snapshot.mode": "schema_only",
        "decimal.handling.mode": "string",
        "database.serverTimezone": "UTC",
        "database.connectionTimeZone": "GMT+8",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.delete.handling.mode": "rewrite",
        "converters": "datetime",
        "datetime.type": "com.darcytech.debezium.converter.MySqlDateTimeConverter",
        "datetime.format.date": "yyyy-MM-dd",
        "datetime.format.time": "HH:mm:ss",
        "datetime.format.datetime": "yyyy-MM-dd HH:mm:ss",
        "datetime.format.timestamp": "yyyy-MM-dd HH:mm:ss",
        "datetime.format.timestamp.zone": "UTC+8"
    }
}

-- Target table for the `archery.auditjob` routine load defined further down
-- in this file (that job is declared `ON audit_log`).
-- Layout: OLAP engine, UNIQUE KEY on `id`, hash-distributed on `id` into
-- 2 buckets, replication_num = 2.
-- NOTE(review): the routine load's COLUMNS list also names `action_time` and
-- `test_col`, which are not defined here, and it never maps `op` -- confirm
-- whether the table or the job reflects the intended schema.
CREATE TABLE `audit_log` (
    `id`        INT(11)      NOT NULL COMMENT "",
    `user_id`   INT(11)      NULL,
    `user_name` VARCHAR(255) NULL,
    `op`        VARCHAR(10)  NULL
)
ENGINE=OLAP
UNIQUE KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`) BUCKETS 2
PROPERTIES (
    "replication_num" = "2"
);

-- Routine load job `archery.auditjob`: reads JSON messages from a Kafka topic
-- and loads them into `audit_log` using merge semantics.
--   * COLUMNS / jsonpaths: six fields are extracted per message, in the same
--     order in both lists (id, user_id, user_name, action_time, test_col,
--     __deleted).
--   * DELETE ON: rows whose `__deleted` field equals 'true' are marked for
--     deletion. Presumably `__deleted` is produced by the connector's
--     `transforms.unwrap.delete.handling.mode = rewrite` setting -- confirm.
--   * Kafka source: only partition 0 is listed, starting at offset 0.
--     NOTE(review): with a single partition, `desired_concurrent_number = 3`
--     may not give more than one consumer task -- confirm against Doris docs.
CREATE ROUTINE LOAD archery.auditjob ON audit_log
WITH MERGE
COLUMNS(id, user_id, user_name, action_time, test_col, __deleted),
DELETE ON __deleted = 'true'
PROPERTIES (
    "desired_concurrent_number" = "3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "true",
    "format" = "json",
    "jsonpaths" = "[\"$.id\",\"$.user_id\",\"$.user_name\",\"$.action_time\",\"$.test_col\",\"$.__deleted\"]",
    "strip_outer_array" = "false"
)
FROM KAFKA (
    "kafka_broker_list" = "xx.xx.xx.xx:9092, xx.xx.xx.xx:9092, xx.xx.xx.xx:9092",
    "kafka_topic" = "xxxxxx",
    "property.group.id" = "xxxx",
    "kafka_partitions" = "0",
    "kafka_offsets" = "0"
);

-- Set the job's `strip_outer_array` property (same value, "false", that the
-- CREATE ROUTINE LOAD statement in this file already uses).
-- NOTE(review): Doris documents that a routine load job must be in PAUSED
-- state before it can be altered -- run PAUSE ROUTINE LOAD first; confirm for
-- the Doris version in use.
ALTER ROUTINE LOAD FOR archery.auditjob
PROPERTIES ("strip_outer_array" = "false");

-- Replace the job's `jsonpaths` with four paths that read from a nested
-- `after` object plus a top-level `op` field.
-- NOTE(review): these paths look like the raw Debezium change-event envelope,
-- whereas the connector config in this file applies the `unwrap`
-- (ExtractNewRecordState) transform and the job's COLUMNS list has six
-- entries -- confirm which message shape is actually on the topic.
-- NOTE(review): Doris documents that the job must be PAUSED before ALTER.
ALTER ROUTINE LOAD FOR archery.auditjob
PROPERTIES ("jsonpaths" = "[\"$.after.id\",\"$.after.user_id\",\"$.after.user_name\",\"$.op\"]");

-- Show the current state of the routine load job.
-- The job name is qualified with its database (`archery`), matching every
-- other statement in this file, so the command does not depend on the
-- session's current database.
-- `\G` is the mysql command-line client terminator that prints the result
-- vertically; replace it with `;` in other clients.
SHOW ROUTINE LOAD FOR archery.auditjob\G

-- Lifecycle commands for the routine load job `archery.auditjob`.
-- These are listed as a reference; run the one you need rather than the
-- whole sequence.

-- Pause the job.
PAUSE ROUTINE LOAD FOR archery.auditjob;

-- Resume the paused job.
RESUME ROUTINE LOAD FOR archery.auditjob;
-- Stop the job.
-- NOTE(review): per the Doris documentation a stopped job cannot be resumed
-- and has to be re-created -- confirm before running.
STOP ROUTINE LOAD FOR archery.auditjob;
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论