暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Every Day of a DBA,第135期: pg-cdc 搞定 PG 数据库全量同步+增量实时同步

ByteHouse 2026-04-23
124

作者:bytehouse
Oracle ACE、PostgreSQL ACE
10+年数据库架构与运维实战经验
公众号:bytehouse
墨天轮专栏:bytehouse
CSDN:Young DBA

摘要:

在 **Every Day of a DBA,第134期: Apache SeaTunnel 2.3.13 部署文档(适配 Hadoop+Flink 集群)**https://www.modb.pro/db/2046418722001854464 , 尝试了 SeaTunnel 的全量数据同步,在本篇尝试完成增量数据的同步。详细的对比如下。

pg-cdc-to-pg

查看源数据库配置

  1. 当前库和用户
traffic=# SELECT current_database(), current_user;
 current_database | current_user 
------------------+--------------
 traffic          | postgres
(1 row)

traffic=# 

  1. 用户是否能登录、是否有复制权限、是否是超级用户
traffic=# SELECT rolname, rolcanlogin, rolreplication, rolsuper
FROM pg_roles
WHERE rolname = 'postgres';
 rolname  | rolcanlogin | rolreplication | rolsuper 
----------+-------------+----------------+----------
 postgres | t           | t              | t
(1 row)

traffic=# 

  1. 逻辑复制相关服务端参数
traffic=# SELECT name, setting, unit, pending_restart
FROM pg_settings
WHERE name IN ('wal_level', 'max_replication_slots', 'max_wal_senders');
         name          | setting | unit | pending_restart 
-----------------------+---------+------+-----------------
 max_replication_slots | 10      |      | f
 max_wal_senders       | 10      |      | f
 wal_level             | logical |      | f
(3 rows)

traffic=# 

  1. 当前用户对库、schema、表的权限
traffic=# SELECT
  has_database_privilege('postgres', 'traffic', 'CONNECT') AS db_connect,
  has_schema_privilege('postgres', 'public', 'USAGE') AS schema_usage,
  has_table_privilege('postgres', 'public.users', 'SELECT') AS table_select;
 db_connect | schema_usage | table_select 
------------+--------------+--------------
 t          | t            | t
(1 row)

traffic=# 

  1. 表是否有主键、Replica Identity 是否可用
traffic=# SELECT
  n.nspname AS schema_name,
  c.relname AS table_name,
  c.relreplident,
  CASE c.relreplident
    WHEN 'd' THEN 'DEFAULT'
    WHEN 'f' THEN 'FULL'
    WHEN 'i' THEN 'INDEX'
    WHEN 'n' THEN 'NOTHING'
  END AS replica_identity,
  EXISTS (
    SELECT 1
    FROM pg_index i
    WHERE i.indrelid = c.oid
      AND i.indisprimary
  ) AS has_primary_key
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = 'public'
  AND c.relname = 'users';
 schema_name | table_name | relreplident | replica_identity | has_primary_key 
-------------+------------+--------------+------------------+-----------------
 public      | users      | f            | FULL             | t
(1 row)

traffic=# 

  1. publication 是否包含目标表
traffic=# SELECT pubname, schemaname, tablename
FROM pg_publication_tables
WHERE schemaname = 'public'
  AND tablename = 'users';
     pubname     | schemaname | tablename 
-----------------+------------+-----------
 dbz_publication | public     | users
(1 row)

traffic=# 

  1. publication 本身配置
traffic=# SELECT pubname, pubinsert, pubupdate, pubdelete, pubtruncate
FROM pg_publication;
     pubname     | pubinsert | pubupdate | pubdelete | pubtruncate 
-----------------+-----------+-----------+-----------+-------------
 dbz_publication | t         | t         | t         | t
(1 row)

traffic=# 

  1. slot 是否正确
traffic=# SELECT
  slot_name,
  plugin,
  slot_type,
  database,
  active,
  active_pid,
  restart_lsn,
  confirmed_flush_lsn
FROM pg_replication_slots
traffic-# ;
 slot_name  |  plugin  | slot_type | database | active | active_pid | restart_lsn | confirmed_flush_lsn 
------------+----------+-----------+----------+--------+------------+-------------+---------------------
 final_slot | pgoutput | logical   | traffic  | f      |            | 1/4D5A0CB0  | 1/4D5A0CE8
(1 row)

traffic=# 

增量同步输出到日志

步骤 1:先清理旧 slot(必须做)

SELECT pg_drop_replication_slot('final_slot');

步骤 2:新建一个干净的 slot

SELECT * FROM pg_create_logical_replication_slot('cdc_full_slot', 'pgoutput');

步骤 3:配置

env {
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  Postgres-CDC {
    username = "postgres"
    password = "123456"
    database-names = ["traffic"]
    schema-names = ["public"]
    table-names = ["traffic.public.users"]
    url = "jdbc:postgresql://192.168.56.105:5432/traffic"

    decoding.plugin.name = "pgoutput"
    slot.name = "final_slot"
    startup.mode = "latest"
    plugin_output = "out"
  }
}

sink {
  Console {
    plugin_input = ["out"]
  }
}

启动任务

[root@localhost apache-seatunnel-2.3.13]# sh bin/seatunnel.sh --config config/pgcdcpg.conf 
WARNING: sun.reflect.Reflection.getCallerClass is not supported. This will impact performance.
2026-04-23 10:53:02,773 INFO  [c.h.i.c.AbstractConfigLocator ] [main] - Loading configuration '/opt/apache-seatunnel-2.3.13/config/seatunnel.yaml' from System property 'seatunnel.config'
2026-04-23 10:53:02,780 INFO  [c.h.i.c.AbstractConfigLocator ] [main] - Using configuration file at /opt/apache-seatunnel-2.3.13/config/seatunnel.yaml
2026-04-23 10:53:02,781 INFO  [o.a.s.e.c.c.SeaTunnelConfig   ] [main] - seatunnel.home is /opt/apache-seatunnel-2.3.13
2026-04-23 10:53:03,083 INFO  [amlSeaTunnelDomConfigProcessor] [main] - Dynamic slot is enabled, the schedule strategy is set to REJECT
2026-04-23 10:53:03,084 INFO  [c.h.i.c.AbstractConfigLocator ] [main] - Loading configuration '/opt/apache-seatunnel-2.3.13/config/hazelcast.yaml' from System property 'hazelcast.config'
2026-04-23 10:53:03,084 INFO  [c.h.i.c.AbstractConfigLocator ] [main] - Using configuration file at /opt/apache-seatunnel-2.3.13/config/hazelcast.yaml
2026-04-23 10:53:03,957 INFO  [c.h.i.c.AbstractConfigLocator ] [main] - Loading configuration '/opt/apache-seatunnel-2.3.13/config/hazelcast-client.yaml' from System property 'hazelcast.client.config'
2026-04-23 10:53:03,958 INFO  [c.h.i.c.AbstractConfigLocator ] [main] - Using configuration file at /opt/apache-seatunnel-2.3.13/config/hazelcast-client.yaml
2026-04-23 10:53:04,496 INFO  [.c.i.s.ClientInvocationService] [main] - hz.client_1 [seatunnel] [5.1] Running with 2 response threads, dynamic=true
2026-04-23 10:53:04,607 INFO  [c.h.c.LifecycleService        ] [main] - hz.client_1 [seatunnel] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is STARTING
2026-04-23 10:53:04,609 INFO  [c.h.c.LifecycleService        ] [main] - hz.client_1 [seatunnel] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is STARTED
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by com.hazelcast.internal.networking.nio.SelectorOptimizer (file:/opt/apache-seatunnel-2.3.13/starter/seatunnel-starter.jar) to field sun.nio.ch.SelectorImpl.selectedKeys
WARNING: Please consider reporting this to the maintainers of com.hazelcast.internal.networking.nio.SelectorOptimizer
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2026-04-23 10:53:04,701 INFO  [.c.i.c.ClientConnectionManager] [main] - hz.client_1 [seatunnel] [5.1] Trying to connect to cluster: seatunnel
2026-04-23 10:53:04,705 INFO  [.c.i.c.ClientConnectionManager] [main] - hz.client_1 [seatunnel] [5.1] Trying to connect to [localhost]:5801
2026-04-23 10:53:04,781 INFO  [c.h.c.LifecycleService        ] [main] - hz.client_1 [seatunnel] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is CLIENT_CONNECTED
2026-04-23 10:53:04,781 INFO  [.c.i.c.ClientConnectionManager] [main] - hz.client_1 [seatunnel] [5.1] Authenticated with server [localhost]:5801:59db9132-1c4f-4bbd-8620-7155fd9c6a33, server version: 5.1, local address: /127.0.0.1:53033
2026-04-23 10:53:04,794 INFO  [c.h.i.d.Diagnostics           ] [main] - hz.client_1 [seatunnel] [5.1] Diagnostics disabled. To enable add -Dhazelcast.diagnostics.enabled=true to the JVM arguments.
2026-04-23 10:53:04,860 INFO  [c.h.c.i.s.ClientClusterService] [hz.client_1.event-2] - hz.client_1 [seatunnel] [5.1] 

Members [1] {
	Member [localhost]:5801 - 59db9132-1c4f-4bbd-8620-7155fd9c6a33 [master node]
}

