
Learning Avro with the Debezium Test Environment Deployed via docker-compose

Original article by Zhang Yulong, 2022-04-27


The Docker Test Environment Provided by Debezium

Download and unzip the examples archive; the tutorial directory contains the test cases.

  • The Debezium examples mainly target MySQL, while the tests in this article target PostgreSQL. With small changes to the MySQL examples, they can be reused for PostgreSQL.
[root@docker ~]# unzip debezium-examples-main.zip
[root@docker ~]# cd debezium-examples-main/tutorial
[root@docker tutorial]# ls -lrt
total 768
-rw-r--r--. 1 root root 602664 Apr 19 22:42 vitess-sharding-setup.png
drwxr-xr-x. 2 root root     30 Apr 19 22:42 secrets
-rw-r--r--. 1 root root    521 Apr 19 22:42 register-vitess.json
-rw-r--r--. 1 root root    538 Apr 19 22:42 register-sqlserver.json
-rw-r--r--. 1 root root    448 Apr 19 22:42 register-postgres.json
-rw-r--r--. 1 root root    582 Apr 19 22:42 register-oracle-logminer.json
-rw-r--r--. 1 root root    568 Apr 19 22:42 register-mysql.json
-rw-r--r--. 1 root root    637 Apr 19 22:42 register-mysql-ext-secrets.json
-rw-r--r--. 1 root root    860 Apr 19 22:42 register-mysql-avro.json
-rw-r--r--. 1 root root    878 Apr 19 22:42 register-mysql-apicurio.json
-rw-r--r--. 1 root root   1172 Apr 19 22:42 register-mysql-apicurio-converter-json.json
-rw-r--r--. 1 root root   1166 Apr 19 22:42 register-mysql-apicurio-converter-avro.json
-rw-r--r--. 1 root root    437 Apr 19 22:42 register-mongodb.json
-rw-r--r--. 1 root root    576 Apr 19 22:42 register-db2.json
-rw-r--r--. 1 root root  22923 Apr 19 22:42 README.md
-rw-r--r--. 1 root root   1955 Apr 19 22:42 docker-compose-zookeeperless-kafka.yaml
-rw-r--r--. 1 root root   1616 Apr 19 22:42 docker-compose-zookeeperless-kafka-combined.yaml
-rw-r--r--. 1 root root    885 Apr 19 22:42 docker-compose-vitess.yaml
-rw-r--r--. 1 root root   1119 Apr 19 22:42 docker-compose-sqlserver.yaml
-rw-r--r--. 1 root root   1082 Apr 19 22:42 docker-compose-postgres.yaml
-rw-r--r--. 1 root root    927 Apr 19 22:42 docker-compose-oracle.yaml
-rw-r--r--. 1 root root    887 Apr 19 22:42 docker-compose-mysql.yaml
-rw-r--r--. 1 root root   1068 Apr 19 22:42 docker-compose-mysql-ext-secrets.yml
-rw-r--r--. 1 root root   1671 Apr 19 22:42 docker-compose-mysql-avro-worker.yaml
-rw-r--r--. 1 root root   1391 Apr 19 22:42 docker-compose-mysql-avro-connector.yaml
-rw-r--r--. 1 root root   1036 Apr 19 22:42 docker-compose-mysql-apicurio.yaml
-rw-r--r--. 1 root root  43764 Apr 19 22:42 docker-compose-mysql-apicurio.png
-rw-r--r--. 1 root root   1094 Apr 19 22:42 docker-compose-mongodb.yaml
-rw-r--r--. 1 root root   1098 Apr 19 22:42 docker-compose-db2.yaml
-rw-r--r--. 1 root root    930 Apr 19 22:42 docker-compose-cassandra.yaml
drwxr-xr-x. 3 root root     36 Apr 19 22:42 debezium-with-oracle-jdbc
drwxr-xr-x. 3 root root     74 Apr 19 22:42 debezium-vitess-init
drwxr-xr-x. 2 root root     27 Apr 19 22:42 debezium-sqlserver-init
drwxr-xr-x. 4 root root     41 Apr 19 22:42 debezium-db2-init
drwxr-xr-x. 2 root root    141 Apr 19 22:42 debezium-cassandra-init
drwxr-xr-x. 2 root root     26 Apr 19 22:42 db2data

Install docker-compose

# The downloaded binary only needs execute permission to run; rename it for convenience
[root@docker ~]# chmod +x docker-compose-linux-x86_64
[root@docker ~]# mv docker-compose-linux-x86_64 docker-compose

Basic docker-compose Management

# Help
/root/docker-compose -h
# Create and start all containers in the project
/root/docker-compose -f xxx.yaml up
# Stop and remove all containers in the project
/root/docker-compose -f xxx.yaml down
# Restart a single service (container) in the project; this example restarts the connect container
/root/docker-compose -f xxx.yaml restart connect
# List all containers in the project
/root/docker-compose -f xxx.yaml ps

Testing Avro

Avro can be configured in three ways: first, on the Kafka Connect worker; second, on the Debezium connector; third, through the Apicurio registry.

Option 1: Configure Avro on the Kafka Connect Worker

Edit the docker-compose configuration file

  • The tutorial only ships docker-compose-mysql-avro-worker.yaml for MySQL; model a PostgreSQL docker-compose file on it
  • A Kafka web management tool is also added
# cd /root/debezium-examples-main/tutorial
# cat docker-compose-postgres-avro-worker.yaml
version: '2'
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888
  kafka:
    image: quay.io/debezium/kafka:${DEBEZIUM_VERSION}
    ports:
     - 9092:9092
    links:
     - zookeeper
    environment:
     - ZOOKEEPER_CONNECT=zookeeper:2181
  postgres:
    image: quay.io/debezium/example-postgres:${DEBEZIUM_VERSION}
    ports:
     - 5432:5432
    environment:
     - POSTGRES_USER=postgres
     - POSTGRES_PASSWORD=postgres
  schema-registry:
    image: confluentinc/cp-schema-registry:7.0.1
    ports:
     - 8181:8181
     - 8081:8081
    environment:
     - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka:9092
     - SCHEMA_REGISTRY_HOST_NAME=schema-registry
     - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
    links:
     - zookeeper
  connect:
    image: quay.io/debezium/connect:${DEBEZIUM_VERSION}
    ports:
     - 8083:8083
    links:
     - kafka
     - postgres
     - schema-registry
    environment:
     - BOOTSTRAP_SERVERS=kafka:9092
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=my_connect_configs
     - OFFSET_STORAGE_TOPIC=my_connect_offsets
     - STATUS_STORAGE_TOPIC=my_connect_statuses
     - KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
     - VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
     - INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
     - INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
     - CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
     - CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
  kafkaui:
    image: provectuslabs/kafka-ui:latest
    ports:
     - 8811:8080
    links:
     - kafka
    environment:
     - KAFKA_CLUSTERS_0_NAME=test
     - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092

Start docker-compose

  • Starting docker-compose brings up zookeeper, kafka, postgres, schema-registry, connect, and a Kafka web management container in turn
export DEBEZIUM_VERSION=1.9
cd /root/debezium-examples-main/tutorial
/root/docker-compose -f docker-compose-postgres-avro-worker.yaml up


Register the PostgreSQL connector

  • Use the register-postgres.json that ships with the Debezium tutorial
# cd /root/debezium-examples-main/tutorial
# cat register-postgres.json
{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname" : "postgres",
        "database.server.name": "dbserver1",
        "schema.include.list": "inventory"
    }
}

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres.json

Check the Topics in the Kafka Web UI

  • http://192.168.0.40:8811/
  • Topics are created automatically for each table on the source side
  • A _schemas topic is created automatically for the schemas
  • Each message's key and value are binary
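The per-table topics follow Debezium's naming pattern serverName.schemaName.tableName. A small illustrative helper (the table names below are from the example inventory database, not from a live listing):

```python
def debezium_topic(server_name, schema, table):
    """Debezium names change-event topics serverName.schemaName.tableName."""
    return "{}.{}.{}".format(server_name, schema, table)

# With database.server.name=dbserver1 and schema.include.list=inventory,
# each captured table gets its own topic:
topics = [debezium_topic("dbserver1", "inventory", t)
          for t in ("customers", "orders", "products")]
```

This is why the sink connector later subscribes to dbserver1.inventory.orders.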

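The key and value look binary in the UI because the Confluent AvroConverter frames each message with a one-byte magic marker and a 4-byte big-endian schema ID before the Avro-encoded body. A minimal sketch of parsing that framing (the sample bytes below are fabricated, not captured from Kafka):

```python
import struct

def parse_confluent_framing(message):
    """Split a Confluent-framed message into (schema_id, avro_payload).

    Wire format: magic byte 0x00 + 4-byte big-endian schema ID + Avro binary.
    """
    if len(message) < 5 or message[0] != 0x00:
        raise ValueError("not a Confluent Avro-framed message")
    (schema_id,) = struct.unpack(">I", message[1:5])
    return schema_id, message[5:]

# A fabricated message that claims schema ID 1 and carries two payload bytes
schema_id, payload = parse_confluent_framing(b"\x00\x00\x00\x00\x01\x02\x61")
```

The schema ID points at the schema stored in the registry, which is how consumers recover the structure without the schema being embedded in every message.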

Configure the Network

  • The source in this experiment is PostgreSQL and the target is an Oracle 19c PDB. Debezium provides a PostgreSQL Docker image but no Oracle image.
  • For installing Oracle on Docker, see: "Setting up a single-instance Oracle 19c test environment with Docker"
  • An environment deployed with docker-compose creates a default network, named after the lowercased directory containing the docker-compose.yml plus "_default" — here, tutorial_default.
  • Oracle installed on Docker uses the Docker default network, so its network is isolated from the docker-compose environment.
  • To let the connect container deployed by docker-compose reach Oracle, attach the Docker default network to the connect container.
# First run docker ps to find the CONTAINER ID of the tutorial-connect-1 container (5aedff3b90e4)
# docker network connect bridge 5aedff3b90e4
# docker inspect tutorial-connect-1 | grep IPAddress
            "SecondaryIPAddresses": null,
            "IPAddress": "172.17.0.3",
                    "IPAddress": "172.17.0.3",
                    "IPAddress": "172.26.0.3",
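The default-network naming rule described above can be sketched as a tiny helper (illustrative only; docker-compose also normalizes certain characters in project names, which is ignored here):

```python
import os

def compose_default_network(compose_dir):
    # docker-compose derives the project name from the directory holding the
    # compose file (lowercased) and appends "_default" for the default network
    return os.path.basename(compose_dir).lower() + "_default"

network = compose_default_network("/root/debezium-examples-main/tutorial")
```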

Register a Sink (Consumer) Connector

  • The sink connector uses Kafka Connect JDBC to write the consumed data into the Oracle 19c PDB
  • The Debezium connect container does not ship with Kafka Connect JDBC; download and upload it yourself, then restart the connect container
# Upload Kafka Connect JDBC
docker cp confluentinc-kafka-connect-jdbc-10.4.1 tutorial-connect-1:/kafka/connect
# Restart the connect service with docker-compose
cd /root/debezium-examples-main/tutorial
/root/docker-compose -f docker-compose-postgres-avro-worker.yaml restart connect
  • Edit the sink connector config and register it with Kafka Connect
[root@docker ~]# cat oracle-jdbc-sink.json
{
    "name": "oracle-jdbc-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:oracle:thin:@//10.16.0.1:1521/pdbtt",
        "connection.user": "inventory",
        "connection.password": "inventory",
        "tasks.max": "1",
        "topics": "dbserver1.inventory.orders",
        "table.name.format": "ORDERS",
        "quote.sql.identifiers": "never",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "auto.create": "true",
        "auto.evolve": "true",
        "delete.enabled": "true",
        "pk.mode": "record_key"
    }
}

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @oracle-jdbc-sink.json
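The unwrap transform (ExtractNewRecordState) is what lets the JDBC sink consume Debezium events: it replaces the change-event envelope with the flat row image. A rough illustration of the effect, using a made-up event (the real SMT also handles deletes, tombstones, and metadata fields):

```python
def extract_new_record_state(value):
    """Rough sketch of the SMT: replace the Debezium envelope
    {before, after, source, op, ts_ms} with just the new row image ('after')."""
    if value is None:
        return None  # tombstone passes through (drop.tombstones=false keeps it)
    return value.get("after")

# A fabricated insert event for the orders table
event = {
    "before": None,
    "after": {"id": 10001, "purchaser": 1001, "quantity": 1},
    "op": "c",  # create
}
flat = extract_new_record_state(event)
```

The sink then maps the flat record's fields to ORDERS columns, with pk.mode=record_key taking the primary key from the message key.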
  • Check the consumed data
SQL> desc INVENTORY.ORDERS;
SQL> select * from INVENTORY.ORDERS;


Stop and remove the containers to clean up the test environment

# cd /root/debezium-examples-main/tutorial
# /root/docker-compose -f docker-compose-postgres-avro-worker.yaml down

Option 2: Configure Avro on the Debezium Connector

Edit the docker-compose configuration file

  • The tutorial only ships docker-compose-mysql-avro-connector.yaml for MySQL; model a PostgreSQL docker-compose file on it
  • A Kafka web management tool is also added
# cd /root/debezium-examples-main/tutorial
# cat docker-compose-postgres-avro-connector.yaml
version: '2'
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888
  kafka:
    image: quay.io/debezium/kafka:${DEBEZIUM_VERSION}
    ports:
     - 9092:9092
    links:
     - zookeeper
    environment:
     - ZOOKEEPER_CONNECT=zookeeper:2181
  postgres:
    image: quay.io/debezium/example-postgres:${DEBEZIUM_VERSION}
    ports:
     - 5432:5432
    environment:
     - POSTGRES_USER=postgres
     - POSTGRES_PASSWORD=postgres
  schema-registry:
    image: confluentinc/cp-schema-registry:7.0.1
    ports:
     - 8181:8181
     - 8081:8081
    environment:
     - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka:9092
     - SCHEMA_REGISTRY_HOST_NAME=schema-registry
     - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
    links:
     - zookeeper
  connect:
    image: quay.io/debezium/connect:${DEBEZIUM_VERSION}
    ports:
     - 8083:8083
    links:
     - kafka
     - postgres
     - schema-registry
    environment:
     - BOOTSTRAP_SERVERS=kafka:9092
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=my_connect_configs
     - OFFSET_STORAGE_TOPIC=my_connect_offsets
     - STATUS_STORAGE_TOPIC=my_connect_statuses
     - INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
     - INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
  kafkaui:
    image: provectuslabs/kafka-ui:latest
    ports:
     - 8811:8080
    links:
     - kafka
    environment:
     - KAFKA_CLUSTERS_0_NAME=test
     - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092

Start docker-compose

  • Starting docker-compose brings up zookeeper, kafka, postgres, schema-registry, connect, and a Kafka web management container in turn
export DEBEZIUM_VERSION=1.9
cd /root/debezium-examples-main/tutorial
/root/docker-compose -f docker-compose-postgres-avro-connector.yaml up

Register the PostgreSQL connector

  • The tutorial only ships register-mysql-avro.json for MySQL; model a PostgreSQL config file on it
# cd /root/debezium-examples-main/tutorial
# cat register-postgres-avro.json
{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname" : "postgres",
        "database.server.name": "dbserver1",
        "schema.include.list": "inventory",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://schema-registry:8081",
        "value.converter.schema.registry.url": "http://schema-registry:8081"
    }
}

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres-avro.json

Inspect the customers schema

curl -X GET http://localhost:8081/subjects/dbserver1.inventory.customers-value/versions/1
curl -X GET http://localhost:8081/subjects/dbserver1.inventory.customers-value/versions/1 | jq '.schema | fromjson'
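The jq filter '.schema | fromjson' is needed because the registry returns the Avro schema as a JSON-escaped string inside the response envelope. The same two-step decode in Python, against a hand-written, abbreviated sample response (not captured from a live registry):

```python
import json

# Abbreviated, hand-written example of a /subjects/.../versions/1 response
response_text = (
    '{"subject": "dbserver1.inventory.customers-value", "version": 1, "id": 2,'
    ' "schema": "{\\"type\\":\\"record\\",\\"name\\":\\"Value\\",'
    '\\"fields\\":[{\\"name\\":\\"id\\",\\"type\\":\\"int\\"}]}"}'
)

envelope = json.loads(response_text)     # first decode: the response envelope
schema = json.loads(envelope["schema"])  # second decode: the escaped schema string
```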
  • The schema registry also ships a console consumer that can read Avro messages:
# cd /root/debezium-examples-main/tutorial
/root/docker-compose -f docker-compose-postgres-avro-connector.yaml exec schema-registry /usr/bin/kafka-avro-console-consumer \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --property schema.registry.url=http://schema-registry:8081 \
    --topic dbserver1.inventory.customers

Configure the Network

  • See the previous configuration option for details
# First run docker ps to find the CONTAINER ID of the tutorial-connect-1 container (5aedff3b90e4)
# docker network connect bridge 5aedff3b90e4

Register a Sink (Consumer) Connector

  • The sink connector uses Kafka Connect JDBC to write the consumed data into the Oracle 19c PDB
  • The Debezium connect container does not ship with Kafka Connect JDBC; download and upload it yourself, then restart the connect container
# Upload Kafka Connect JDBC
docker cp confluentinc-kafka-connect-jdbc-10.4.1 tutorial-connect-1:/kafka/connect
# Restart the connect service with docker-compose
cd /root/debezium-examples-main/tutorial
/root/docker-compose -f docker-compose-postgres-avro-connector.yaml restart connect
  • Edit the sink connector config and register it with Kafka Connect
[root@docker tutorial]# cat oracle-jdbc-sink-avro.json
{
    "name": "oracle-jdbc-sink-avro",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:oracle:thin:@//172.17.0.2:1521/pdbtt",
        "connection.user": "inventory",
        "connection.password": "inventory",
        "tasks.max": "1",
        "topics": "dbserver1.inventory.orders",
        "table.name.format": "ORDERS",
        "quote.sql.identifiers": "never",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "auto.create": "true",
        "auto.evolve": "true",
        "delete.enabled": "true",
        "pk.mode": "record_key",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://schema-registry:8081",
        "value.converter.schema.registry.url": "http://schema-registry:8081"
    }
}

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @oracle-jdbc-sink-avro.json
  • Check the consumed data
SQL> desc INVENTORY.ORDERS;
SQL> select * from INVENTORY.ORDERS;


Stop and remove the containers to clean up the test environment

# cd /root/debezium-examples-main/tutorial
# /root/docker-compose -f docker-compose-postgres-avro-connector.yaml down

Option 3: Use the Apicurio Registry

Apicurio Registry is an open-source API and schema registry that can, among other things, store schemas for Kafka records. It provides:

  • Its own native Avro converter and Protobuf serializer
  • A JSON converter that externalizes its schema into the registry
  • Compatibility layers for other schema registries such as IBM's or Confluent's; it can be used with the Confluent Avro converter.

Edit the docker-compose configuration file

  • The tutorial only ships docker-compose-mysql-apicurio.yaml for MySQL; model a PostgreSQL docker-compose file on it
  • A Kafka web management tool is also added
# cd /root/debezium-examples-main/tutorial
# cat docker-compose-postgres-apicurio.yaml
version: '2'
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888
  kafka:
    image: quay.io/debezium/kafka:${DEBEZIUM_VERSION}
    ports:
     - 9092:9092
    links:
     - zookeeper
    environment:
     - ZOOKEEPER_CONNECT=zookeeper:2181
  postgres:
    image: quay.io/debezium/example-postgres:${DEBEZIUM_VERSION}
    ports:
     - 5432:5432
    environment:
     - POSTGRES_USER=postgres
     - POSTGRES_PASSWORD=postgres
  apicurio:
    image: apicurio/apicurio-registry-mem:2.0.0.Final
    ports:
     - 8080:8080
  connect:
    image: quay.io/debezium/connect:${DEBEZIUM_VERSION}
    ports:
     - 8083:8083
    links:
     - kafka
     - postgres
     - apicurio
    environment:
     - BOOTSTRAP_SERVERS=kafka:9092
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=my_connect_configs
     - OFFSET_STORAGE_TOPIC=my_connect_offsets
     - STATUS_STORAGE_TOPIC=my_connect_statuses
     - ENABLE_APICURIO_CONVERTERS=true
  kafkaui:
    image: provectuslabs/kafka-ui:latest
    ports:
     - 8811:8080
    links:
     - kafka
    environment:
     - KAFKA_CLUSTERS_0_NAME=test
     - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092

Start docker-compose

  • Starting docker-compose brings up zookeeper, kafka, postgres, apicurio, connect, and a Kafka web management container in turn
export DEBEZIUM_VERSION=1.9
cd /root/debezium-examples-main/tutorial
/root/docker-compose -f docker-compose-postgres-apicurio.yaml up

Register the PostgreSQL connector (Apicurio, JSON format)

  • The tutorial only ships register-mysql-apicurio-converter-json.json for MySQL; model a PostgreSQL config file on it
# cd /root/debezium-examples-main/tutorial
# cat register-postgres-apicurio-converter-json.json
{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname" : "postgres",
        "database.server.name": "dbserver1",
        "schema.include.list": "inventory",
        "key.converter": "io.apicurio.registry.utils.converter.ExtJsonConverter",
        "key.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
        "key.converter.apicurio.registry.auto-register": "true",
        "key.converter.apicurio.registry.find-latest": "true",
        "value.converter": "io.apicurio.registry.utils.converter.ExtJsonConverter",
        "value.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
        "value.converter.apicurio.registry.auto-register": "true",
        "value.converter.apicurio.registry.find-latest": "true"
    }
}

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres-apicurio-converter-json.json

Register the PostgreSQL connector (Apicurio, Avro format)

  • The tutorial only ships register-mysql-apicurio-converter-avro.json for MySQL; model a PostgreSQL config file on it
# cd /root/debezium-examples-main/tutorial
# cat register-postgres-apicurio-converter-avro.json
{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname" : "postgres",
        "database.server.name": "dbserver1",
        "schema.include.list": "inventory",
        "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "key.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
        "key.converter.apicurio.registry.auto-register": "true",
        "key.converter.apicurio.registry.find-latest": "true",
        "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "value.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
        "value.converter.apicurio.registry.auto-register": "true",
        "value.converter.apicurio.registry.find-latest": "true"
    }
}

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres-apicurio-converter-avro.json

Register the PostgreSQL connector (Confluent, Avro format)

  • The tutorial only ships register-mysql-apicurio.json for MySQL; model a PostgreSQL config file on it
# cd /root/debezium-examples-main/tutorial
# cat register-postgres-apicurio.json
{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname" : "postgres",
        "database.server.name": "dbserver1",
        "schema.include.list": "inventory",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://apicurio:8080/apis/ccompat/v6",
        "value.converter.schema.registry.url": "http://apicurio:8080/apis/ccompat/v6"
    }
}

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres-apicurio.json

Inspect the customers schema

# Apicurio - JSON and Avro formats
curl -X GET http://localhost:8080/apis/registry/v2/groups/default/artifacts/dbserver1.inventory.customers-value | jq .
# Confluent - Avro format
curl -X GET http://localhost:8080/apis/ccompat/v6/subjects/dbserver1.inventory.customers-value/versions/1 | jq '.schema | fromjson'
  • The registry also ships a console consumer that can read the messages:
# Apicurio - JSON format
# cd /root/debezium-examples-main/tutorial
/root/docker-compose -f docker-compose-postgres-apicurio.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic dbserver1.inventory.customers

When you look at a data message, you will notice that it contains only the payload and no schema section, because the schema has been externalized into the registry.

Check the Topics

  • Apicurio - JSON
  • Apicurio - Avro
  • Confluent - Avro

Configure the Network

  • See the previous configuration option for details
# First run docker ps to find the CONTAINER ID of the tutorial-connect-1 container (5aedff3b90e4)
# docker network connect bridge 5aedff3b90e4

Register a Sink (Consumer) Connector

  • The sink connector uses Kafka Connect JDBC to write the consumed data into the Oracle 19c PDB
  • The Debezium connect container does not ship with Kafka Connect JDBC; download and upload it yourself, then restart the connect container
# Upload Kafka Connect JDBC
docker cp confluentinc-kafka-connect-jdbc-10.4.1 tutorial-connect-1:/kafka/connect
# Restart the connect service with docker-compose
cd /root/debezium-examples-main/tutorial
/root/docker-compose -f docker-compose-postgres-apicurio.yaml restart connect
  • Edit the sink connector config and register it with Kafka Connect (Apicurio, JSON format)
[root@docker tutorial]# cat oracle-jdbc-sink-apicurio-json.json
{
    "name": "oracle-jdbc-sink-apicurio-json",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:oracle:thin:@//172.17.0.2:1521/pdbtt",
        "connection.user": "inventory",
        "connection.password": "inventory",
        "tasks.max": "1",
        "topics": "dbserver1.inventory.orders",
        "table.name.format": "ORDERS",
        "quote.sql.identifiers": "never",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "auto.create": "true",
        "auto.evolve": "true",
        "delete.enabled": "true",
        "pk.mode": "record_key",
        "key.converter": "io.apicurio.registry.utils.converter.ExtJsonConverter",
        "key.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
        "key.converter.apicurio.registry.auto-register": "true",
        "key.converter.apicurio.registry.find-latest": "true",
        "value.converter": "io.apicurio.registry.utils.converter.ExtJsonConverter",
        "value.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
        "value.converter.apicurio.registry.auto-register": "true",
        "value.converter.apicurio.registry.find-latest": "true"
    }
}

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @oracle-jdbc-sink-apicurio-json.json
  • The sink side did not work; it failed with the error: Tolerance exceeded in error handler


  • Edit the sink connector config and register it with Kafka Connect (Apicurio, Avro format)
[root@docker tutorial]# cat oracle-jdbc-sink-apicurio-avro.json
{
    "name": "oracle-jdbc-sink-apicurio-avro",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:oracle:thin:@//172.17.0.2:1521/pdbtt",
        "connection.user": "inventory",
        "connection.password": "inventory",
        "tasks.max": "1",
        "topics": "dbserver1.inventory.orders",
        "table.name.format": "ORDERS",
        "quote.sql.identifiers": "never",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "auto.create": "true",
        "auto.evolve": "true",
        "delete.enabled": "true",
        "pk.mode": "record_key",
        "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "key.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
        "key.converter.apicurio.registry.auto-register": "true",
        "key.converter.apicurio.registry.find-latest": "true",
        "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "value.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
        "value.converter.apicurio.registry.auto-register": "true",
        "value.converter.apicurio.registry.find-latest": "true"
    }
}

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @oracle-jdbc-sink-apicurio-avro.json
  • Edit the sink connector config and register it with Kafka Connect (Confluent, Avro format)
[root@docker tutorial]# cat oracle-jdbc-sink-apicurio-avro2.json
{
    "name": "oracle-jdbc-sink-apicurio-avro2",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:oracle:thin:@//172.17.0.2:1521/pdbtt",
        "connection.user": "inventory",
        "connection.password": "inventory",
        "tasks.max": "1",
        "topics": "dbserver1.inventory.orders",
        "table.name.format": "ORDERS",
        "quote.sql.identifiers": "never",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "auto.create": "true",
        "auto.evolve": "true",
        "delete.enabled": "true",
        "pk.mode": "record_key",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://apicurio:8080/apis/ccompat/v6",
        "value.converter.schema.registry.url": "http://apicurio:8080/apis/ccompat/v6"
    }
}

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @oracle-jdbc-sink-apicurio-avro2.json

Stop and remove the containers to clean up the test environment

# cd /root/debezium-examples-main/tutorial
# /root/docker-compose -f docker-compose-postgres-apicurio.yaml down
Last modified: 2022-04-27 19:52:15
Copyright notice: this article is original content by a Modb (墨天轮) user; reproduction must credit the source (Modb), the article link, and the author.
