
1、技术调研和需求说明
我有2个业务系统,其中一个业务系统早期版本的数据都存储在mongodb中,新版本存储在MySQL中,现在想要为客户提供数据分析的服务。数据分析平台可以直连数据库获取数据,也可以通过接口获取数据,但是,不管通过何种方式进行数据分析,都不可能直接拉取业务系统的数据进行查询分析,于是考虑搭建实时数仓,进行OLTP和OLAP的业务隔离,同时实现不同数据源同步到数仓中【目前我选择MySQL,因为数据量不算大,且初期需求不多】。经过几番搜索和了解,最终和同事商量确认,采用debezium这个中间件,因为我们本身已经部署有kafka,采用的都是deocker容器部署的方式,刚好debezium提供官方容器,2.5版本以上还支持sink connect(也就是debezium JDBC connector),可以直接部署配置,实现捕获源端MySQL的binlog日志的变化,推送到kafka,然后配置接收器连接器消费kafka的消息,实现数据实时同步的目的。
2、debezium的部署
(1) kafka 使用现有环境,zookeeper自然也是现有的,因为kafka的使用必须依赖zookeeper。
(2)debezium需要部署2个部分,一个是debezium-connect,一个是debezium-ui
2.1、 debezium-connect
我采用docker-compose来部署debezium,关于docker-compose的安装这里不介绍,可以网上搜索。因为我的环境已经部署好docker-compose,所以直接从docker-compose部署debezium开始。
2.1.1、创建目录并授权
-- 安装目录
mkdir -p /opt/dockerfile/debezium-connect
-- 存储目录
mkdir -p /opt/dockerstore/debezium-connect
-- 授权
chown -R 1001:1001 /opt/dockerfile/debezium-connect
chown -R 1001:1001 /opt/dockerstore/debezium-connect
2.1.2、编辑docker-compose.yaml文件
文件内容如下,编辑好保存:
version: '3.7'
services:
debezium-connect:
privileged: true
container_name: debezium-connect
image: debezium/connect:2.5
restart: always
ports:
- 8083:8083
environment:
- BOOTSTRAP_SERVERS=192.168.0.221:9092
- GROUP_ID=debezium
# connector相关信息持久化到kafka指定的topic
- CONFIG_STORAGE_TOPIC=debezium_connect_configs_1
# 存储连接器偏移量的Kafka主题的名称
- OFFSET_STORAGE_TOPIC=debezium_connect_offsets_1
# 存储连接器和任务状态的Kafka主题的名称
- STATUS_STORAGE_TOPIC=debezium_connect_statuses_1
- ENABLE_DEBEZIUM_KC_REST_EXTENSION=true
- ENABLE_DEBEZIUM_SCRIPTING=true
volumes:
- "/usr/share/zoneinfo/Asia/Shanghai:/etc/localtime"
- "/opt/dockerstore/debezium-connect/config/:/kafka/config"
- /opt/dockerstore/debezium-connect/data/:/kafka/data
- /opt/dockerstore/debezium-connect/logs/:/kafka/logs
2.1.3、构建容器
在 /opt/dockerfile/debezium-connect目录下,执行下面脚本进行容器构建:
docker-compose up -d --build
执行命令后会看到下载镜像的进度,等待安装完成即可。
2.1.4、启动容器
docker start debezium-connect
2.1.5、查看运行中的容器
docker ps
执行如上命令可以看到如下内容,其中容器名称为 debezium-connect的容器就是刚部署好的,还有debezium-ui,也是部署好的,部署过程根debezium-connect基本一致,只是配置文件不同,这里不做过多赘述,仅提供debezium-ui的docker-compose.yaml文件内容,供部署参考。

到这里,debezium-connect的部署就完成了!下面开始测试数据同步的情况。
2.2、debezium-ui
2.2.1、编辑docker-compose.yaml
version: '3.7'
services:
debezium-ui:
privileged: true
container_name: debezium-ui
image: debezium/debezium-ui:2.5
ports:
- "8080:8080"
restart: always
environment:
- KAFKA_CONNECT_URIS=http://192.168.51.72:8083
# 时区上海
请注意把KAFKA_CONNECT_URIS的值改成你的debezium-connect的IP和端口。
2.2.2、debezium-ui的访问
容器构建过程参考debezium-connect即可,构建成功后,即可通过ip:端口链接访问,按照我的配置,我的访问地址为:http://192.168.51.72:8080/,在浏览器输入访问地址,可以看到如下界面:

图中第二个连接器就是本次研究注册的源连接器mysql-test-connect。
3、debezium的使用
3.1、注册源连接器
3.1.1、编辑配置
编写json配置文件,文件名为:mysql-test-connect.json,内容如下:
{
"name": "mysql-test-connect",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.user": "root",
"database.hostname": "192.168.0.119",
"database.password": "这里写你数据库的密码",
"database.port": "3306",
"database.server.id": "11903",
"schema.history.internal.kafka.bootstrap.servers": "192.168.0.221:9092",
"schema.history.internal.kafka.topic": "schemahistory.mysql_test_connect",
"heartbeat.interval.ms": "100",
"include.schema.changes": "true",
"include.schema.comments": "true",
"topic.prefix": "mysql_test_connect",
"snapshot.mode": "initial",
"database.include.list": "test",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3",
"message.key.columns": "test.(.*):id",
"tombstones.on.delete": "true"
}
}
参数的意思建议阅读官网说明:
debeziumd MySQL源连接器配置参数官网说明
主要是源库相关的地址端口、账号密码等信息,以及我要捕获的数据库的名称,配置在database.include.list中,我这里就是test这个库,另外要关注的就是transforms开头的配置了,意思就是通过正则表达式获取库下面的所有表,并以表名为主题的名称推送到kafka。而参数message.key.columns是考虑到有可能源库某些表没有主键,要定义以什么字段进行数据连接的配置。
3.1.2、执行命令注册源连接器
上面文件编写完成保存,然后执行如下命令注册连接器:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.51.72:8083/connectors/ -d @mysql-test-connect.json
通过如下命令,查看连接器列表:
curl -s 192.168.51.72:8083/connectors/ | jq
通过如下命令,查看刚注册的连接器的状态:
curl -s 192.168.51.72:8083/connectors/mysql-test-connect/status | jq
连接器任务状态正常的话,输出内容如下图:

3.2、验证kafka生成的主题和消息
源连接器注册成功,任务状态正常后,可以登录Kafka的ui,我这里用的是kafdrop来查看和管理Kafka主题,可以看到如下主题,其中下划线为红线的,就是我的连接器配置执行捕获到生成的主题,mytest是我的表,我列出库名,表名是通过路由表达式生成的:

点击某个主题进去,还可以查看具体的消息格式,如下图为点击了mytest这个主题进去的情况:

再点击左上角的【View Messages】后如下图:

再点击图上右边的【View Messages】,进入下图:

图上就是具体的kafka收到的来自源连接器捕获推送来的消息了。可以展开每个消息,查看到具体的内容,数据的变化消息格式如下图:

before部分存的是修改之前的值,after部分存的是修改后的值,op字段标记操作的类型,c表示新增,u表示修改,d表示删除。
3.3、注册接收器连接器
3.3.1、编辑配置
编写json配置文件,文件名为:mysql-test-sink.json,内容如下:
{
"name": "mysql-test-sink",
"config":{
"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mysql://192.168.51.212/lmftest",
"connection.username": "root",
"connection.password": "你的目标数据库的密码",
"tasks.max": "1",
"topics": "mytest",
"dialect.name": "MySqlDatabaseDialect",
"debezium.sink.databend.upsert-keep-deletes": "false",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.delete.handling.mode": "rewrite",
"schema.evolution": "basic",
"insert.mode": "upsert",
"delete.enabled": "true",
"primary.key.mode": "record_key",
"database.time_zone": "UTC",
"primary.key.fields": "id"
}
}
需要注意的是,保证删除能正确消费的参数是:
“debezium.sink.databend.upsert-keep-deletes”: “false”,
“transforms”: “unwrap”,
“transforms.unwrap.type”: “io.debezium.transforms.ExtractNewRecordState”,
“transforms.unwrap.drop.tombstones”: “true”,
“transforms.unwrap.delete.handling.mode”: “rewrite”,
“schema.evolution”: “basic”,
如果不写这几个参数,删除的操作不会被消费!!!这块花了我不少时间研究,最后是在同事的协助下找到了解决方法的,给我的小伙伴点赞!
3.3.2、执行命令注册接收连接器
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.51.72:8083/connectors/ -d @mysql-test-sink.json
3.3.3、验证同步结果
(1)首先检查接收连接器的状态
curl -s 192.168.51.72:8083/connectors/mysql-test-sink/status | jq
输出如下,表示正常:

(2)查看Kafka的消息主题,可以在有下面的消费者列表看到接收连接器:

如上图中,右下角红框部分Consumers下面的connect-mysql-test-sink就是刚才创建的接收连接器。
(3)检查数据库的数据情况
到目标库看,可以看到,目标库新增了mytest表,并有数据,测试删除后,表追加了__deleted字段,且源端delete的记录,__deleted的值置为true,数据同步如图:

删除的同步原理debezium是采用的逻辑删除,这样也很好,可还可以帮助业务系统做误操作的恢复。
到此debezium配合kafka搭建实时数仓的研究可以说告一段落了,虽然后面还要继续研究其他类型作为来源的数据库的情况,但是,通过MySQL的测试,实践思路已经明确。所以整理了这篇文章,为自己的工作进行总结,也希望能为其他人提供参考,因为删除的同步我还没看到网上有实际有效的参考。