2026-04-23 10:53:04,943 INFO  [.c.i.s.ClientStatisticsService] [main] - Client statistics is enabled with period 5 seconds.
2026-04-23 10:53:05,331 INFO  [o.a.s.c.s.u.ConfigBuilder     ] [main] - Loading config file from path: config/pgcdcpg.conf
2026-04-23 10:53:05,500 INFO  [o.a.s.c.s.u.ConfigShadeUtils  ] [main] - Load config shade spi: [base64]
2026-04-23 10:53:05,703 INFO  [o.a.s.c.s.u.ConfigBuilder     ] [main] - Parsed config file: 
{
    "env" : {
        "execution.parallelism" : 1,
        "job.mode" : "STREAMING",
        "checkpoint.interval" : 5000
    },
    "source" : [
        {
            "username" : "******",
            "password" : "******",
            "database-names" : [
                "traffic"
            ],
            "schema-names" : [
                "public"
            ],
            "table-names" : [
                "traffic.public.users"
            ],
            "url" : "jdbc:postgresql://192.168.56.105:5432/traffic",
            "decoding.plugin.name" : "pgoutput",
            "slot.name" : "final_slot",
            "startup.mode" : "latest",
            "plugin_output" : "out",
            "plugin_name" : "Postgres-CDC"
        }
    ],
    "sink" : [
        {
            "plugin_input" : [
                "out"
            ],
            "plugin_name" : "Console"
        }
    ],
    "transform" : []
}

2026-04-23 10:53:05,735 INFO  [p.MultipleTableJobConfigParser] [main] - add common jar in plugins :[]
2026-04-23 10:53:05,776 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - Load SeaTunnelSink Plugin from /opt/apache-seatunnel-2.3.13/connectors
2026-04-23 10:53:05,820 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - Discovery plugin jar for: PluginIdentifier{engineType='seatunnel', pluginType='source', pluginName='Postgres-CDC'} at: [file:/opt/apache-seatunnel-2.3.13/connectors/connector-cdc-base-2.3.13.jar, file:/opt/apache-seatunnel-2.3.13/connectors/connector-cdc-postgres-2.3.13.jar]
2026-04-23 10:53:05,820 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - find connector jar and dependency for PluginIdentifier{engineType='seatunnel', pluginType='source', pluginName='Postgres-CDC'}: [file:/opt/apache-seatunnel-2.3.13/connectors/connector-cdc-base-2.3.13.jar, file:/opt/apache-seatunnel-2.3.13/connectors/connector-cdc-postgres-2.3.13.jar]
2026-04-23 10:53:05,832 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - Load SeaTunnelSink Plugin from /opt/apache-seatunnel-2.3.13/connectors
2026-04-23 10:53:05,838 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - Load SeaTunnelSink Plugin from /opt/apache-seatunnel-2.3.13/connectors
2026-04-23 10:53:05,839 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - Discovery plugin jar for: PluginIdentifier{engineType='seatunnel', pluginType='sink', pluginName='Console'} at: [file:/opt/apache-seatunnel-2.3.13/connectors/connector-console-2.3.13.jar]
2026-04-23 10:53:05,839 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - find connector jar and dependency for PluginIdentifier{engineType='seatunnel', pluginType='sink', pluginName='Console'}: [file:/opt/apache-seatunnel-2.3.13/connectors/connector-console-2.3.13.jar]
2026-04-23 10:53:05,864 INFO  [p.MultipleTableJobConfigParser] [main] - start generating all sources.
2026-04-23 10:53:06,488 INFO  [.s.c.s.j.c.AbstractJdbcCatalog] [main] - Catalog Postgres established connection to jdbc:postgresql://192.168.56.105:5432/traffic
2026-04-23 10:53:06,620 INFO  [o.a.s.a.t.c.CatalogTableUtil  ] [main] - Get catalog tables, cost time: 610 ms
2026-04-23 10:53:06,621 INFO  [.s.c.s.j.c.AbstractJdbcCatalog] [main] - Catalog Postgres closing
2026-04-23 10:53:07,034 INFO  [i.d.j.JdbcConnection          ] [pool-2-thread-1] - Connection gracefully closed
2026-04-23 10:53:07,231 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - Read list of available databases
2026-04-23 10:53:07,236 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 list of available databases is: [postgres, template1, template0, pg_monitor, iot, thermo_monitor, adg_monitor, bigdata, oracle_monitor, traffic]
2026-04-23 10:53:07,236 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - Read list of available tables in each database
2026-04-23 10:53:07,247 WARN  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 skipping database 'postgres' due to error reading tables: ERROR: cross-database references are not implemented: "postgres.information_schema.tables"
  Position: 15
2026-04-23 10:53:07,255 WARN  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 skipping database 'template1' due to error reading tables: ERROR: cross-database references are not implemented: "template1.information_schema.tables"
  Position: 15
2026-04-23 10:53:07,263 WARN  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 skipping database 'template0' due to error reading tables: ERROR: cross-database references are not implemented: "template0.information_schema.tables"
  Position: 15
2026-04-23 10:53:07,274 WARN  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 skipping database 'pg_monitor' due to error reading tables: ERROR: cross-database references are not implemented: "pg_monitor.information_schema.tables"
  Position: 15
2026-04-23 10:53:07,282 WARN  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 skipping database 'iot' due to error reading tables: ERROR: cross-database references are not implemented: "iot.information_schema.tables"
  Position: 15
2026-04-23 10:53:07,283 WARN  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 skipping database 'thermo_monitor' due to error reading tables: ERROR: cross-database references are not implemented: "thermo_monitor.information_schema.tables"
  Position: 15
2026-04-23 10:53:07,287 WARN  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 skipping database 'adg_monitor' due to error reading tables: ERROR: cross-database references are not implemented: "adg_monitor.information_schema.tables"
  Position: 15
2026-04-23 10:53:07,290 WARN  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 skipping database 'bigdata' due to error reading tables: ERROR: cross-database references are not implemented: "bigdata.information_schema.tables"
  Position: 15
2026-04-23 10:53:07,292 WARN  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 skipping database 'oracle_monitor' due to error reading tables: ERROR: cross-database references are not implemented: "oracle_monitor.information_schema.tables"
  Position: 15
2026-04-23 10:53:07,296 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.public.test_users' is filtered out of capturing
2026-04-23 10:53:07,297 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 including 'traffic.public.users' for further processing
2026-04-23 10:53:07,297 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_statistic' is filtered out of capturing
2026-04-23 10:53:07,297 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_type' is filtered out of capturing
2026-04-23 10:53:07,298 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_foreign_table' is filtered out of capturing
2026-04-23 10:53:07,298 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_authid' is filtered out of capturing
2026-04-23 10:53:07,298 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_statistic_ext_data' is filtered out of capturing
2026-04-23 10:53:07,298 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_user_mapping' is filtered out of capturing
2026-04-23 10:53:07,298 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_subscription' is filtered out of capturing
2026-04-23 10:53:07,298 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_attribute' is filtered out of capturing
2026-04-23 10:53:07,298 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_proc' is filtered out of capturing
2026-04-23 10:53:07,298 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_class' is filtered out of capturing
2026-04-23 10:53:07,298 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_attrdef' is filtered out of capturing
2026-04-23 10:53:07,298 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_constraint' is filtered out of capturing
2026-04-23 10:53:07,298 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_inherits' is filtered out of capturing
2026-04-23 10:53:07,298 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_index' is filtered out of capturing
2026-04-23 10:53:07,298 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_operator' is filtered out of capturing
2026-04-23 10:53:07,298 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_opfamily' is filtered out of capturing
2026-04-23 10:53:07,298 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_opclass' is filtered out of capturing
2026-04-23 10:53:07,299 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_am' is filtered out of capturing
2026-04-23 10:53:07,299 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_amop' is filtered out of capturing
2026-04-23 10:53:07,299 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_amproc' is filtered out of capturing
2026-04-23 10:53:07,299 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_language' is filtered out of capturing
2026-04-23 10:53:07,299 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_largeobject_metadata' is filtered out of capturing
2026-04-23 10:53:07,299 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_aggregate' is filtered out of capturing
2026-04-23 10:53:07,299 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_statistic_ext' is filtered out of capturing
2026-04-23 10:53:07,299 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_rewrite' is filtered out of capturing
2026-04-23 10:53:07,299 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_trigger' is filtered out of capturing
2026-04-23 10:53:07,299 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_event_trigger' is filtered out of capturing
2026-04-23 10:53:07,299 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_description' is filtered out of capturing
2026-04-23 10:53:07,299 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_cast' is filtered out of capturing
2026-04-23 10:53:07,299 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_enum' is filtered out of capturing
2026-04-23 10:53:07,299 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_namespace' is filtered out of capturing
2026-04-23 10:53:07,299 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_conversion' is filtered out of capturing
2026-04-23 10:53:07,299 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_depend' is filtered out of capturing
2026-04-23 10:53:07,299 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_database' is filtered out of capturing
2026-04-23 10:53:07,299 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_db_role_setting' is filtered out of capturing
2026-04-23 10:53:07,299 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_tablespace' is filtered out of capturing
2026-04-23 10:53:07,299 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_auth_members' is filtered out of capturing
2026-04-23 10:53:07,300 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_shdepend' is filtered out of capturing
2026-04-23 10:53:07,300 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_shdescription' is filtered out of capturing
2026-04-23 10:53:07,300 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_ts_config' is filtered out of capturing
2026-04-23 10:53:07,300 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_ts_config_map' is filtered out of capturing
2026-04-23 10:53:07,300 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_ts_dict' is filtered out of capturing
2026-04-23 10:53:07,300 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_ts_parser' is filtered out of capturing
2026-04-23 10:53:07,308 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_ts_template' is filtered out of capturing
2026-04-23 10:53:07,308 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_extension' is filtered out of capturing
2026-04-23 10:53:07,308 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_foreign_data_wrapper' is filtered out of capturing
2026-04-23 10:53:07,308 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_foreign_server' is filtered out of capturing
2026-04-23 10:53:07,308 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_policy' is filtered out of capturing
2026-04-23 10:53:07,308 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_replication_origin' is filtered out of capturing
2026-04-23 10:53:07,308 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_default_acl' is filtered out of capturing
2026-04-23 10:53:07,308 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_init_privs' is filtered out of capturing
2026-04-23 10:53:07,309 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_seclabel' is filtered out of capturing
2026-04-23 10:53:07,309 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_shseclabel' is filtered out of capturing
2026-04-23 10:53:07,309 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_collation' is filtered out of capturing
2026-04-23 10:53:07,309 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_parameter_acl' is filtered out of capturing
2026-04-23 10:53:07,309 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_partitioned_table' is filtered out of capturing
2026-04-23 10:53:07,309 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_range' is filtered out of capturing
2026-04-23 10:53:07,309 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_transform' is filtered out of capturing
2026-04-23 10:53:07,309 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_sequence' is filtered out of capturing
2026-04-23 10:53:07,309 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_publication' is filtered out of capturing
2026-04-23 10:53:07,309 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_publication_namespace' is filtered out of capturing
2026-04-23 10:53:07,310 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_publication_rel' is filtered out of capturing
2026-04-23 10:53:07,310 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_subscription_rel' is filtered out of capturing
2026-04-23 10:53:07,310 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_largeobject' is filtered out of capturing
2026-04-23 10:53:07,310 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.information_schema.sql_parts' is filtered out of capturing
2026-04-23 10:53:07,310 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.information_schema.sql_features' is filtered out of capturing
2026-04-23 10:53:07,310 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.information_schema.sql_implementation_info' is filtered out of capturing
2026-04-23 10:53:07,310 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.information_schema.sql_sizing' is filtered out of capturing
2026-04-23 10:53:07,338 INFO  [i.d.j.JdbcConnection          ] [pool-3-thread-1] - Connection gracefully closed
2026-04-23 10:53:07,431 INFO  [i.d.j.JdbcConnection          ] [pool-4-thread-1] - Connection gracefully closed
2026-04-23 10:53:07,520 INFO  [.a.s.c.c.b.u.CatalogTableUtils] [main] - Override primary key([id]) for catalog table traffic.public.users
2026-04-23 10:53:07,530 INFO  [i.d.j.JdbcConnection          ] [pool-5-thread-1] - Connection gracefully closed
2026-04-23 10:53:07,728 INFO  [o.a.k.c.j.JsonConverterConfig ] [main] - JsonConverterConfig values: 
	converter.type = value
	decimal.format = BASE64
	schemas.cache.size = 1000
	schemas.enable = true

