暂无图片
暂无图片
2
暂无图片
暂无图片
暂无图片

Every Day of a DBA,第136期: Oracle-cdd 全量+增量实时同步

原创 ByteHouse 5天前
48

作者:bytehouse
Oracle ACE、PostgreSQL ACE
10+年数据库架构与运维实战经验
公众号:bytehouse
墨天轮专栏:bytehouse
CSDN:Young DBA

摘要:
第134期: Apache SeaTunnel 2.3.13 部署文档(适配 Hadoop+Flink 集群)**https://www.modb.pro/db/2046418722001854464
第135期: pg-cdc 搞定 PG 数据库全量+增量实时同步: https://www.modb.pro/db/2047156372308566016

本期在Oracle 11g 上测试 全量+增量实时同步。

步骤 1:启用 Oracle Logminer(sysdba 执行)

1. 开启归档 & 补充日志

-- 1. 关闭数据库 shutdown immediate; -- 2. 启动到挂载模式 startup mount; -- 3. 开启归档 alter database archivelog; -- 4. 打开数据库 alter database open; -- 5. 11g 合法补充日志(关键,CDC必需) ALTER DATABASE ADD SUPPLEMENTAL LOG DATA; ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; -- 切换日志使配置立即生效 alter system switch logfile;

2. 创建 LogMiner 表空间 + CDC账号

-- 创建独立表空间 CREATE TABLESPACE logminer_tbs DATAFILE '/u01/app/oracle/oradata/big/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED; CREATE USER logminer_user IDENTIFIED BY oracle DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs;

3. logminer_user 用户授权

-- 基础连接 GRANT CREATE SESSION, ALTER SESSION TO logminer_user; -- LogMiner 核心包权限 GRANT EXECUTE ON SYS.DBMS_LOGMNR TO logminer_user; GRANT EXECUTE ON SYS.DBMS_LOGMNR_D TO logminer_user; -- 动态性能视图(11g 必需) GRANT SELECT ON SYS.V_$DATABASE TO logminer_user; GRANT SELECT ON SYS.V_$LOG TO logminer_user; GRANT SELECT ON SYS.V_$LOGFILE TO logminer_user; GRANT SELECT ON SYS.V_$LOGMNR_LOGS TO logminer_user; GRANT SELECT ON SYS.V_$LOGMNR_CONTENTS TO logminer_user; GRANT SELECT ON SYS.V_$ARCHIVED_LOG TO logminer_user; GRANT SELECT ON SYS.V_$ARCHIVE_DEST_STATUS TO logminer_user; GRANT SELECT ON SYS.V_$TRANSACTION TO logminer_user; -- CDC 数据读取权限 GRANT SELECT ANY TRANSACTION TO logminer_user; GRANT SELECT ANY TABLE TO logminer_user; GRANT ANALYZE ANY TO logminer_user; -- 数据字典访问 GRANT SELECT_CATALOG_ROLE TO logminer_user; GRANT EXECUTE_CATALOG_ROLE TO logminer_user;

4. 测试业务表 + 单独授权(最小权限原则)

-- 业务用户/表 CREATE USER admin IDENTIFIED BY admin; GRANT CONNECT, RESOURCE TO admin; CREATE TABLE admin.FULL_TYPES ( ID NUMBER(10) PRIMARY KEY, NAME VARCHAR2(100), AGE NUMBER(3), CREATE_TIME DATE, UPDATE_TIME TIMESTAMP, IS_VALID CHAR(1), SALARY NUMBER(12,2), REMARK CLOB ); -- 给CDC账号授权单表查询 GRANT SELECT ON admin.FULL_TYPES TO logminer_user;

步骤2:环境校验

-- 校验归档模式 SELECT log_mode FROM v$database; -- 校验补充日志(11g 通用查询) SELECT supplemental_log_data_min FROM v$database; -- 查看日志组是否生效 SELECT log_group_name, table_name FROM dba_log_groups;

步骤3:SeaTunnel 2.3.12 配置

文件:oracle-cdc-oracle11g.conf,输出获取到的数据到终端

env {
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 3000
  jvm.options = "-Xms512m -Xmx1g"
}

