一、前言
debezium属于kafka的一个connect插件,红帽的一个开源项目,有2.7和3.0两个稳定版本,可以根据kafka的版本自定义选择,kafka存在非常多connect插件,其中同类的商业版本为confluent。
debezium捕获功能基本支持大部分主流的数据库MySQL、MariaDB、MongoDB、PostgreSQL、Oracle、SQL Server、Db2、Cassandra、Vitess、Spanner、Informix,入库方式本身有jdbc connector组件去消费kafka的topic,当然开发者也可以自行写一些程序去消费。
主要管理方式以kafka为主,connector使用REST的方式进行管理,debezium本身也有图形化管理工具debezium UI,也可以使用一些kafka的管理工
具,开源的也非常多。
基本原理为捕获的数据写入到kafka,之后 从kafka进行消费,本次实战以Oracle到Oracle同步测试来演示。
架构

二、kafka配置
1、配置JDK
mkdir /app/jdk
tar -zxvf jdk-11.0.25_linux-x64_bin.tar.gz
export JAVA_HOME=/app/jdk/jdk-11.0.25
export CLASSPATH=$JAVA_HOME/lib/
export ZK_HOME=/app/zookeeper/apache-zookeeper-3.8.4-bin
export PATH=$ZK_HOME/bin:$JAVA_HOME/bin:$PATH
2、配置zookeeper
创建目录
mkdir -p /app/zookeeper
解压
tar -xvf apache-zookeeper-3.8.4-bin.tar.gz
参数配置
cd /app/zookeeper/apache-zookeeper-3.8.4-bin
mkdir data
cd /app/zookeeper/apache-zookeeper-3.8.4-bin/conf
cp zoo_sample.cfg zoo.cfg
vi zoo.cfg
dataDir=/app/zookeeper/apache-zookeeper-3.8.4-bin/data
启动服务
cd /app/zookeeper/apache-zookeeper-3.8.4-bin/bin/
./zkServer.sh start
./zkServer.sh status
查看

Note:单机部署,使用默认端口为2181,自定义个数据目录,java进程名:QuorumPeerMain
3、配置kafka
创建目录
mkdir /app/kafka
解压
tar -zxvf kafka_2.13-2.8.2.tgz
配置参数
vi server.properties
log.dirs=/app/kafka/kafka_2.13-2.8.2/data/kafka-logs
num.partitions=1
default.replication.factor=1
服务启动
cd /app/kafka/kafka_2.13-2.8.2
bin/kafka-server-start.sh -daemon config/server.properties
查看

Note:单机版部署,使用默认端口9092
三、debezium配置
新建插件目录
cd /app/kafka/kafka_2.13-2.8.2
mkdir plugins
解压到插件目录
cd /app/kafka/kafka_2.13-2.8.2/plugins
tar -zxvf debezium-connector-oracle-2.7.3.Final-plugin.tar.gz
tar -zxvf debezium-connector-jdbc-2.7.3.Final-plugin.tar.gz
查看
[fwl@kafka plugins]$ ll
total 65952
drwxrwxr-x 2 fwl fwl 4096 Oct 16 23:29 debezium-connector-jdbc
-rw-r--r-- 1 root root 32787029 Oct 16 05:07 debezium-connector-jdbc-2.7.3.Final-plugin.tar.gz
drwxrwxr-x 2 fwl fwl 4096 Oct 16 09:03 debezium-connector-oracle
-rw-r--r-- 1 root root 34731726 Oct 16 05:07 debezium-connector-oracle-2.7.3.Final-plugin.tar.gz
[fwl@kafka plugins]$
修改connect配置文件
cd /app/kafka/kafka_2.13-2.8.2/config
vi connect-distributed.properties
plugin.path=/app/kafka/kafka_2.13-2.8.2/plugins
启动
cd /app/kafka/kafka_2.13-2.8.2
bin/connect-distributed.sh -daemon config/connect-distributed.properties
Note:connect有2种方式standalone和distributed,默认端口8083,可以配置启动多个connect
查看进程

查看插件

四、抽取-source connector配置
捕获Oracle支持3种方式,LogMiner、XStream API和OpenLogReplicator,以下使用logminer的方式。
配置数据库
alter database archivelog;
alter database force logging;
alter database add supplemental log data;
创建的CDC专用用户
create tablespace dbzumtbs datafile '/oracle/app/oracle/oradata/fwldb/dbzumtbs01.dbf' size 100m autoextend on;
create user dbzum identified by "dbzum" default tablespace dbzumtbs quota unlimited on dbzumtbs;
grant dba to dbzum;
开启附加日志
ALTER TABLE scott.nokey05 ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY, UNIQUE) COLUMNS;
抽取connector配置
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "scott_source01",
"config": {
"connector.class" : "io.debezium.connector.oracle.OracleConnector",
"tasks.max" : "1",
"topic.prefix":"fwldb",
"database.hostname" : "192.168.137.12",
"database.port" : "1521",
"database.user" : "dbzum",
"database.password" : "dbzum",
"database.dbname" : "fwldb",
"snapshot.mode":"no_data",
"table.include.list" : "scott.fwl_t1",
"log.mining.strategy":"online_catalog",
"schema.history.internal.kafka.bootstrap.servers" : "192.168.137.11:9092",
"schema.history.internal.kafka.topic": "fwldb"
}
}'
测试
insert into FWL_T1 values(1,'aa');
update FWL_T1 set name='bb' where id = 1;
日志查看

alert日志查看

连接器查看

Note:支持Oracle大部分版本,CDB和non-CDB,数据初始化方式可以使用数据泵,debezium本身也支持数据初始化,支持DDL同步
五、入库-sink connector配置
连接器配置
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "scott_sink01",
"config": {
"connector.class":"io.debezium.connector.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url":"jdbc:oracle:thin:@192.168.137.12:1521/testdb",
"connection.username":"fwl",
"connection.password":"fwl",
"table.name.format":"fwl.fwl_t1",
"insert.mode":"upsert",
"delete.enabled":"true",
"primary.key.mode":"record_key",
"schema.evolution":"basic",
"topics":"fwldb.SCOTT.FWL_T1"
}
}'
连接器状态查看

Note:可以利用正则匹配多个topic入库
六、数据检查
检查插入和更新


topic信息查看


最后检查删除


topic信息查看

Note:可以看到数据同步正常。