2026-04-23 10:53:07,760 INFO  [o.a.s.a.t.f.FactoryUtil       ] [main] - get the CatalogTable from source Postgres-CDC: Postgres.traffic.public.users
2026-04-23 10:53:07,777 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - Load SeaTunnelSource Plugin from /opt/apache-seatunnel-2.3.13/connectors
2026-04-23 10:53:07,782 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - Discovery plugin jar for: PluginIdentifier{engineType='seatunnel', pluginType='source', pluginName='Postgres-CDC'} at: [file:/opt/apache-seatunnel-2.3.13/connectors/connector-cdc-base-2.3.13.jar, file:/opt/apache-seatunnel-2.3.13/connectors/connector-cdc-postgres-2.3.13.jar]
2026-04-23 10:53:07,782 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - find connector jar and dependency for PluginIdentifier{engineType='seatunnel', pluginType='source', pluginName='Postgres-CDC'}: [file:/opt/apache-seatunnel-2.3.13/connectors/connector-cdc-base-2.3.13.jar, file:/opt/apache-seatunnel-2.3.13/connectors/connector-cdc-postgres-2.3.13.jar]
2026-04-23 10:53:07,818 INFO  [p.MultipleTableJobConfigParser] [main] - start generating all transforms.
2026-04-23 10:53:07,818 INFO  [p.MultipleTableJobConfigParser] [main] - start generating all sinks.
2026-04-23 10:53:07,824 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - Load SeaTunnelSink Plugin from /opt/apache-seatunnel-2.3.13/connectors
2026-04-23 10:53:07,825 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - Discovery plugin jar for: PluginIdentifier{engineType='seatunnel', pluginType='sink', pluginName='Console'} at: [file:/opt/apache-seatunnel-2.3.13/connectors/connector-console-2.3.13.jar]
2026-04-23 10:53:07,825 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - find connector jar and dependency for PluginIdentifier{engineType='seatunnel', pluginType='sink', pluginName='Console'}: [file:/opt/apache-seatunnel-2.3.13/connectors/connector-console-2.3.13.jar]
2026-04-23 10:53:07,840 INFO  [o.a.s.a.t.f.FactoryUtil       ] [main] - Create sink 'Console' with upstream input catalog-table[database: traffic, schema: public, table: users]
2026-04-23 10:53:08,027 INFO  [o.a.s.e.c.j.ClientJobProxy    ] [main] - Start submit job, job id: 1099528343327342593, with plugin jar [file:/opt/apache-seatunnel-2.3.13/connectors/connector-cdc-postgres-2.3.13.jar, file:/opt/apache-seatunnel-2.3.13/connectors/connector-cdc-base-2.3.13.jar, file:/opt/apache-seatunnel-2.3.13/connectors/connector-console-2.3.13.jar]
2026-04-23 10:53:08,098 INFO  [o.a.s.e.c.j.ClientJobProxy    ] [main] - Submit job finished, job id: 1099528343327342593, job name: SeaTunnel_Job
2026-04-23 10:53:08,161 WARN  [o.a.s.e.c.j.JobMetricsRunner  ] [job-metrics-runner-1099528343327342593] - Failed to get job metrics summary, it maybe first-run
2026-04-23 10:54:08,505 INFO  [o.a.s.e.c.j.JobMetricsRunner  ] [job-metrics-runner-1099528343327342593] - 
***********************************************
           Job Progress Information
***********************************************
Job Id                    : 1099528343327342593
Read Count So Far         :                   3
Write Attempt Count So Far:                   3
Write Committed Count So Far:                   3
Commit Rate               :             100.00%
Average Read Count        :                 0/s
Average Write Attempt Count:                 0/s
Average Write Committed Count:                 0/s
Last Statistic Time       : 2026-04-23 10:53:08
Current Statistic Time    : 2026-04-23 10:54:08
***********************************************

2026-04-23 10:55:08,116 INFO  [o.a.s.e.c.j.JobMetricsRunner  ] [job-metrics-runner-1099528343327342593] - 
***********************************************
           Job Progress Information
***********************************************
Job Id                    : 1099528343327342593
Read Count So Far         :                   3
Write Attempt Count So Far:                   3
Write Committed Count So Far:                   3
Commit Rate               :             100.00%
Average Read Count        :                 0/s
Average Write Attempt Count:                 0/s
Average Write Committed Count:                 0/s
Last Statistic Time       : 2026-04-23 10:54:08
Current Statistic Time    : 2026-04-23 10:55:08
***********************************************

2026-04-23 10:56:08,112 INFO  [o.a.s.e.c.j.JobMetricsRunner  ] [job-metrics-runner-1099528343327342593] - 
***********************************************
           Job Progress Information
***********************************************
Job Id                    : 1099528343327342593
Read Count So Far         :                   3
Write Attempt Count So Far:                   3
Write Committed Count So Far:                   3
Commit Rate               :             100.00%
Average Read Count        :                 0/s
Average Write Attempt Count:                 0/s
Average Write Committed Count:                 0/s
Last Statistic Time       : 2026-04-23 10:55:08
Current Statistic Time    : 2026-04-23 10:56:08
***********************************************

增量同步

步骤 1:先清理旧 slot(必须做)

SELECT pg_drop_replication_slot('final_slot');

步骤 2:新建一个干净的 slot

SELECT * FROM pg_create_logical_replication_slot('cdc_full_slot', 'pgoutput');

步骤 3:配置

[root@localhost apache-seatunnel-2.3.13]# cat config/pgcdcpg.conf 
env {
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  Postgres-CDC {
    username = "postgres"
    password = "123456"
    database-names = ["traffic"]
    schema-names = ["public"]
    table-names = ["traffic.public.users"]
    url = "jdbc:postgresql://192.168.56.105:5432/traffic"

    decoding.plugin.name = "pgoutput"
    slot.name = "final_slot"
    startup.mode = "latest"
    plugin_output = "out"
  }
}

sink {
  Jdbc {
    plugin_input = ["out"]
    driver = "org.postgresql.Driver"
    url = "jdbc:postgresql://192.168.56.105:5432/traffic"
    user = "postgres"
    password = "123456"

    # 必加,修复你最后报错!
    database = "traffic"
    table = "public.test_users"
    primary_keys = ["id"]
    
    # 稳定兼容参数
    generate_sink_sql = true
    support_upsert = true
    batch_size = 100
    batch_interval_ms = 1000
  }
}

[root@localhost apache-seatunnel-2.3.13]# 

启动任务