source {
  Oracle-CDC {
    # 官方新版标准:plugin_output(废弃 result_table_name)
    plugin_output = "customers"

    url = "jdbc:oracle:thin:@10.168.1.201:1521:big"
    username = "logminer_user"
    password = "oracle"

    database-names = ["BIG"]
    schema-names = ["ADMIN"]
    table-names = ["BIG.ADMIN.FULL_TYPES"]

    startup.mode = "INITIAL"
    schema-changes.enabled = false
    exactly_once = false

    connect.timeout.ms = 60000
    connect.max-retries = 5
    connection.pool.size = 3

    snapshot.split.size = 4096
    snapshot.fetch.size = 512
    format = "DEFAULT"
  }
}

sink {
  Console {
    # 官方标准:指定输入源
    plugin_input = "customers"
    
    format = "json"
    print_data = true
  }
}

任务启动命令

./bin/seatunnel.sh \ --config ./config/oracle-cdc-oracle11g.conf \ -e local

步骤4:测试语句

-- 插入 INSERT INTO admin.FULL_TYPES(ID,NAME,AGE) VALUES (1,'测试1',22); COMMIT; -- 更新 UPDATE admin.FULL_TYPES SET NAME='修改名称' WHERE ID=1; COMMIT; -- 删除 DELETE FROM admin.FULL_TYPES WHERE ID=1; COMMIT;

步骤5:同步数据写入pg

  1. oracle 表插入测试数据
-- =============================================
-- 1. 先插入初始数据(全量同步会同步过去)
-- =============================================
INSERT INTO ADMIN.FULL_TYPES (ID, NAME) VALUES (1, 'initial_data_001');
INSERT INTO ADMIN.FULL_TYPES (ID, NAME) VALUES (2, 'initial_data_002');
INSERT INTO ADMIN.FULL_TYPES (ID, NAME) VALUES (3, 'initial_data_003');
COMMIT;

-- =============================================
-- 2. 单条插入(增量插入)
-- =============================================
INSERT INTO ADMIN.FULL_TYPES (ID, NAME) VALUES (1001, 'test_data_1001');
INSERT INTO ADMIN.FULL_TYPES (ID, NAME) VALUES (1002, 'test_data_1002');
COMMIT;

-- =============================================
-- 3. 批量插入(增量批量插入)
-- =============================================
INSERT ALL
INTO ADMIN.FULL_TYPES (ID, NAME) VALUES (2001, 'batch_data_2001')
INTO ADMIN.FULL_TYPES (ID, NAME) VALUES (2002, 'batch_data_2002')
INTO ADMIN.FULL_TYPES (ID, NAME) VALUES (2003, 'batch_data_2003')
SELECT 1 FROM DUAL;
COMMIT;
  1. 配置任务
env {
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 3000
  jvm.options = "-Xms512m -Xmx1g"
}

source {
  Oracle-CDC {
    plugin_output = "customers"
    url = "jdbc:oracle:thin:@10.168.1.201:1521/big"
    username = "logminer_user"
    password = "oracle"

    database-names = ["BIG"]
    schema-names = ["ADMIN"]
    table-names = ["BIG.ADMIN.FULL_TYPES"]

    startup.mode = "INITIAL"
    schema-changes.enabled = false
    exactly_once = false

    connect.timeout.ms = 60000
    connect.max-retries = 5
    connection.pool.size = 3
    snapshot.split.size = 4096
    snapshot.fetch.size = 512
    format = "DEFAULT"
  }
}

sink {
  Jdbc {
    plugin_input = "customers"
    driver = "org.postgresql.Driver"
    url = "jdbc:postgresql://10.168.1.214:5432/traffic"
    user = "postgres"
    password = "123456"
    
    database = "traffic"
    table = "public.full_types"
    generate_sink_sql = true
    
    supported_write_mode = ["insert", "update", "delete"]
    primary_keys = ["ID"]
    batch_size = 100
    batch_interval_ms = 1000
  }
}

transform {
}
  1. 启动任务
./bin/seatunnel.sh \
--config ./config/oracle-cdc-2pg.conf \
-e local
  1. Pg 验证全量同步
SELECT * FROM ADMIN.FULL_TYPES;
  1. cdc 数据测试
-- =============================================
-- 4. 单条更新(增量更新)
-- =============================================
UPDATE ADMIN.FULL_TYPES SET NAME = 'updated_1001_new' WHERE ID = 1001;
COMMIT;

