暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Debezium取代ogg来进行数据割接(一)

前言

大家好,最近在研究debezium替换ogg的事情,主要实现目标是代替ogg在割接中的作用。以下是我做的一些测试,我没有选择Docker来安装,选择使用了kafka最新的2.8kraft版。我的环境如下:

主机IP应用
192.168.56.130Oracle 12c数据库
192.168.56.170PostgreSQL 13.2数据库
192.168.56.170Kafka 2.8
192.168.56.170Kafka Oracle Connector

我的kafka2.8和debezium-connector-oracle插件部署在192.168.56.170主机上。

Kafka安装

因为是测试,这里选择了最新的kafka2.8来安装。kafka2.8最大的特点就是它移除了zookeeper,采用了Quorum Controller机制。也就采用了Raft协议,节点数为奇数且大于等于3,最多可以容忍(n/2-1)个节点失败。

不过今天我们测试选择安装单节点的kafka就行了。安装单节点的很简单,先下载安装介质并解压。

1.生成集群ID

./bin/kafka-storage.sh random-uuid4rcJzm9bSJmDP7q6fX0ujA

2.格式化存储目录

./bin/kafka-storage.sh format -t 4rcJzm9bSJmDP7q6fX0ujA -c ./config/kraft/server.propertiesFormatting /tmp/kraft-combined-logs

3.启动Kafka服务

./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties

4.创建topic并测试

./bin/kafka-topics.sh --create --topic oracletest --partitions 1 --replication-factor 1 --bootstrap-server 192.168.56.170:9092 Created topic oracletest../bin/kafka-topics.sh --list --bootstrap-server 192.168.56.170:9092oracletest

至此kafka2.8单机安装完毕。这里要说明一点,在kraft文件夹下的readme说了一点:

KRaft mode in Kafka 2.8 is provided for testing only, NOT for production.  We do not yet support upgrading existing ZooKeeper-based Kafka clusters into this mode.  In fact, when Kafka 3.0 is released,it will not be possible to upgrade your KRaft clusters from 2.8 to 3.0.  There may be bugs, including serious ones.  You should assume that your data could be lost at any time if you try the early access release of KRaft mode.

2.8版本仅仅只能作为测试,不要使用到生产环境

Oracle侧配置

我的oracle12c,cdb和pdb都有,所以配置略为麻烦。首先前提的条件是归档必须是打开的。

1.创建表空间和用户

sqlplus / as sysdbaCREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf' SIZE 50REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;alter session set container=ORCLPDB1;Session altered.CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf' SIZE 50REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

这里需要在你的cdb和pdb都创建表空间。如果你有多个pdb,每一个都需要建。

2.创建用户和赋权

CREATE USER c##dbzuser IDENTIFIED BY dbzDEFAULT TABLESPACE logminer_tbsQUOTA UNLIMITED ON logminer_tbsCONTAINER=ALL;GRANT CREATE SESSION TO c##dbzuser CONTAINER=ALL;GRANT SET CONTAINER TO c##dbzuser CONTAINER=ALL;GRANT SELECT ON V_$DATABASE to c##dbzuser CONTAINER=ALL;GRANT FLASHBACK ANY TABLE TO c##dbzuser CONTAINER=ALL;GRANT SELECT ANY TABLE TO c##dbzuser CONTAINER=ALL;GRANT SELECT_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;GRANT EXECUTE_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;GRANT SELECT ANY TRANSACTION TO c##dbzuser CONTAINER=ALL;GRANT LOGMINING TO c##dbzuser CONTAINER=ALL;GRANT CREATE TABLE TO c##dbzuser CONTAINER=ALL;GRANT LOCK ANY TABLE TO c##dbzuser CONTAINER=ALL;GRANT ALTER ANY TABLE TO c##dbzuser CONTAINER=ALL;GRANT CREATE SEQUENCE TO c##dbzuser CONTAINER=ALL;GRANT EXECUTE ON DBMS_LOGMNR TO c##dbzuser CONTAINER=ALL;GRANT EXECUTE ON DBMS_LOGMNR_D TO c##dbzuser CONTAINER=ALL;GRANT SELECT ON V_$LOG TO c##dbzuser CONTAINER=ALL;GRANT SELECT ON V_$LOG_HISTORY TO c##dbzuser CONTAINER=ALL;GRANT SELECT ON V_$LOGMNR_LOGS TO c##dbzuser CONTAINER=ALL;GRANT SELECT ON V_$LOGMNR_CONTENTS TO c##dbzuser CONTAINER=ALL;GRANT SELECT ON V_$LOGMNR_PARAMETERS TO c##dbzuser CONTAINER=ALL;GRANT SELECT ON V_$LOGFILE TO c##dbzuser CONTAINER=ALL;GRANT SELECT ON V_$ARCHIVED_LOG TO c##dbzuser CONTAINER=ALL;GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO c##dbzuser CONTAINER=ALL;