[root@localhost apache-seatunnel-2.3.13]# sh bin/seatunnel.sh --config config/pgcdcpg.conf 
WARNING: sun.reflect.Reflection.getCallerClass is not supported. This will impact performance.
2026-04-23 11:00:51,772 INFO  [c.h.i.c.AbstractConfigLocator ] [main] - Loading configuration '/opt/apache-seatunnel-2.3.13/config/seatunnel.yaml' from System property 'seatunnel.config'
2026-04-23 11:00:51,775 INFO  [c.h.i.c.AbstractConfigLocator ] [main] - Using configuration file at /opt/apache-seatunnel-2.3.13/config/seatunnel.yaml
2026-04-23 11:00:51,777 INFO  [o.a.s.e.c.c.SeaTunnelConfig   ] [main] - seatunnel.home is /opt/apache-seatunnel-2.3.13
2026-04-23 11:00:51,923 INFO  [amlSeaTunnelDomConfigProcessor] [main] - Dynamic slot is enabled, the schedule strategy is set to REJECT
2026-04-23 11:00:51,923 INFO  [c.h.i.c.AbstractConfigLocator ] [main] - Loading configuration '/opt/apache-seatunnel-2.3.13/config/hazelcast.yaml' from System property 'hazelcast.config'
2026-04-23 11:00:51,923 INFO  [c.h.i.c.AbstractConfigLocator ] [main] - Using configuration file at /opt/apache-seatunnel-2.3.13/config/hazelcast.yaml
2026-04-23 11:00:52,492 INFO  [c.h.i.c.AbstractConfigLocator ] [main] - Loading configuration '/opt/apache-seatunnel-2.3.13/config/hazelcast-client.yaml' from System property 'hazelcast.client.config'
2026-04-23 11:00:52,492 INFO  [c.h.i.c.AbstractConfigLocator ] [main] - Using configuration file at /opt/apache-seatunnel-2.3.13/config/hazelcast-client.yaml
2026-04-23 11:00:52,784 INFO  [.c.i.s.ClientInvocationService] [main] - hz.client_1 [seatunnel] [5.1] Running with 2 response threads, dynamic=true
2026-04-23 11:00:52,856 INFO  [c.h.c.LifecycleService        ] [main] - hz.client_1 [seatunnel] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is STARTING
2026-04-23 11:00:52,866 INFO  [c.h.c.LifecycleService        ] [main] - hz.client_1 [seatunnel] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is STARTED
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by com.hazelcast.internal.networking.nio.SelectorOptimizer (file:/opt/apache-seatunnel-2.3.13/starter/seatunnel-starter.jar) to field sun.nio.ch.SelectorImpl.selectedKeys
WARNING: Please consider reporting this to the maintainers of com.hazelcast.internal.networking.nio.SelectorOptimizer
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2026-04-23 11:00:52,911 INFO  [.c.i.c.ClientConnectionManager] [main] - hz.client_1 [seatunnel] [5.1] Trying to connect to cluster: seatunnel
2026-04-23 11:00:52,914 INFO  [.c.i.c.ClientConnectionManager] [main] - hz.client_1 [seatunnel] [5.1] Trying to connect to [localhost]:5801
2026-04-23 11:00:52,978 INFO  [c.h.c.LifecycleService        ] [main] - hz.client_1 [seatunnel] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is CLIENT_CONNECTED
2026-04-23 11:00:52,978 INFO  [.c.i.c.ClientConnectionManager] [main] - hz.client_1 [seatunnel] [5.1] Authenticated with server [localhost]:5801:59db9132-1c4f-4bbd-8620-7155fd9c6a33, server version: 5.1, local address: /127.0.0.1:52244
2026-04-23 11:00:52,990 INFO  [c.h.i.d.Diagnostics           ] [main] - hz.client_1 [seatunnel] [5.1] Diagnostics disabled. To enable add -Dhazelcast.diagnostics.enabled=true to the JVM arguments.
2026-04-23 11:00:53,011 INFO  [c.h.c.i.s.ClientClusterService] [hz.client_1.event-1] - hz.client_1 [seatunnel] [5.1] 

Members [1] {
	Member [localhost]:5801 - 59db9132-1c4f-4bbd-8620-7155fd9c6a33 [master node]
}

2026-04-23 11:00:53,035 INFO  [.c.i.s.ClientStatisticsService] [main] - Client statistics is enabled with period 5 seconds.
2026-04-23 11:00:53,252 INFO  [o.a.s.c.s.u.ConfigBuilder     ] [main] - Loading config file from path: config/pgcdcpg.conf
2026-04-23 11:00:53,372 INFO  [o.a.s.c.s.u.ConfigShadeUtils  ] [main] - Load config shade spi: [base64]
2026-04-23 11:00:53,451 INFO  [o.a.s.c.s.u.ConfigBuilder     ] [main] - Parsed config file: 
{
    "env" : {
        "execution.parallelism" : 1,
        "job.mode" : "STREAMING",
        "checkpoint.interval" : 5000
    },
    "source" : [
        {
            "username" : "******",
            "password" : "******",
            "database-names" : [
                "traffic"
            ],
            "schema-names" : [
                "public"
            ],
            "table-names" : [
                "traffic.public.users"
            ],
            "url" : "jdbc:postgresql://192.168.56.105:5432/traffic",
            "decoding.plugin.name" : "pgoutput",
            "slot.name" : "final_slot",
            "startup.mode" : "latest",
            "plugin_output" : "out",
            "plugin_name" : "Postgres-CDC"
        }
    ],
    "sink" : [
        {
            "plugin_input" : [
                "out"
            ],
            "driver" : "org.postgresql.Driver",
            "url" : "jdbc:postgresql://192.168.56.105:5432/traffic",
            "user" : "postgres",
            "password" : "******",
            "database" : "traffic",
            "table" : "public.test_users",
            "primary_keys" : [
                "id"
            ],
            "generate_sink_sql" : true,
            "support_upsert" : true,
            "batch_size" : 100,
            "batch_interval_ms" : 1000,
            "plugin_name" : "Jdbc"
        }
    ],
    "transform" : []
}

2026-04-23 11:00:53,477 INFO  [p.MultipleTableJobConfigParser] [main] - add common jar in plugins :[]
2026-04-23 11:00:53,505 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - Load SeaTunnelSink Plugin from /opt/apache-seatunnel-2.3.13/connectors
2026-04-23 11:00:53,535 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - Discovery plugin jar for: PluginIdentifier{engineType='seatunnel', pluginType='source', pluginName='Postgres-CDC'} at: [file:/opt/apache-seatunnel-2.3.13/connectors/connector-cdc-base-2.3.13.jar, file:/opt/apache-seatunnel-2.3.13/connectors/connector-cdc-postgres-2.3.13.jar]
2026-04-23 11:00:53,536 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - find connector jar and dependency for PluginIdentifier{engineType='seatunnel', pluginType='source', pluginName='Postgres-CDC'}: [file:/opt/apache-seatunnel-2.3.13/connectors/connector-cdc-base-2.3.13.jar, file:/opt/apache-seatunnel-2.3.13/connectors/connector-cdc-postgres-2.3.13.jar]
2026-04-23 11:00:53,540 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - Load SeaTunnelSink Plugin from /opt/apache-seatunnel-2.3.13/connectors
2026-04-23 11:00:53,544 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - Load SeaTunnelSink Plugin from /opt/apache-seatunnel-2.3.13/connectors
2026-04-23 11:00:53,545 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - Discovery plugin jar for: PluginIdentifier{engineType='seatunnel', pluginType='sink', pluginName='Jdbc'} at: [file:/opt/apache-seatunnel-2.3.13/connectors/connector-jdbc-2.3.13.jar]
2026-04-23 11:00:53,546 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - find connector jar and dependency for PluginIdentifier{engineType='seatunnel', pluginType='sink', pluginName='Jdbc'}: [file:/opt/apache-seatunnel-2.3.13/connectors/connector-jdbc-2.3.13.jar]
2026-04-23 11:00:53,585 INFO  [p.MultipleTableJobConfigParser] [main] - start generating all sources.
2026-04-23 11:00:53,944 INFO  [.s.c.s.j.c.AbstractJdbcCatalog] [main] - Catalog Postgres established connection to jdbc:postgresql://192.168.56.105:5432/traffic
2026-04-23 11:00:54,023 INFO  [o.a.s.a.t.c.CatalogTableUtil  ] [main] - Get catalog tables, cost time: 372 ms
2026-04-23 11:00:54,025 INFO  [.s.c.s.j.c.AbstractJdbcCatalog] [main] - Catalog Postgres closing
2026-04-23 11:00:54,267 INFO  [i.d.j.JdbcConnection          ] [pool-2-thread-1] - Connection gracefully closed
2026-04-23 11:00:54,395 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - Read list of available databases
2026-04-23 11:00:54,401 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 list of available databases is: [postgres, template1, template0, pg_monitor, iot, thermo_monitor, adg_monitor, bigdata, oracle_monitor, traffic]
2026-04-23 11:00:54,401 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - Read list of available tables in each database
2026-04-23 11:00:54,405 WARN  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 skipping database 'postgres' due to error reading tables: ERROR: cross-database references are not implemented: "postgres.information_schema.tables"
  Position: 15
2026-04-23 11:00:54,410 WARN  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 skipping database 'template1' due to error reading tables: ERROR: cross-database references are not implemented: "template1.information_schema.tables"
  Position: 15
2026-04-23 11:00:54,413 WARN  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 skipping database 'template0' due to error reading tables: ERROR: cross-database references are not implemented: "template0.information_schema.tables"
  Position: 15
2026-04-23 11:00:54,414 WARN  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 skipping database 'pg_monitor' due to error reading tables: ERROR: cross-database references are not implemented: "pg_monitor.information_schema.tables"
  Position: 15
2026-04-23 11:00:54,415 WARN  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 skipping database 'iot' due to error reading tables: ERROR: cross-database references are not implemented: "iot.information_schema.tables"
  Position: 15
2026-04-23 11:00:54,416 WARN  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 skipping database 'thermo_monitor' due to error reading tables: ERROR: cross-database references are not implemented: "thermo_monitor.information_schema.tables"
  Position: 15
2026-04-23 11:00:54,417 WARN  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 skipping database 'adg_monitor' due to error reading tables: ERROR: cross-database references are not implemented: "adg_monitor.information_schema.tables"
  Position: 15
2026-04-23 11:00:54,418 WARN  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 skipping database 'bigdata' due to error reading tables: ERROR: cross-database references are not implemented: "bigdata.information_schema.tables"
  Position: 15
2026-04-23 11:00:54,419 WARN  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 skipping database 'oracle_monitor' due to error reading tables: ERROR: cross-database references are not implemented: "oracle_monitor.information_schema.tables"
  Position: 15