-- =============================================
-- 5. 批量更新(增量批量更新)
-- =============================================
UPDATE ADMIN.FULL_TYPES SET NAME = CONCAT(NAME, '_updated_all') WHERE ID IN (2001,2002,2003);
COMMIT;

-- =============================================
-- 6. 单条删除(增量删除)
-- =============================================
DELETE FROM ADMIN.FULL_TYPES WHERE ID = 1002;
COMMIT;

-- =============================================
-- 7. 批量删除(增量批量删除)
-- =============================================
DELETE FROM ADMIN.FULL_TYPES WHERE ID IN (2,3);
COMMIT;

-- =============================================
-- 8. 先删再插(覆盖写入)
-- =============================================
DELETE FROM ADMIN.FULL_TYPES WHERE ID = 1;
INSERT INTO ADMIN.FULL_TYPES (ID, NAME) VALUES (1, 'replaced_data_001_new');
COMMIT;

-- =============================================
-- 9. 查询验证(Oracle 端查看最终数据)
-- =============================================
SELECT * FROM ADMIN.FULL_TYPES ORDER BY ID;

  1. PostgreSQL 端验证 cdc
-- 实时查看同步结果
SELECT * FROM public.full_types ORDER BY ID;

-- 统计条数
SELECT COUNT(*) FROM public.full_types;

任务运行:

2026-04-24 09:35:39,445 INFO  [o.a.s.e.s.CoordinatorService  ] [pool-7-thread-1] - [localhost]:5801 [seatunnel-395224] [5.1] 
***********************************************
     CoordinatorService Thread Pool Status
***********************************************
activeCount               :                   2
corePoolSize              :                  10
maximumPoolSize           :          2147483647
poolSize                  :                  10
completedTaskCount        :                1196
taskCount                 :                1198
***********************************************

2026-04-24 09:35:39,447 INFO  [o.a.s.e.s.CoordinatorService  ] [pool-7-thread-1] - [localhost]:5801 [seatunnel-395224] [5.1] 
***********************************************
                Job info detail
***********************************************
createdJobCount           :                   0
pendingJobCount           :                   0
scheduledJobCount         :                   0
runningJobCount           :                   1
failingJobCount           :                   0
failedJobCount            :                   0
cancellingJobCount        :                   0
canceledJobCount          :                   0
finishedJobCount          :                   0
***********************************************

2026-04-24 09:35:40,612 INFO  [.s.e.s.c.CheckpointCoordinator] [seatunnel-coordinator-service-9] - wait checkpoint completed: 199
2026-04-24 09:35:40,621 INFO  [.s.e.s.c.CheckpointCoordinator] [seatunnel-coordinator-service-9] - pending checkpoint(199/1@1099868735893667841) notify finished!
2026-04-24 09:35:40,621 INFO  [.s.e.s.c.CheckpointCoordinator] [seatunnel-coordinator-service-9] - start notify checkpoint completed, job id: 1099868735893667841, pipeline id: 1, checkpoint id:199
2026-04-24 09:35:43,197 INFO  [o.a.s.e.c.j.JobMetricsRunner  ] [job-metrics-runner-1099868735893667841] - 
***********************************************
           Job Progress Information
***********************************************
Job Id                    : 1099868735893667841
Read Count So Far         :                  23
Write Attempt Count So Far:                  23
Write Committed Count So Far:                  23
Commit Rate               :             100.00%
Average Read Count        :                 0/s
Average Write Attempt Count:                 0/s
Average Write Committed Count:                 0/s
Last Statistic Time       : 2026-04-24 09:34:43
Current Statistic Time    : 2026-04-24 09:35:43
***********************************************

2026-04-24 09:35:43,612 INFO  [.s.e.s.c.CheckpointCoordinator] [seatunnel-coordinator-service-9] - wait checkpoint completed: 200
2026-04-24 09:35:43,620 INFO  [.s.e.s.c.CheckpointCoordinator] [seatunnel-coordinator-service-9] - pending checkpoint(200/1@1099868735893667841) notify finished!
2026-04-24 09:35:43,621 INFO  [.s.e.s.c.CheckpointCoordinator] [seatunnel-coordinator-service-9] - start notify checkpoint completed, job id: 1099868735893667841, pipeline id: 1, checkpoint id:200

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论