3.打开附加日志

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;ALTER TABLE HR.EMPLOYEES ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

这里可以打开全库的附加日志,也可以只打开指定表的附加日志。

Kafka Connector 配置

接下来我们要配置kafka Connector。

1.下载Oracle客户端

wget "https://download.oracle.com/otn_software/linux/instantclient/19600/instantclient-basiclite-linux.x64-19.6.0.0.0dbru.zip" -O /tmp/oracle_client.zipmv oracle_client.zip /appunzip oracle_client.zip

解压完毕后,还需设置LD_LIBRARY_PATH环境变量。

export LD_LIBRARY_PATH=/app/kafka/instantclient_19_6

2.下载Oracle Connector Plug-in插件

这里我也选择最新的1.60.Beta2下面的Oracle插件。

mkdir -p pluginsmv debezium-connector-oracle-1.6.0.Beta2-plugin.tar.gz  /app/kafka/kafka_2.13-2.8.0/pluginsgunzip debezium-connector-oracle-1.6.0.Beta2-plugin.tar.gz tar -xvf debezium-connector-oracle-1.6.0.Beta2-plugin.tar

在kafka目录下创建一个plugins的目录,把下载的软件丢到该目录下并解压。

解压完毕后,修改connect-standalone.properties配置文件

cd /app/kafka/kafka_2.13-2.8.0/configvi connect-standalone.properties 

3.复制ojdbc8.jar到kafaka的libs

将Oracle客户端的ojdbc8.jar放在kafka的libs文件夹下。

cp -rp ojdbc8.jar  ../kafka/kafka_2.13-2.8.0/libs/

4.对连接器进行配置

对于独立模式,只需在文件中定义,然后通过命令行启动就行了,我的配置文件如下。

name=testoracledbconnector.class=io.debezium.connector.oracle.OracleConnectordb_type=oracletasks.max=1database.server.name=oracle-12201-vagrantdatabase.tablename.case.insensitive=truedatabase.oracle.version=12+database.hostname=192.168.56.130database.port=1521database.user=c##dbzuserdatabase.password=dbzdatabase.dbname=ORCLCDBdatabase.pdb.name=ORCLPDB1database.history.kafka.bootstrap.servers=localhost:9092database.history.kafka.topic=debezium.oracledatabase.history.skip.unparseable.ddl=trueinclude.schema.changes=truetable.whitelist=hr.a1errors.log.enable=true

这里的配置选项很多,这里不做一一描述,基本上就是Oracle数据库的一些连接信息。

针对CDB和PDB的情况,需要同时配置database.dbname和database.pdb.name

具体选项可以参考:https://debezium.io/documentation/reference/1.6/connectors/oracle.html#oracle-property-name

5.重启kafka测试是连接器是否可以正常工作

./kafka-server-stop.sh ./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties

在Oracle侧插入一条数据.

insert into hr.a1 select * from EMPLOYEES where rownum<=1;SQL> select * from a1;EMPLOYEE_ID FIRST_NAME      LAST_NAME       EMAIL           PHONE_NUMBER         HIRE_DATE JOB_ID         SALARY COMMISSION_PCT MANAGER_ID DEPARTMENT_ID----------- --------------- --------------- --------------- -------------------- --------- ---------- ---------- --        198 Donald          OConnell        DOCONNEL        650.507.9833         21-JUN-07 SH_CLERK         2600                       124            50

使用kafka-console-consumer.sh脚本来读取topic信息

./kafka-console-consumer.sh --bootstrap-server 192.168.56.170:9092 --topic oracle-12201-vagrant.HR.A1 --from-beginning --property print.key = true  | jq

这里的输出结果很长,我们可以看到刚刚插入的信息,还有Oracle的SCN的信息。

后记

Oracle到Kafka这一侧的配置已经完成,后续是PostgreSQL来消费这些消息。未完待续...

参考文档

1.https://debezium.io/documentation/reference/1.6/connectors/oracle.html

文章转载自励志成为postgresql大神,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论