2026-04-23 11:00:54,422 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.public.test_users' is filtered out of capturing
2026-04-23 11:00:54,422 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 including 'traffic.public.users' for further processing
2026-04-23 11:00:54,422 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_statistic' is filtered out of capturing
2026-04-23 11:00:54,422 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_type' is filtered out of capturing
2026-04-23 11:00:54,422 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_foreign_table' is filtered out of capturing
2026-04-23 11:00:54,422 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_authid' is filtered out of capturing
2026-04-23 11:00:54,422 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_statistic_ext_data' is filtered out of capturing
2026-04-23 11:00:54,422 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_user_mapping' is filtered out of capturing
2026-04-23 11:00:54,422 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_subscription' is filtered out of capturing
2026-04-23 11:00:54,422 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_attribute' is filtered out of capturing
2026-04-23 11:00:54,422 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_proc' is filtered out of capturing
2026-04-23 11:00:54,422 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_class' is filtered out of capturing
2026-04-23 11:00:54,422 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_attrdef' is filtered out of capturing
2026-04-23 11:00:54,422 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_constraint' is filtered out of capturing
2026-04-23 11:00:54,422 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_inherits' is filtered out of capturing
2026-04-23 11:00:54,422 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_index' is filtered out of capturing
2026-04-23 11:00:54,422 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_operator' is filtered out of capturing
2026-04-23 11:00:54,423 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_opfamily' is filtered out of capturing
2026-04-23 11:00:54,423 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_opclass' is filtered out of capturing
2026-04-23 11:00:54,423 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_am' is filtered out of capturing
2026-04-23 11:00:54,423 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_amop' is filtered out of capturing
2026-04-23 11:00:54,423 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_amproc' is filtered out of capturing
2026-04-23 11:00:54,423 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_language' is filtered out of capturing
2026-04-23 11:00:54,423 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_largeobject_metadata' is filtered out of capturing
2026-04-23 11:00:54,423 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_aggregate' is filtered out of capturing
2026-04-23 11:00:54,423 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_statistic_ext' is filtered out of capturing
2026-04-23 11:00:54,423 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_rewrite' is filtered out of capturing
2026-04-23 11:00:54,423 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_trigger' is filtered out of capturing
2026-04-23 11:00:54,423 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_event_trigger' is filtered out of capturing
2026-04-23 11:00:54,423 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_description' is filtered out of capturing
2026-04-23 11:00:54,423 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_cast' is filtered out of capturing
2026-04-23 11:00:54,423 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_enum' is filtered out of capturing
2026-04-23 11:00:54,423 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_namespace' is filtered out of capturing
2026-04-23 11:00:54,423 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_conversion' is filtered out of capturing
2026-04-23 11:00:54,423 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_depend' is filtered out of capturing
2026-04-23 11:00:54,423 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_database' is filtered out of capturing
2026-04-23 11:00:54,423 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_db_role_setting' is filtered out of capturing
2026-04-23 11:00:54,423 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_tablespace' is filtered out of capturing
2026-04-23 11:00:54,423 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_auth_members' is filtered out of capturing
2026-04-23 11:00:54,423 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_shdepend' is filtered out of capturing
2026-04-23 11:00:54,423 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_shdescription' is filtered out of capturing
2026-04-23 11:00:54,423 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_ts_config' is filtered out of capturing
2026-04-23 11:00:54,424 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_ts_config_map' is filtered out of capturing
2026-04-23 11:00:54,424 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_ts_dict' is filtered out of capturing
2026-04-23 11:00:54,424 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_ts_parser' is filtered out of capturing
2026-04-23 11:00:54,424 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_ts_template' is filtered out of capturing
2026-04-23 11:00:54,430 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_extension' is filtered out of capturing
2026-04-23 11:00:54,430 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_foreign_data_wrapper' is filtered out of capturing
2026-04-23 11:00:54,430 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_foreign_server' is filtered out of capturing
2026-04-23 11:00:54,430 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_policy' is filtered out of capturing
2026-04-23 11:00:54,430 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_replication_origin' is filtered out of capturing
2026-04-23 11:00:54,430 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_default_acl' is filtered out of capturing
2026-04-23 11:00:54,430 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_init_privs' is filtered out of capturing
2026-04-23 11:00:54,430 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_seclabel' is filtered out of capturing
2026-04-23 11:00:54,430 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_shseclabel' is filtered out of capturing
2026-04-23 11:00:54,430 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_collation' is filtered out of capturing
2026-04-23 11:00:54,430 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_parameter_acl' is filtered out of capturing
2026-04-23 11:00:54,430 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_partitioned_table' is filtered out of capturing
2026-04-23 11:00:54,430 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_range' is filtered out of capturing
2026-04-23 11:00:54,430 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_transform' is filtered out of capturing
2026-04-23 11:00:54,430 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_sequence' is filtered out of capturing
2026-04-23 11:00:54,430 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_publication' is filtered out of capturing
2026-04-23 11:00:54,430 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_publication_namespace' is filtered out of capturing
2026-04-23 11:00:54,430 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_publication_rel' is filtered out of capturing
2026-04-23 11:00:54,430 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_subscription_rel' is filtered out of capturing
2026-04-23 11:00:54,430 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_largeobject' is filtered out of capturing
2026-04-23 11:00:54,430 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.information_schema.sql_parts' is filtered out of capturing
2026-04-23 11:00:54,430 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.information_schema.sql_features' is filtered out of capturing
2026-04-23 11:00:54,430 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.information_schema.sql_implementation_info' is filtered out of capturing
2026-04-23 11:00:54,430 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.information_schema.sql_sizing' is filtered out of capturing
2026-04-23 11:00:54,444 INFO  [i.d.j.JdbcConnection          ] [pool-3-thread-1] - Connection gracefully closed
2026-04-23 11:00:54,496 INFO  [i.d.j.JdbcConnection          ] [pool-4-thread-1] - Connection gracefully closed
2026-04-23 11:00:54,552 INFO  [.a.s.c.c.b.u.CatalogTableUtils] [main] - Override primary key([id]) for catalog table traffic.public.users
2026-04-23 11:00:54,557 INFO  [i.d.j.JdbcConnection          ] [pool-5-thread-1] - Connection gracefully closed
2026-04-23 11:00:54,669 INFO  [o.a.k.c.j.JsonConverterConfig ] [main] - JsonConverterConfig values: 
	converter.type = value
	decimal.format = BASE64
	schemas.cache.size = 1000
	schemas.enable = true

2026-04-23 11:00:54,693 INFO  [o.a.s.a.t.f.FactoryUtil       ] [main] - get the CatalogTable from source Postgres-CDC: Postgres.traffic.public.users
2026-04-23 11:00:54,698 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - Load SeaTunnelSource Plugin from /opt/apache-seatunnel-2.3.13/connectors
2026-04-23 11:00:54,714 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - Discovery plugin jar for: PluginIdentifier{engineType='seatunnel', pluginType='source', pluginName='Postgres-CDC'} at: [file:/opt/apache-seatunnel-2.3.13/connectors/connector-cdc-base-2.3.13.jar, file:/opt/apache-seatunnel-2.3.13/connectors/connector-cdc-postgres-2.3.13.jar]
2026-04-23 11:00:54,714 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - find connector jar and dependency for PluginIdentifier{engineType='seatunnel', pluginType='source', pluginName='Postgres-CDC'}: [file:/opt/apache-seatunnel-2.3.13/connectors/connector-cdc-base-2.3.13.jar, file:/opt/apache-seatunnel-2.3.13/connectors/connector-cdc-postgres-2.3.13.jar]
2026-04-23 11:00:54,715 INFO  [p.MultipleTableJobConfigParser] [main] - start generating all transforms.
2026-04-23 11:00:54,715 INFO  [p.MultipleTableJobConfigParser] [main] - start generating all sinks.
2026-04-23 11:00:54,717 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - Load SeaTunnelSink Plugin from /opt/apache-seatunnel-2.3.13/connectors
2026-04-23 11:00:54,718 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - Discovery plugin jar for: PluginIdentifier{engineType='seatunnel', pluginType='sink', pluginName='Jdbc'} at: [file:/opt/apache-seatunnel-2.3.13/connectors/connector-jdbc-2.3.13.jar]
2026-04-23 11:00:54,718 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - find connector jar and dependency for PluginIdentifier{engineType='seatunnel', pluginType='sink', pluginName='Jdbc'}: [file:/opt/apache-seatunnel-2.3.13/connectors/connector-jdbc-2.3.13.jar]
2026-04-23 11:00:54,766 INFO  [o.a.s.a.t.f.FactoryUtil       ] [main] - Create sink 'Jdbc' with upstream input catalog-table[database: traffic, schema: public, table: users]
2026-04-23 11:00:54,768 WARN  [o.a.s.a.c.ReadonlyConfig      ] [main] - Please use the new key 'username' instead of the deprecated key 'user'.
2026-04-23 11:00:54,963 INFO  [o.a.s.e.c.j.ClientJobProxy    ] [main] - Start submit job, job id: 1099530306026733569, with plugin jar [file:/opt/apache-seatunnel-2.3.13/connectors/connector-jdbc-2.3.13.jar, file:/opt/apache-seatunnel-2.3.13/connectors/connector-cdc-postgres-2.3.13.jar, file:/opt/apache-seatunnel-2.3.13/connectors/connector-cdc-base-2.3.13.jar]
2026-04-23 11:00:55,048 INFO  [o.a.s.e.c.j.ClientJobProxy    ] [main] - Submit job finished, job id: 1099530306026733569, job name: SeaTunnel_Job
2026-04-23 11:00:55,108 WARN  [o.a.s.e.c.j.JobMetricsRunner  ] [job-metrics-runner-1099530306026733569] - Failed to get job metrics summary, it maybe first-run
2026-04-23 11:01:55,081 INFO  [o.a.s.e.c.j.JobMetricsRunner  ] [job-metrics-runner-1099530306026733569] - 
***********************************************
           Job Progress Information
***********************************************
Job Id                    : 1099530306026733569
Read Count So Far         :                   0
Write Attempt Count So Far:                   0
Write Committed Count So Far:                   0
Commit Rate               :                 N/A
Average Read Count        :                 0/s
Average Write Attempt Count:                 0/s
Average Write Committed Count:                 0/s
Last Statistic Time       : 2026-04-23 11:00:55
Current Statistic Time    : 2026-04-23 11:01:55
***********************************************

2026-04-23 11:02:55,081 INFO  [o.a.s.e.c.j.JobMetricsRunner  ] [job-metrics-runner-1099530306026733569] - 
***********************************************
           Job Progress Information
***********************************************
Job Id                    : 1099530306026733569
Read Count So Far         :                   3
Write Attempt Count So Far:                   3
Write Committed Count So Far:                   2
Commit Rate               :              66.67%
Average Read Count        :                 0/s
Average Write Attempt Count:                 0/s
Average Write Committed Count:                 0/s
Last Statistic Time       : 2026-04-23 11:01:55
Current Statistic Time    : 2026-04-23 11:02:55
***********************************************

全量 + 增量

步骤 1:先清理旧 slot(必须做)

SELECT pg_drop_replication_slot('final_slot');

步骤 2:新建一个干净的 slot

SELECT * FROM pg_create_logical_replication_slot('cdc_full_slot', 'pgoutput');

步骤 3:配置

[root@localhost apache-seatunnel-2.3.13]# cat config/pgcdcpg.conf 
env {
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  Postgres-CDC {
    username = "postgres"
    password = "123456"
    database-names = ["traffic"]
    schema-names = ["public"]
    table-names = ["traffic.public.users"]
    url = "jdbc:postgresql://192.168.56.105:5432/traffic"

    decoding.plugin.name = "pgoutput"
    slot.name = "final_slot"
    startup.mode = "initial"
    plugin_output = "out"
  }
}

sink {
  Jdbc {
    plugin_input = ["out"]
    driver = "org.postgresql.Driver"
    url = "jdbc:postgresql://192.168.56.105:5432/traffic"
    user = "postgres"
    password = "123456"

    database = "traffic"
    table = "public.test_users"
    primary_keys = ["id"]
    
    # 稳定兼容参数
    generate_sink_sql = true
    support_upsert = true
    batch_size = 100
    batch_interval_ms = 1000
  }
}

[root@localhost apache-seatunnel-2.3.13]# 

启动任务

[root@localhost apache-seatunnel-2.3.13]# sh bin/seatunnel.sh --config config/pgcdcpg.conf 
WARNING: sun.reflect.Reflection.getCallerClass is not supported. This will impact performance.
2026-04-23 11:06:31,573 INFO  [c.h.i.c.AbstractConfigLocator ] [main] - Loading configuration '/opt/apache-seatunnel-2.3.13/config/seatunnel.yaml' from System property 'seatunnel.config'
2026-04-23 11:06:31,590 INFO  [c.h.i.c.AbstractConfigLocator ] [main] - Using configuration file at /opt/apache-seatunnel-2.3.13/config/seatunnel.yaml
2026-04-23 11:06:31,592 INFO  [o.a.s.e.c.c.SeaTunnelConfig   ] [main] - seatunnel.home is /opt/apache-seatunnel-2.3.13
2026-04-23 11:06:31,717 INFO  [amlSeaTunnelDomConfigProcessor] [main] - Dynamic slot is enabled, the schedule strategy is set to REJECT
2026-04-23 11:06:31,717 INFO  [c.h.i.c.AbstractConfigLocator ] [main] - Loading configuration '/opt/apache-seatunnel-2.3.13/config/hazelcast.yaml' from System property 'hazelcast.config'
2026-04-23 11:06:31,717 INFO  [c.h.i.c.AbstractConfigLocator ] [main] - Using configuration file at /opt/apache-seatunnel-2.3.13/config/hazelcast.yaml
2026-04-23 11:06:32,340 INFO  [c.h.i.c.AbstractConfigLocator ] [main] - Loading configuration '/opt/apache-seatunnel-2.3.13/config/hazelcast-client.yaml' from System property 'hazelcast.client.config'
2026-04-23 11:06:32,340 INFO  [c.h.i.c.AbstractConfigLocator ] [main] - Using configuration file at /opt/apache-seatunnel-2.3.13/config/hazelcast-client.yaml
2026-04-23 11:06:32,636 INFO  [.c.i.s.ClientInvocationService] [main] - hz.client_1 [seatunnel] [5.1] Running with 2 response threads, dynamic=true
2026-04-23 11:06:32,720 INFO  [c.h.c.LifecycleService        ] [main] - hz.client_1 [seatunnel] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is STARTING
2026-04-23 11:06:32,721 INFO  [c.h.c.LifecycleService        ] [main] - hz.client_1 [seatunnel] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is STARTED
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by com.hazelcast.internal.networking.nio.SelectorOptimizer (file:/opt/apache-seatunnel-2.3.13/starter/seatunnel-starter.jar) to field sun.nio.ch.SelectorImpl.selectedKeys
WARNING: Please consider reporting this to the maintainers of com.hazelcast.internal.networking.nio.SelectorOptimizer
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2026-04-23 11:06:32,780 INFO  [.c.i.c.ClientConnectionManager] [main] - hz.client_1 [seatunnel] [5.1] Trying to connect to cluster: seatunnel
2026-04-23 11:06:32,782 INFO  [.c.i.c.ClientConnectionManager] [main] - hz.client_1 [seatunnel] [5.1] Trying to connect to [localhost]:5801
2026-04-23 11:06:32,821 INFO  [c.h.c.LifecycleService        ] [main] - hz.client_1 [seatunnel] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is CLIENT_CONNECTED
2026-04-23 11:06:32,821 INFO  [.c.i.c.ClientConnectionManager] [main] - hz.client_1 [seatunnel] [5.1] Authenticated with server [localhost]:5801:59db9132-1c4f-4bbd-8620-7155fd9c6a33, server version: 5.1, local address: /127.0.0.1:58759
2026-04-23 11:06:32,825 INFO  [c.h.i.d.Diagnostics           ] [main] - hz.client_1 [seatunnel] [5.1] Diagnostics disabled. To enable add -Dhazelcast.diagnostics.enabled=true to the JVM arguments.
2026-04-23 11:06:32,854 INFO  [c.h.c.i.s.ClientClusterService] [hz.client_1.event-4] - hz.client_1 [seatunnel] [5.1] 

Members [1] {
	Member [localhost]:5801 - 59db9132-1c4f-4bbd-8620-7155fd9c6a33 [master node]
}

2026-04-23 11:06:32,874 INFO  [.c.i.s.ClientStatisticsService] [main] - Client statistics is enabled with period 5 seconds.
2026-04-23 11:06:33,169 INFO  [o.a.s.c.s.u.ConfigBuilder     ] [main] - Loading config file from path: config/pgcdcpg.conf
2026-04-23 11:06:33,314 INFO  [o.a.s.c.s.u.ConfigShadeUtils  ] [main] - Load config shade spi: [base64]
2026-04-23 11:06:33,454 INFO  [o.a.s.c.s.u.ConfigBuilder     ] [main] - Parsed config file: 
{
    "env" : {
        "execution.parallelism" : 1,
        "job.mode" : "STREAMING",
        "checkpoint.interval" : 5000
    },
    "source" : [
        {
            "username" : "******",
            "password" : "******",
            "database-names" : [
                "traffic"
            ],
            "schema-names" : [
                "public"
            ],
            "table-names" : [
                "traffic.public.users"
            ],
            "url" : "jdbc:postgresql://192.168.56.105:5432/traffic",
            "decoding.plugin.name" : "pgoutput",
            "slot.name" : "final_slot",
            "startup.mode" : "initial",
            "plugin_output" : "out",
            "plugin_name" : "Postgres-CDC"
        }
    ],
    "sink" : [
        {
            "plugin_input" : [
                "out"
            ],
            "driver" : "org.postgresql.Driver",
            "url" : "jdbc:postgresql://192.168.56.105:5432/traffic",
            "user" : "postgres",
            "password" : "******",
            "database" : "traffic",
            "table" : "public.test_users",
            "primary_keys" : [
                "id"
            ],
            "generate_sink_sql" : true,
            "support_upsert" : true,
            "batch_size" : 100,
            "batch_interval_ms" : 1000,
            "plugin_name" : "Jdbc"
        }
    ],
    "transform" : []
}

2026-04-23 11:06:33,459 INFO  [p.MultipleTableJobConfigParser] [main] - add common jar in plugins :[]
2026-04-23 11:06:33,498 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - Load SeaTunnelSink Plugin from /opt/apache-seatunnel-2.3.13/connectors
2026-04-23 11:06:33,510 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - Discovery plugin jar for: PluginIdentifier{engineType='seatunnel', pluginType='source', pluginName='Postgres-CDC'} at: [file:/opt/apache-seatunnel-2.3.13/connectors/connector-cdc-base-2.3.13.jar, file:/opt/apache-seatunnel-2.3.13/connectors/connector-cdc-postgres-2.3.13.jar]
2026-04-23 11:06:33,524 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - find connector jar and dependency for PluginIdentifier{engineType='seatunnel', pluginType='source', pluginName='Postgres-CDC'}: [file:/opt/apache-seatunnel-2.3.13/connectors/connector-cdc-base-2.3.13.jar, file:/opt/apache-seatunnel-2.3.13/connectors/connector-cdc-postgres-2.3.13.jar]
2026-04-23 11:06:33,530 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - Load SeaTunnelSink Plugin from /opt/apache-seatunnel-2.3.13/connectors
2026-04-23 11:06:33,557 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - Load SeaTunnelSink Plugin from /opt/apache-seatunnel-2.3.13/connectors
2026-04-23 11:06:33,558 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - Discovery plugin jar for: PluginIdentifier{engineType='seatunnel', pluginType='sink', pluginName='Jdbc'} at: [file:/opt/apache-seatunnel-2.3.13/connectors/connector-jdbc-2.3.13.jar]
2026-04-23 11:06:33,558 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - find connector jar and dependency for PluginIdentifier{engineType='seatunnel', pluginType='sink', pluginName='Jdbc'}: [file:/opt/apache-seatunnel-2.3.13/connectors/connector-jdbc-2.3.13.jar]
2026-04-23 11:06:33,561 INFO  [p.MultipleTableJobConfigParser] [main] - start generating all sources.
2026-04-23 11:06:33,915 INFO  [.s.c.s.j.c.AbstractJdbcCatalog] [main] - Catalog Postgres established connection to jdbc:postgresql://192.168.56.105:5432/traffic
2026-04-23 11:06:33,956 INFO  [o.a.s.a.t.c.CatalogTableUtil  ] [main] - Get catalog tables, cost time: 293 ms
2026-04-23 11:06:33,958 INFO  [.s.c.s.j.c.AbstractJdbcCatalog] [main] - Catalog Postgres closing
2026-04-23 11:06:34,131 INFO  [i.d.j.JdbcConnection          ] [pool-2-thread-1] - Connection gracefully closed
2026-04-23 11:06:34,289 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - Read list of available databases
2026-04-23 11:06:34,291 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 list of available databases is: [postgres, template1, template0, pg_monitor, iot, thermo_monitor, adg_monitor, bigdata, oracle_monitor, traffic]
2026-04-23 11:06:34,291 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - Read list of available tables in each database
2026-04-23 11:06:34,295 WARN  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 skipping database 'postgres' due to error reading tables: ERROR: cross-database references are not implemented: "postgres.information_schema.tables"
  Position: 15
2026-04-23 11:06:34,308 WARN  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 skipping database 'template1' due to error reading tables: ERROR: cross-database references are not implemented: "template1.information_schema.tables"
  Position: 15
2026-04-23 11:06:34,310 WARN  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 skipping database 'template0' due to error reading tables: ERROR: cross-database references are not implemented: "template0.information_schema.tables"
  Position: 15
2026-04-23 11:06:34,311 WARN  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 skipping database 'pg_monitor' due to error reading tables: ERROR: cross-database references are not implemented: "pg_monitor.information_schema.tables"
  Position: 15
2026-04-23 11:06:34,313 WARN  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 skipping database 'iot' due to error reading tables: ERROR: cross-database references are not implemented: "iot.information_schema.tables"
  Position: 15
2026-04-23 11:06:34,315 WARN  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 skipping database 'thermo_monitor' due to error reading tables: ERROR: cross-database references are not implemented: "thermo_monitor.information_schema.tables"
  Position: 15
2026-04-23 11:06:34,316 WARN  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 skipping database 'adg_monitor' due to error reading tables: ERROR: cross-database references are not implemented: "adg_monitor.information_schema.tables"
  Position: 15
2026-04-23 11:06:34,316 WARN  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 skipping database 'bigdata' due to error reading tables: ERROR: cross-database references are not implemented: "bigdata.information_schema.tables"
  Position: 15
2026-04-23 11:06:34,317 WARN  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 skipping database 'oracle_monitor' due to error reading tables: ERROR: cross-database references are not implemented: "oracle_monitor.information_schema.tables"
  Position: 15
2026-04-23 11:06:34,320 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 including 'traffic.public.users' for further processing
2026-04-23 11:06:34,320 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.public.test_users' is filtered out of capturing
2026-04-23 11:06:34,320 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_statistic' is filtered out of capturing
2026-04-23 11:06:34,320 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_type' is filtered out of capturing
2026-04-23 11:06:34,320 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_foreign_table' is filtered out of capturing
2026-04-23 11:06:34,321 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_authid' is filtered out of capturing
2026-04-23 11:06:34,321 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_statistic_ext_data' is filtered out of capturing
2026-04-23 11:06:34,321 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_user_mapping' is filtered out of capturing
2026-04-23 11:06:34,321 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_subscription' is filtered out of capturing
2026-04-23 11:06:34,321 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_attribute' is filtered out of capturing
2026-04-23 11:06:34,321 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_proc' is filtered out of capturing
2026-04-23 11:06:34,321 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_class' is filtered out of capturing
2026-04-23 11:06:34,321 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_attrdef' is filtered out of capturing
2026-04-23 11:06:34,321 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_constraint' is filtered out of capturing
2026-04-23 11:06:34,321 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_inherits' is filtered out of capturing
2026-04-23 11:06:34,321 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_index' is filtered out of capturing
2026-04-23 11:06:34,321 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_operator' is filtered out of capturing
2026-04-23 11:06:34,321 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_opfamily' is filtered out of capturing
2026-04-23 11:06:34,321 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_opclass' is filtered out of capturing
2026-04-23 11:06:34,321 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_am' is filtered out of capturing
2026-04-23 11:06:34,321 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_amop' is filtered out of capturing
2026-04-23 11:06:34,321 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_amproc' is filtered out of capturing
2026-04-23 11:06:34,321 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_language' is filtered out of capturing
2026-04-23 11:06:34,321 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_largeobject_metadata' is filtered out of capturing
2026-04-23 11:06:34,321 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_aggregate' is filtered out of capturing
2026-04-23 11:06:34,321 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_statistic_ext' is filtered out of capturing
2026-04-23 11:06:34,321 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_rewrite' is filtered out of capturing
2026-04-23 11:06:34,321 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_trigger' is filtered out of capturing
2026-04-23 11:06:34,321 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_event_trigger' is filtered out of capturing
2026-04-23 11:06:34,321 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_description' is filtered out of capturing
2026-04-23 11:06:34,321 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_cast' is filtered out of capturing
2026-04-23 11:06:34,321 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_enum' is filtered out of capturing
2026-04-23 11:06:34,321 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_namespace' is filtered out of capturing
2026-04-23 11:06:34,321 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_conversion' is filtered out of capturing
2026-04-23 11:06:34,321 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_depend' is filtered out of capturing
2026-04-23 11:06:34,321 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_database' is filtered out of capturing
2026-04-23 11:06:34,322 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_db_role_setting' is filtered out of capturing
2026-04-23 11:06:34,322 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_tablespace' is filtered out of capturing
2026-04-23 11:06:34,322 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_auth_members' is filtered out of capturing
2026-04-23 11:06:34,322 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_shdepend' is filtered out of capturing
2026-04-23 11:06:34,322 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_shdescription' is filtered out of capturing
2026-04-23 11:06:34,322 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_ts_config' is filtered out of capturing
2026-04-23 11:06:34,322 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_ts_config_map' is filtered out of capturing
2026-04-23 11:06:34,322 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_ts_dict' is filtered out of capturing
2026-04-23 11:06:34,322 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_ts_parser' is filtered out of capturing
2026-04-23 11:06:34,322 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_ts_template' is filtered out of capturing
2026-04-23 11:06:34,337 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_extension' is filtered out of capturing
2026-04-23 11:06:34,337 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_foreign_data_wrapper' is filtered out of capturing
2026-04-23 11:06:34,337 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_foreign_server' is filtered out of capturing
2026-04-23 11:06:34,337 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_policy' is filtered out of capturing
2026-04-23 11:06:34,337 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_replication_origin' is filtered out of capturing
2026-04-23 11:06:34,337 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_default_acl' is filtered out of capturing
2026-04-23 11:06:34,337 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_init_privs' is filtered out of capturing
2026-04-23 11:06:34,337 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_seclabel' is filtered out of capturing
2026-04-23 11:06:34,337 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_shseclabel' is filtered out of capturing
2026-04-23 11:06:34,337 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_collation' is filtered out of capturing
2026-04-23 11:06:34,337 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_parameter_acl' is filtered out of capturing
2026-04-23 11:06:34,337 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_partitioned_table' is filtered out of capturing
2026-04-23 11:06:34,337 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_range' is filtered out of capturing
2026-04-23 11:06:34,338 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_transform' is filtered out of capturing
2026-04-23 11:06:34,338 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_sequence' is filtered out of capturing
2026-04-23 11:06:34,338 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_publication' is filtered out of capturing
2026-04-23 11:06:34,338 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_publication_namespace' is filtered out of capturing
2026-04-23 11:06:34,338 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_publication_rel' is filtered out of capturing
2026-04-23 11:06:34,338 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_subscription_rel' is filtered out of capturing
2026-04-23 11:06:34,338 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.pg_catalog.pg_largeobject' is filtered out of capturing
2026-04-23 11:06:34,338 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.information_schema.sql_parts' is filtered out of capturing
2026-04-23 11:06:34,338 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.information_schema.sql_features' is filtered out of capturing
2026-04-23 11:06:34,338 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.information_schema.sql_implementation_info' is filtered out of capturing
2026-04-23 11:06:34,338 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [main] - 	 'traffic.information_schema.sql_sizing' is filtered out of capturing
2026-04-23 11:06:34,362 INFO  [i.d.j.JdbcConnection          ] [pool-3-thread-1] - Connection gracefully closed
2026-04-23 11:06:34,400 INFO  [i.d.j.JdbcConnection          ] [pool-4-thread-1] - Connection gracefully closed
2026-04-23 11:06:34,455 INFO  [.a.s.c.c.b.u.CatalogTableUtils] [main] - Override primary key([id]) for catalog table traffic.public.users
2026-04-23 11:06:34,460 INFO  [i.d.j.JdbcConnection          ] [pool-5-thread-1] - Connection gracefully closed
2026-04-23 11:06:34,548 INFO  [o.a.k.c.j.JsonConverterConfig ] [main] - JsonConverterConfig values: 
	converter.type = value
	decimal.format = BASE64
	schemas.cache.size = 1000
	schemas.enable = true

2026-04-23 11:06:34,560 INFO  [o.a.s.a.t.f.FactoryUtil       ] [main] - get the CatalogTable from source Postgres-CDC: Postgres.traffic.public.users
2026-04-23 11:06:34,580 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - Load SeaTunnelSource Plugin from /opt/apache-seatunnel-2.3.13/connectors
2026-04-23 11:06:34,585 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - Discovery plugin jar for: PluginIdentifier{engineType='seatunnel', pluginType='source', pluginName='Postgres-CDC'} at: [file:/opt/apache-seatunnel-2.3.13/connectors/connector-cdc-base-2.3.13.jar, file:/opt/apache-seatunnel-2.3.13/connectors/connector-cdc-postgres-2.3.13.jar]
2026-04-23 11:06:34,586 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - find connector jar and dependency for PluginIdentifier{engineType='seatunnel', pluginType='source', pluginName='Postgres-CDC'}: [file:/opt/apache-seatunnel-2.3.13/connectors/connector-cdc-base-2.3.13.jar, file:/opt/apache-seatunnel-2.3.13/connectors/connector-cdc-postgres-2.3.13.jar]
2026-04-23 11:06:34,587 INFO  [p.MultipleTableJobConfigParser] [main] - start generating all transforms.
2026-04-23 11:06:34,587 INFO  [p.MultipleTableJobConfigParser] [main] - start generating all sinks.
2026-04-23 11:06:34,589 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - Load SeaTunnelSink Plugin from /opt/apache-seatunnel-2.3.13/connectors
2026-04-23 11:06:34,590 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - Discovery plugin jar for: PluginIdentifier{engineType='seatunnel', pluginType='sink', pluginName='Jdbc'} at: [file:/opt/apache-seatunnel-2.3.13/connectors/connector-jdbc-2.3.13.jar]
2026-04-23 11:06:34,590 INFO  [.s.p.d.AbstractPluginDiscovery] [main] - find connector jar and dependency for PluginIdentifier{engineType='seatunnel', pluginType='sink', pluginName='Jdbc'}: [file:/opt/apache-seatunnel-2.3.13/connectors/connector-jdbc-2.3.13.jar]
2026-04-23 11:06:34,648 INFO  [o.a.s.a.t.f.FactoryUtil       ] [main] - Create sink 'Jdbc' with upstream input catalog-table[database: traffic, schema: public, table: users]
2026-04-23 11:06:34,650 WARN  [o.a.s.a.c.ReadonlyConfig      ] [main] - Please use the new key 'username' instead of the deprecated key 'user'.
2026-04-23 11:06:34,750 INFO  [o.a.s.e.c.j.ClientJobProxy    ] [main] - Start submit job, job id: 1099531731754549249, with plugin jar [file:/opt/apache-seatunnel-2.3.13/connectors/connector-jdbc-2.3.13.jar, file:/opt/apache-seatunnel-2.3.13/connectors/connector-cdc-postgres-2.3.13.jar, file:/opt/apache-seatunnel-2.3.13/connectors/connector-cdc-base-2.3.13.jar]
2026-04-23 11:06:34,810 INFO  [o.a.s.e.c.j.ClientJobProxy    ] [main] - Submit job finished, job id: 1099531731754549249, job name: SeaTunnel_Job
2026-04-23 11:06:34,852 WARN  [o.a.s.e.c.j.JobMetricsRunner  ] [job-metrics-runner-1099531731754549249] - Failed to get job metrics summary, it maybe first-run
2026-04-23 11:06:47,728 INFO  [o.a.s.e.c.j.ClientJobProxy    ] [main] - Job (1099531731754549249) end with state FAILED
2026-04-23 11:06:47,728 INFO  [c.h.c.LifecycleService        ] [main] - hz.client_1 [seatunnel] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTTING_DOWN
2026-04-23 11:06:47,732 INFO  [.c.i.c.ClientConnectionManager] [main] - hz.client_1 [seatunnel] [5.1] Removed connection to endpoint: [localhost]:5801:59db9132-1c4f-4bbd-8620-7155fd9c6a33, connection: ClientConnection{alive=false, connectionId=1, channel=NioChannel{/127.0.0.1:58759->localhost/127.0.0.1:5801}, remoteAddress=[localhost]:5801, lastReadTime=2026-04-23 11:06:47.706, lastWriteTime=2026-04-23 11:06:42.882, closedTime=2026-04-23 11:06:47.730, connected server version=5.1}
2026-04-23 11:06:47,732 INFO  [c.h.c.LifecycleService        ] [main] - hz.client_1 [seatunnel] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is CLIENT_DISCONNECTED
2026-04-23 11:06:47,735 INFO  [c.h.c.LifecycleService        ] [main] - hz.client_1 [seatunnel] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTDOWN
2026-04-23 11:06:47,736 INFO  [s.c.s.s.c.ClientExecuteCommand] [main] - Closed SeaTunnel client......
2026-04-23 11:06:47,736 INFO  [s.c.s.s.c.ClientExecuteCommand] [main] - Closed metrics executor service ......
2026-04-23 11:06:47,736 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - 

===============================================================================


2026-04-23 11:06:47,736 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Fatal Error, 

2026-04-23 11:06:47,736 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Please submit bug report in https://github.com/apache/seatunnel/issues

2026-04-23 11:06:47,736 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Reason:SeaTunnel job executed failed 

2026-04-23 11:06:47,737 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:266)
	at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
	at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:40)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: One or more fetchers have encountered exception
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:147)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:167)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:93)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119)
	at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:159)
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:165)
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132)
	at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:683)
	at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1012)
	at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:81)
	... 5 more
Caused by: java.io.IOException: Source fetch execution was fail
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.FetchTask.run(FetchTask.java:73)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
	... 6 more
Caused by: java.io.IOException: org.apache.seatunnel.common.utils.SeaTunnelException: Read split SnapshotSplit(tableId=traffic.public.users, splitKeyType=ROW<id INT>, splitStart=null, splitEnd=null, lowWatermark=null, highWatermark=null) error due to java.lang.NullPointerException.
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:94)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.FetchTask.run(FetchTask.java:54)
	... 7 more
Caused by: org.apache.seatunnel.common.utils.SeaTunnelException: Read split SnapshotSplit(tableId=traffic.public.users, splitKeyType=ROW<id INT>, splitStart=null, splitEnd=null, lowWatermark=null, highWatermark=null) error due to java.lang.NullPointerException.
	at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceScanFetcher.checkReadException(IncrementalSourceScanFetcher.java:216)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceScanFetcher.pollSplitRecords(IncrementalSourceScanFetcher.java:117)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:91)
	... 8 more
Caused by: io.debezium.DebeziumException: java.lang.NullPointerException
	at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.snapshot.PostgresSnapshotSplitReadTask.execute(PostgresSnapshotSplitReadTask.java:112)
	at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.snapshot.PostgresSnapshotFetchTask.execute(PostgresSnapshotFetchTask.java:65)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceScanFetcher.lambda$submitTask$0(IncrementalSourceScanFetcher.java:96)
	... 5 more
Caused by: java.lang.NullPointerException
	at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.snapshot.PostgresSnapshotSplitReadTask.createDataEventsForTable(PostgresSnapshotSplitReadTask.java:183)
	at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.snapshot.PostgresSnapshotSplitReadTask.createDataEvents(PostgresSnapshotSplitReadTask.java:170)
	at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.snapshot.PostgresSnapshotSplitReadTask.doExecute(PostgresSnapshotSplitReadTask.java:136)
	at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.snapshot.PostgresSnapshotSplitReadTask.execute(PostgresSnapshotSplitReadTask.java:107)
	... 7 more

	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:258)
	... 2 more
 
2026-04-23 11:06:47,742 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - 
===============================================================================



Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:266)
	at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
	at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:40)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: One or more fetchers have encountered exception
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:147)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:167)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:93)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119)
	at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:159)
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:165)
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132)
	at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:683)
	at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1012)
	at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:81)
	... 5 more
Caused by: java.io.IOException: Source fetch execution was fail
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.FetchTask.run(FetchTask.java:73)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
	... 6 more
Caused by: java.io.IOException: org.apache.seatunnel.common.utils.SeaTunnelException: Read split SnapshotSplit(tableId=traffic.public.users, splitKeyType=ROW<id INT>, splitStart=null, splitEnd=null, lowWatermark=null, highWatermark=null) error due to java.lang.NullPointerException.
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:94)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.FetchTask.run(FetchTask.java:54)
	... 7 more
Caused by: org.apache.seatunnel.common.utils.SeaTunnelException: Read split SnapshotSplit(tableId=traffic.public.users, splitKeyType=ROW<id INT>, splitStart=null, splitEnd=null, lowWatermark=null, highWatermark=null) error due to java.lang.NullPointerException.
	at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceScanFetcher.checkReadException(IncrementalSourceScanFetcher.java:216)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceScanFetcher.pollSplitRecords(IncrementalSourceScanFetcher.java:117)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:91)
	... 8 more
Caused by: io.debezium.DebeziumException: java.lang.NullPointerException
	at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.snapshot.PostgresSnapshotSplitReadTask.execute(PostgresSnapshotSplitReadTask.java:112)
	at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.snapshot.PostgresSnapshotFetchTask.execute(PostgresSnapshotFetchTask.java:65)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceScanFetcher.lambda$submitTask$0(IncrementalSourceScanFetcher.java:96)
	... 5 more
Caused by: java.lang.NullPointerException
	at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.snapshot.PostgresSnapshotSplitReadTask.createDataEventsForTable(PostgresSnapshotSplitReadTask.java:183)
	at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.snapshot.PostgresSnapshotSplitReadTask.createDataEvents(PostgresSnapshotSplitReadTask.java:170)
	at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.snapshot.PostgresSnapshotSplitReadTask.doExecute(PostgresSnapshotSplitReadTask.java:136)
	at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.snapshot.PostgresSnapshotSplitReadTask.execute(PostgresSnapshotSplitReadTask.java:107)
	... 7 more

	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:258)
	... 2 more
2026-04-23 11:06:47,746 INFO  [s.c.s.s.c.ClientExecuteCommand] [SeaTunnel-CompletableFuture-Thread-0] - run shutdown hook because get close signal
[root@localhost apache-seatunnel-2.3.13]# 

根据错误,需要分布完成:

第一步:先跑 增量(latest)

第二步:跑成功后,再做全量同步

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论