暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

【DB宝88】PG逻辑复制插件之pglogical使用说明

DB宝 2022-01-22
2374

简介

参考:

https://gitee.com/mirrors/pglogical

https://github.com/2ndQuadrant/pglogical

https://www.xmmup.com/pgluojifuzhichajianzhipglogicalguanfangshuoming.html

Logical Replication extension for PostgreSQL 14、13, 12, 11, 10, 9.6, 9.5, 9.4 (Postgres), providing much faster replication than Slony, Bucardo or Londiste, as well as cross-version upgrades.

pglogical 是 PostgreSQL 的拓展模块, 为 PostgreSQL 数据库提供了逻辑流复制发布和订阅的功能。pglogical 重用了 BDR 项目中的一部分相关技术。pglogical 是一个完全作为PostgreSQL 扩展实现的逻辑复制系统。完全集成,它不需要触发器或外部程序。这种物理复制的替代方法是使用发布/订阅模型复制数据以进行选择性复制的一种高效方法。支持 PG14、13、12、11、10、9.6、9.5、9.4 ,提供比 Slony、Bucardo 或 Londiste 更快的复制速度,以及跨版本升级。

我们使用的下列术语来描述节点和数据流之间的关系,重用了一些早期的Slony技术中的术语:

  • 节点 - PostgreSQL数据库实例

  • 发布者和订阅者 - 节点的角色名称

  • 复制集 - 关系表的集合

pglogical是新技术组件,使用了最新的PostgreSQL 数据库中的一些核心功能,所以存在一些数据库版本限制:

  • 数据源发布和订阅节点需要运行 PostgreSQL 9.4 +

  • 复制源过滤和冲突检测需要 PostgreSQL 9.5 +

支持的使用场景:

  • 主版本数据库之间的升级(存在上述的版本限制)

  • 完整的数据库复制

  • 利用复制集,选择性的筛选的关系表

  • 可从多个上游服务器,做数据的聚集和合并

架构上的细节︰

  • pglogical 工作在每个数据库层面上,而不是像物理流复制一样工作在整个数据库集簇实例级别

  • 一个发布程序提供给多个订阅者不会引起额外的磁盘写开销

  • 一个订阅服务器可以从几个起源的更改合并和检测与自动和可配置冲突决议 (一些,但并不是所有方面所需的多主机)的更改之间的冲突。

  • 级联复制是在变更集转发的过程中实现的。

必要条件

  • 要使用pglogical,提供发布和订阅服务器必须运行PostgreSQL 9.4或更高版本。

  • pglogical扩展必须同时安装在提供发布和订阅服务器上。您必须同时创建扩展“CREATE EXTENSION pglogical;”。

  • 提供发布和订阅服务器上的表必须具有相同的名称并位于相同的schema中。将来的修订版可能会添加映射功能。

  • 提供发布和订阅服务器上的表必须具有相同的列,每列中的数据类型相同。订阅服务器上的CHECK约束、NOT NULL约束等必须与提供发布相同或较弱(更宽松)。

  • 表必须具有相同的主键。不建议添加主键以外的其他唯一约束。

安装插件

安装包安装

yum安装

1、安装yum源:

  • PostgreSQL 9.4: curl https://techsupport.enterprisedb.com/api/repository/dl/default/release/9.4/rpm | bash

  • PostgreSQL 9.5: curl https://techsupport.enterprisedb.com/api/repository/dl/default/release/9.5/rpm | bash

  • PostgreSQL 9.6: curl https://techsupport.enterprisedb.com/api/repository/dl/default/release/9.6/rpm | bash

  • PostgreSQL 10: curl https://techsupport.enterprisedb.com/api/repository/dl/default/release/10/rpm | bash

  • PostgreSQL 11: curl https://techsupport.enterprisedb.com/api/repository/dl/default/release/11/rpm | bash

  • PostgreSQL 12: curl https://techsupport.enterprisedb.com/api/repository/dl/default/release/12/rpm | bash

  • PostgreSQL 13: curl https://techsupport.enterprisedb.com/api/repository/dl/default/release/13/rpm | bash

  • PostgreSQL 14: curl https://techsupport.enterprisedb.com/api/repository/dl/default/release/14/rpm | bash

2、安装插件

  • PostgreSQL 9.4: yum install postgresql94-pglogical

  • PostgreSQL 9.5: yum install postgresql95-pglogical

  • PostgreSQL 9.6: yum install postgresql96-pglogical

  • PostgreSQL 10: yum install postgresql10-pglogical

  • PostgreSQL 11: yum install postgresql11-pglogical

  • PostgreSQL 12: yum install postgresql12-pglogical

  • PostgreSQL 13: yum install postgresql13-pglogical

  • PostgreSQL 14: yum install postgresql14-pglogical

APT安装

1、安装源:

1curl https://techsupport.enterprisedb.com/api/repository/dl/default/release/deb | bash

2、安装插件

  • PostgreSQL 9.4: sudo apt-get install postgresql-9.4-pglogical

  • PostgreSQL 9.5: sudo apt-get install postgresql-9.5-pglogical

  • PostgreSQL 9.6: sudo apt-get install postgresql-9.6-pglogical

  • PostgreSQL 10: sudo apt-get install postgresql-10-pglogical

  • PostgreSQL 11: sudo apt-get install postgresql-11-pglogical

  • PostgreSQL 12: sudo apt-get install postgresql-12-pglogical

  • PostgreSQL 13: sudo apt-get install postgresql-13-pglogical

  • PostgreSQL 14: sudo apt-get install postgresql-14-pglogical

编译安装

https://github.com/2ndQuadrant/pglogical/releases

Source code installs are the same as for any other PostgreSQL extension built using PGXS.

Make sure the directory containing pg_config
from the PostgreSQL release is listed in your PATH
environment variable. You might have to install a -dev
or -devel
package for your PostgreSQL release from your package manager if you don't have pg_config
.

Then run make
to compile, and make install
to install. You might need to use sudo
for the install step.

1wget https://codeload.github.com/2ndQuadrant/pglogical/tar.gz/refs/tags/REL2_4_1 -O pglogical-REL2_4_1.tar.gz
2
3tar -zxvf pglogical-REL2_4_1.tar.gz 
4cd pglogical-REL2_4_1
5which pg_config
6
7USE_PGXS=1 make clean
8USE_PGXS=1 make
9USE_PGXS=1 make install

安装完之后,查看:

 1postgres=# select * from pg_available_extensions where name='pglogical';
2   name    | default_version | installed_version |            comment             
3-----------+-----------------+-------------------+--------------------------------
4 pglogical | 2.4.1           |                   | PostgreSQL Logical Replication
5(1 row)
6postgres=# \dx
7                                            List of installed extensions
8        Name        | Version |   Schema   |                              Description                               
9--------------------+---------+------------+------------------------------------------------------------------------
10 pageinspect        | 1.8     | public     | inspect the contents of database pages at a low level
11 pg_recovery        | 1.0     | public     | recovery table data of update/delete/rollback rows and drop columns
12 pg_stat_statements | 1.8     | public     | track planning and execution statistics of all SQL statements executed
13 plpgsql            | 1.0     | pg_catalog | PL/pgSQL procedural language
14(4 rows)

使用配置

配置参数

首先 PostgreSQL服务器必须正确配置才能够支持逻辑解码︰

1wal_level = 'logical'
2# one per database needed on (provider/subscriber)provider node
3max_worker_processes = 10  
4# one per node needed on provider node
5max_replication_slots = 10  
6# one per node needed on provider node
7max_wal_senders = 10 
8shared_preload_libraries = 'pglogical'

配置:

1alter system set shared_preload_libraries = 'pglogical';
2alter system set wal_level = 'logical';
3select pg_reload_conf();
4
5-- 由于都是postmaster类型的参数,所以需要重启库

操作过程:

 1postgres=# select * from pg_settings where name in ('wal_level','max_worker_processes','max_replication_slots','max_wal_senders','shared_preload_libraries');
2           name           | setting | unit |                        category                        |                               short_desc                                | extra_desc |  context   | vartype | source  | min_val | max_val |         enumvals          | boot_val | reset_val | sourcefile | sourceline | pending_restart
3--------------------------+---------+------+--------------------------------------------------------+-------------------------------------------------------------------------+------------+------------+---------+---------+---------+---------+---------------------------+----------+-----------+------------+------------+-----------------
4 max_replication_slots    | 10      |      | Replication / Sending Servers                          | Sets the maximum number of simultaneously defined replication slots.    |            | postmaster | integer | default | 0       | 262143  |                           | 10       | 10        |            |            | f
5 max_wal_senders          | 10      |      | Replication / Sending Servers                          | Sets the maximum number of simultaneously running WAL sender processes. |            | postmaster | integer | default | 0       | 262143  |                           | 10       | 10        |            |            | f
6 max_worker_processes     | 8       |      | Resource Usage / Asynchronous Behavior                 | Maximum number of concurrent worker processes.                          |            | postmaster | integer | default | 0       | 262143  |                           | 8        | 8         |            |            | f
7 shared_preload_libraries |         |      | Client Connection Defaults / Shared Library Preloading | Lists shared libraries to preload into server.                          |            | postmaster | string  | default |         |         |                           |          |           |            |            | f
8 wal_level                | replica |      | Write-Ahead Log / Settings                             | Set the level of information written to the WAL.                        |            | postmaster | enum    | default |         |         | {minimal,replica,logical} | replica  | replica   |            |            | f
9(5 rows)
10
11
12postgres=# alter system set shared_preload_libraries = 'pglogical';
13ALTER SYSTEM
14postgres=# alter system set wal_level = 'logical';
15ALTER SYSTEM
16postgres=# select * from pg_settings where name in ('wal_level','max_worker_processes','max_replication_slots','max_wal_senders','shared_preload_libraries');
17           name           | setting | unit |                        category                        |                               short_desc                                | extra_desc |  context   | vartype | source  | min_val | max_val |         enumvals          | boot_val | reset_val | sourcefile | sourceline | pending_restart
18--------------------------+---------+------+--------------------------------------------------------+-------------------------------------------------------------------------+------------+------------+---------+---------+---------+---------+---------------------------+----------+-----------+------------+------------+-----------------
19 max_replication_slots    | 10      |      | Replication / Sending Servers                          | Sets the maximum number of simultaneously defined replication slots.    |            | postmaster | integer | default | 0       | 262143  |                           | 10       | 10        |            |            | f
20 max_wal_senders          | 10      |      | Replication / Sending Servers                          | Sets the maximum number of simultaneously running WAL sender processes. |            | postmaster | integer | default | 0       | 262143  |                           | 10       | 10        |            |            | f
21 max_worker_processes     | 8       |      | Resource Usage / Asynchronous Behavior                 | Maximum number of concurrent worker processes.                          |            | postmaster | integer | default | 0       | 262143  |                           | 8        | 8         |            |            | f
22 shared_preload_libraries |         |      | Client Connection Defaults / Shared Library Preloading | Lists shared libraries to preload into server.                          |            | postmaster | string  | default |         |         |                           |          |           |            |            | f
23 wal_level                | replica |      | Write-Ahead Log / Settings                             | Set the level of information written to the WAL.                        |            | postmaster | enum    | default |         |         | {minimal,replica,logical} | replica  | replica   |            |            | f
24(5 rows)
25
26
27postgres=# select pg_reload_conf();
28 pg_reload_conf
29----------------
30 t
31(1 row)
32
33
34
35
36postgres=# select * from pg_settings where name in ('wal_level','max_worker_processes','max_replication_slots','max_wal_senders','shared_preload_libraries');
37           name           | setting | unit |                        category                        |                               short_desc                                | extra_desc |  context   | vartype | source  | min_val | max_val |         enumvals          | boot_val | reset_val | sourcefile | sourceline | pending_restart
38--------------------------+---------+------+--------------------------------------------------------+-------------------------------------------------------------------------+------------+------------+---------+---------+---------+---------+---------------------------+----------+-----------+------------+------------+-----------------
39 max_replication_slots    | 10      |      | Replication / Sending Servers                          | Sets the maximum number of simultaneously defined replication slots.    |            | postmaster | integer | default | 0       | 262143  |                           | 10       | 10        |            |            | f
40 max_wal_senders          | 10      |      | Replication / Sending Servers                          | Sets the maximum number of simultaneously running WAL sender processes. |            | postmaster | integer | default | 0       | 262143  |                           | 10       | 10        |            |            | f
41 max_worker_processes     | 8       |      | Resource Usage / Asynchronous Behavior                 | Maximum number of concurrent worker processes.                          |            | postmaster | integer | default | 0       | 262143  |                           | 8        | 8         |            |            | f
42 shared_preload_libraries |         |      | Client Connection Defaults / Shared Library Preloading | Lists shared libraries to preload into server.                          |            | postmaster | string  | default |         |         |                           |          |           |            |            | t
43 wal_level                | replica |      | Write-Ahead Log / Settings                             | Set the level of information written to the WAL.                        |            | postmaster | enum    | default |         |         | {minimal,replica,logical} | replica  | replica   |            |            | t
44(5 rows)
45
46
47[pg13@lhrpgall ~]$ pg_ctl restart
48waiting for server to shut down.... done
49server stopped
50waiting for server to start....2022-01-14 11:06:07.103 CST [1272LOG:  redirecting log output to logging collector process
512022-01-14 11:06:07.103 CST [1272] HINT:  Future log output will appear in directory "pg_log".
52 done
53server started
54
55postgres=# select * from pg_settings where name in ('wal_level','max_worker_processes','max_replication_slots','max_wal_senders','shared_preload_libraries');
56           name           |  setting  | unit |                        category                        |                               short_desc                                | extra_desc |  context   | vartype |       source       | min_val | max_val |         enumvals          | boot_val | reset_val |            sourcefile             | sourceline | pending_restart
57--------------------------+-----------+------+--------------------------------------------------------+-------------------------------------------------------------------------+------------+------------+---------+--------------------+---------+---------+---------------------------+----------+-----------+-----------------------------------+------------+-----------------
58 max_replication_slots    | 10        |      | Replication / Sending Servers                          | Sets the maximum number of simultaneously defined replication slots.    |            | postmaster | integer | default            | 0       | 262143  |                           | 10       | 10        |                                   |            | f
59 max_wal_senders          | 10        |      | Replication / Sending Servers                          | Sets the maximum number of simultaneously running WAL sender processes. |            | postmaster | integer | default            | 0       | 262143  |                           | 10       | 10        |                                   |            | f
60 max_worker_processes     | 8         |      | Resource Usage / Asynchronous Behavior                 | Maximum number of concurrent worker processes.                          |            | postmaster | integer | default            | 0       | 262143  |                           | 8        | 8         |                                   |            | f
61 shared_preload_libraries | pglogical |      | Client Connection Defaults / Shared Library Preloading | Lists shared libraries to preload into server.                          |            | postmaster | string  | configuration file |         |         |                           |          | pglogical | /pg13/pgdata/postgresql.auto.conf |          3 | f
62 wal_level                | logical   |      | Write-Ahead Log / Settings                             | Set the level of information written to the WAL.                        |            | postmaster | enum    | configuration file |         |         | {minimal,replica,logical} | replica  | logical   | /pg13/pgdata/postgresql.auto.conf |          4 | f
63(5 rows)

如果你想要处理解决与上一次/第一次更新之间的冲突 wins(参阅冲突章节), 你的数据库版本需要为PostgreSQL 9.5+ (在9.4中无效) 您可以向 PostgreSQL.conf 添加此额外的选项:

1track_commit_timestamp = on # needed for last/first update wins conflict resolution
2                            # property available in PostgreSQL 9.5+

配置pg_hba.conf

pg_hba.conf 需要配置成允许从本地主机复制,用户拥有复制权限,连接权限。

1host    replication     postgres        网段ip/24           trust

创建扩展

 1postgres=# \dx
2                                            List of installed extensions
3        Name        | Version |   Schema   |                              Description
4--------------------+---------+------------+------------------------------------------------------------------------
5 pageinspect        | 1.8     | public     | inspect the contents of database pages at a low level
6 pg_recovery        | 1.0     | public     | recovery table data of update/delete/rollback rows and drop columns
7 pg_stat_statements | 1.8     | public     | track planning and execution statistics of all SQL statements executed
8 plpgsql            | 1.0     | pg_catalog | PL/pgSQL procedural language
9(4 rows)
10
11
12postgres=# create EXTENSION pglogical;
13CREATE EXTENSION
14postgres=# \dx
15                                            List of installed extensions
16        Name        | Version |   Schema   |                              Description
17--------------------+---------+------------+------------------------------------------------------------------------
18 pageinspect        | 1.8     | public     | inspect the contents of database pages at a low level
19 pg_recovery        | 1.0     | public     | recovery table data of update/delete/rollback rows and drop columns
20 pg_stat_statements | 1.8     | public     | track planning and execution statistics of all SQL statements executed
21 pglogical          | 2.4.1   | pglogical  | PostgreSQL Logical Replication
22 plpgsql            | 1.0     | pg_catalog | PL/pgSQL procedural language
23(5 rows)
24
25postgres=# \dx+ pglogical
26                                 Objects in extension "pglogical"
27                                        Object description
28--------------------------------------------------------------------------------------------------
29 function pglogical.alter_node_add_interface(name,name,text)
30 function pglogical.alter_node_drop_interface(name,name)
31 function pglogical.alter_replication_set(name,boolean,boolean,boolean,boolean)
32 function pglogical.alter_subscription_add_replication_set(name,name)
33 function pglogical.alter_subscription_disable(name,boolean)
34 function pglogical.alter_subscription_enable(name,boolean)
35 function pglogical.alter_subscription_interface(name,name)
36 function pglogical.alter_subscription_remove_replication_set(name,name)
37 function pglogical.alter_subscription_resynchronize_table(name,regclass,boolean)
38 function pglogical.alter_subscription_synchronize(name,boolean)
39 function pglogical.create_node(name,text)
40 function pglogical.create_replication_set(name,boolean,boolean,boolean,boolean)
41 function pglogical.create_subscription(name,text,text[],boolean,boolean,text[],interval,boolean)
42 function pglogical.drop_node(name,boolean)
43 function pglogical.drop_replication_set(name,boolean)
44 function pglogical.drop_subscription(name,boolean)
45 function pglogical.pglogical_gen_slot_name(name,name,name)
46 function pglogical.pglogical_max_proto_version()
47 function pglogical.pglogical_min_proto_version()
48 function pglogical.pglogical_node_info()
49 function pglogical.pglogical_version()
50 function pglogical.pglogical_version_num()
51 function pglogical.queue_truncate()
52 function pglogical.replicate_ddl_command(text,text[])
53 function pglogical.replication_set_add_all_sequences(name,text[],boolean)
54 function pglogical.replication_set_add_all_tables(name,text[],boolean)
55 function pglogical.replication_set_add_sequence(name,regclass,boolean)
56 function pglogical.replication_set_add_table(name,regclass,boolean,text[],text)
57 function pglogical.replication_set_remove_sequence(name,regclass)
58 function pglogical.replication_set_remove_table(name,regclass)
59 function pglogical.show_repset_table_info(regclass,text[])
60 function pglogical.show_subscription_status(name)
61 function pglogical.show_subscription_table(name,regclass)
62 function pglogical.synchronize_sequence(regclass)
63 function pglogical.table_data_filtered(anyelement,regclass,text[])
64 function pglogical.wait_for_subscription_sync_complete(name)
65 function pglogical.wait_for_table_sync_complete(name,regclass)
66 function pglogical.wait_slot_confirm_lsn(name,pg_lsn)
67 function pglogical.xact_commit_timestamp_origin(xid)
68 table pglogical.depend
69 table pglogical.local_node
70 table pglogical.local_sync_status
71 table pglogical.node
72 table pglogical.node_interface
73 table pglogical.queue
74 table pglogical.replication_set
75 table pglogical.replication_set_seq
76 table pglogical.replication_set_table
77 table pglogical.sequence_state
78 table pglogical.subscription
79 view pglogical.tables
80(51 rows)

其它

数据字典

 1select * from pglogical.depend ;
2select * from pglogical.local_node ;
3select * from pglogical.local_sync_status ;
4select * from pglogical.node ;
5select * from pglogical.node_interface ;
6select * from pglogical.queue ;
7select * from pglogical.replication_set ;
8select * from pglogical.replication_set_seq ;
9select * from pglogical.replication_set_table ;
10select * from pglogical.sequence_state ;
11select * from pglogical.subscription ;
12select * from pglogical.tables ;
13
14select * from pglogical.show_subscription_status();

复制集

在复制集default中:update/delete/truncate 操作也是同步复制。

复制集INSERTUPDATEDELETETRUNCATE
default
default_insert_only××
ddl_sql×××
1lhrdb=# select * from pglogical.replication_set ;
2   set_id   | set_nodeid |      set_name       | replicate_insert | replicate_update | replicate_delete | replicate_truncate
3------------+------------+---------------------+------------------+------------------+------------------+--------------------
4 3808633458 |  345938905 | default             | t                | t                | t                | t
5 1988720057 |  345938905 | default_insert_only | t                | f                | f                | t
6  742713876 |  345938905 | ddl_sql             | t                | f                | f                | f
7(3 rows)

复制特性扩展

延迟复制

1pglogical.create_subscription(subscription_name name, provider_dsn text, replication_sets text[], synchronize_structure boolean, synchronize_data boolean, forward_origins text[], apply_delay interval) 

参数:

  • subscription_name - 订阅的名称,必须是唯一的

  • provider_dsn - 提供者的连接字符串

  • replication_sets - 要订阅的复制集数组,这些必须已存在,默认为“{default,default_insert_only,ddl_sql}”

  • synchronize_structure - 指定是否将提供者与订阅者之间的结构同步,默认为false

  • synchronize_data - 指定是否将数据从提供者同步到订阅者,默认为true

  • forward_origins - 要转发的原始名称数组,当前只支持的值是空数组,意味着不转发任何不是源自提供者节点的更改,或“{all}”这意味着复制所有更改,无论它们的来源是什么,默认是全部}”

  • apply_delay - 延迟复制多少,默认为0秒

示例:数据表结构同步;且延迟复制1分钟

1SELECT pglogical.create_subscription(
2subscription_name := 'subscription1',
3provider_dsn := 'host=192.168.1.221 port=5432 dbname=lhrdb',
4synchronize_structure := true,
5apply_delay := '00:01:00'::interval
6);

对源端进行 行/列 过滤

过滤机制需要 PostgreSQL 9.5 +

1pglogical.replication_set_add_table(set_name name, relation regclass, synchronize_data boolean, columns text [],row_filter text)

参数:

  • set_name - 现有复制集的名称

  • relation - 要添加到集合中的表的名称或OID

  • synchronize_data - 如果为true,则表数据将在订阅给定复制集的所有订户上同步,默认为false

  • columns - 要复制的列的列表。通常,当应复制所有列时,这将设置为NULL,这是默认值

  • row_filter - 行过滤表达式,默认为NULL(无过滤),有关详细信息,请参阅(行过滤)。警告:在使用有效行筛选器同步数据时要小心。使用synchronize_data=true有效row_filter就像对表的一次性操作。使用修改后再次执行它将row_filter不会将数据同步到订户。订阅者可能需要pglogical.alter_subscription_resynchronize_table()来修复它。

**示例:对表tbl_lottu02中字段{id, name, job} 字段列过滤;且对条件 ‘id > 10’ 进行行过滤 **

 1# provider 节点 创建表并插入测试数据
2create table tbl_lottu02 (id int primary keyname text, job text, reg_time timestamp );
3insert into tbl_lottu02 select generate_series(1,20id,'lottu'||generate_series(1,20),'pg'now();
4
5# subscriber节点创建表; 可以只创建复制的列的数据表
6create table tbl_lottu02 (id int primary keyname text, job text, reg_time timestamp );
7# or
8create table tbl_lottu02 (id int primary keyname text, job text);
9
10#provider 节点 将表加入复制集中;并同步记录
11lottu=# select pglogical.replication_set_add_table(set_name := 'default', relation := 'tbl_lottu02', synchronize_data := true, columns := '{id, name, job}',row_filter := 'id < 10');
12 replication_set_add_table 
13---------------------------
14 t
15(1 row)
16
17# subscriber节点查看表tbl_lottu02记录
18lottu=# select * from tbl_lottu02;
19 id |  name  | job 
20----+--------+-----
21  1 | lhrdb1 | pg
22  2 | lhrdb2 | pg
23  3 | lhrdb3 | pg
24  4 | lhrdb4 | pg
25  5 | lhrdb5 | pg
26  6 | lhrdb6 | pg
27  7 | lhrdb7 | pg
28  8 | lhrdb8 | pg
29  9 | lhrdb9 | pg
30(9 rows)

为新表自动分配复制集

事件触发器工具可用于描述为新创建的表定义复制集的规则。

 1CREATE OR REPLACE FUNCTION pglogical_assign_repset()
2RETURNS event_trigger AS $$
3DECLARE obj record;
4BEGIN
5    FOR obj IN SELECT * FROM pg_event_trigger_ddl_commands()
6    LOOP
7        IF obj.object_type = 'table' THEN
8            IF obj.schema_name = 'config' THEN
9                PERFORM pglogical.replication_set_add_table('configuration', obj.objid);
10            ELSIF NOT obj.in_extension THEN
11                PERFORM pglogical.replication_set_add_table('default', obj.objid);
12            END IF;
13        END IF;
14    END LOOP;
15END;
16$$ LANGUAGE plpgsql;
17
18CREATE EVENT TRIGGER pglogical_assign_repset_trg
19    ON ddl_command_end
20    WHEN TAG IN ('CREATE TABLE''CREATE TABLE AS')
21    EXECUTE PROCEDURE pglogical_assign_repset();

冲突检测

冲突检测需要 PostgreSQL 9.5 +
如果节点订阅多个提供程序,或当本地写入在订阅服务器上发生,可能会发生冲突,尤其是对传入的变化。这些都自动检测,并可以就此采取行动取决于配置。
解决冲突的办法是通过配置 pglogical.conflict_resolution 参数。
pglogical.conflict_resolution 支持的配置参数选项为︰

  • error - 复制将停止上错误如果检测到冲突和手动操作需要解决

  • apply_remote - 总是应用与本地数据有冲突的更改,这是默认值

  • keep_local - 保留数据的本地版本,并忽略来自远程节点相互冲突的更改

  • last_update_wins - 时间戳为提交最新的版本(newest commit timestamp)的数据将会被保存(这可以是本地或远程版本)

  • first_update_wins - 时间戳为最旧的版本(oldest timestamp)的数据将会被保存(这可以是本地或远程版本)

当参数track_commit_timestamp被禁用时,唯一允许的配置值是 apply_remote。PostgreSQL 9.4 不支持 track_commit_timestamp 配置参数只能配置参数apply_remote(该参数是默认值)。

 1# 在 订阅者 节点配置;我们保留最新的数据
2track_commit_timestamp = on
3pglogical.conflict_resolution = 'last_update_wins'
4
5# 在 订阅者 节点创建测试表tbl_lhrdb03
6lottu=# create table tbl_lhrdb03(id int primary key, name text);
7CREATE TABLE
8lottu=# insert into tbl_lhrdb03 values (1001,'subscriber');
9INSERT 0 1
10
11# 在 发布者 节点 创建测试表
12create table tbl_lhrdb03(id int primary keyname text);
13select pglogical.replication_set_add_table( set_name := 'default', relation := 'tbl_lhrdb03',synchronize_data := true);
14insert into tbl_lhrdb03 values (1001,'provider');
15
16# 在 订阅者 节点 查看数据
17lottu=# select * from tbl_lottu03;
18  id  |   name   
19------+----------
20 1001 | provider

后记: 在订阅者的表需要主键约束;不然检测不到冲突;是否需要主键约束当然这个也是根据需求而定。

示例一:1个发布端,2个订阅端

现有实验环境

数据库版本操作系统IP数据库角色
PostgreSQL 13.4Debian GNU/Linux 11172.72.6.30lhrdbprovider
PostgreSQL 13.4Debian GNU/Linux 11172.72.6.31lhrdbsubscriber
PostgreSQL 12.8Debian GNU/Linux 11172.72.6.32lhrdbsubscriber

环境准备

  1-- 创建专用网络
2docker network create --subnet=172.72.6.0/24 pg-network
3
4
5-- PG13
6docker rm -f lhrpg30
7docker run -d --name lhrpg30 -h lhrpg30 \
8   -p 64330:5432 --net=pg-network --ip 172.72.6.30 \
9   -e POSTGRES_PASSWORD=lhr \
10   -e TZ=Asia/Shanghai \
11   postgres:13.4
12
13
14docker rm -f lhrpg31
15docker run -d --name lhrpg31 -h lhrpg31 \
16   -p 64331:5432 --net=pg-network --ip 172.72.6.31 \
17   -e POSTGRES_PASSWORD=lhr \
18   -e TZ=Asia/Shanghai \
19   postgres:13.4
20
21
22-- PG12
23docker rm -f lhrpg32
24docker run -d --name lhrpg32 -h lhrpg32 \
25   -p 64332:5432 --net=pg-network --ip 172.72.6.32 \
26   -e POSTGRES_PASSWORD=lhr \
27   -e TZ=Asia/Shanghai \
28   postgres:12.8
29
30
31-- 安装插件
32-- PG12
33apt update
34apt install -y curl
35curl https://techsupport.enterprisedb.com/api/repository/dl/default/release/deb | bash
36apt-get install postgresql-12-pglogical
37
38-- PG13
39apt update
40apt install -y curl
41curl https://techsupport.enterprisedb.com/api/repository/dl/default/release/deb | bash
42apt-get install postgresql-13-pglogical
43
44-- 常用包
45cp /etc/apt/sources.list /etc/apt/sources.list_bk
46
47cat > /etc/apt/sources.list <<"EOF"
48deb http://mirrors.aliyun.com/debian/ buster main non-free contrib
49deb http://mirrors.aliyun.com/debian-security buster/updates main
50deb http://mirrors.aliyun.com/debian/ buster-updates main non-free contrib
51deb http://mirrors.aliyun.com/debian/ buster-backports main non-free contrib
52
53deb-src http://mirrors.aliyun.com/debian-security buster/updates main
54deb-src http://mirrors.aliyun.com/debian/ buster main non-free contrib
55deb-src http://mirrors.aliyun.com/debian/ buster-updates main non-free contrib
56deb-src http://mirrors.aliyun.com/debian/ buster-backports main non-free contrib
57EOF
58
59apt-get install -y aptitude
60aptitude install -y curl wget iputils-ping procps net-tools lsb-release  build-essential  sysstat telnet  
61aptitude install -y vim  bzip2 gnupg2 libtinfo5
62
63
64
65-- 3个PG库需要配置
66psql -U postgres -h 192.168.66.35 -p 64330
67psql -U postgres -h 192.168.66.35 -p 64331
68psql -U postgres -h 192.168.66.35 -p 64332
69
70
71alter system set shared_preload_libraries = 'pglogical';
72alter system set wal_level = 'logical';
73select pg_reload_conf();
74
75select * from pg_settings where name in ('wal_level','max_worker_processes','max_replication_slots','max_wal_senders','shared_preload_libraries');
76
77
78docker restart lhrpg30 lhrpg31 lhrpg32
79
80
81
82create database lhrdb;
83\c lhrdb
84create EXTENSION pglogical;
85
86
87
88cat >  /var/lib/postgresql/data/pg_hba.conf <<"EOF"
89local   all             all                                     trust
90host    all             all             127.0.0.1/32            trust
91host    all             all             ::1/128                 trust
92local   replication     all                                     trust
93host    replication     all             127.0.0.1/32            trust
94host    replication     all             ::1/128                 trust
95
96host    replication     postgres    172.72.6.0/24   trust
97
98host all all all md5
99
100EOF
101
102
103select * from pg_hba_file_rules;
104
105
106-- 查看日志
107docker logs -n 10 -f lhrpg30
108docker logs -n 10 -f lhrpg31
109docker logs -n 10 -f lhrpg32

提供者节点(发布端)配置

 1-- 1、创建节点:在数据库PG13里创建提供者节点
2-- SELECT pglogical.drop_node(node_name := 'provider30');
3SELECT pglogical.create_node(
4    node_name := 'provider30',
5    dsn := 'host=172.72.6.30 port=5432 dbname=lhrdb'
6);
7
8-- 查看
9select * from pglogical.node_interface ;
10
11
12-- 2、创建复制集:将public架构中的所有表添加到default复制集中,复制集default的表都必需要primary key
13SELECT pglogical.replication_set_add_all_tables('default'ARRAY['public']);
14
15-- 创建完订阅者后,才会有复制槽
16select * from pg_replication_slots ;

运行过程:

 1lhrdb=# SELECT pglogical.create_node(
2lhrdb(#     node_name := 'provider30',
3lhrdb(#     dsn := 'host=172.72.6.30 port=5432 dbname=lhrdb'
4lhrdb(# );
5 create_node
6-------------
7  2019914661
8(1 row)
9
10lhrdb=# select * from pglogical.local_node ;
11  node_id   | node_local_interface
12------------+----------------------
13 2019914661 |           1955427756
14(1 row)
15
16lhrdb=# select * from pglogical.node_interface ;
17   if_id    |  if_name   | if_nodeid  |                 if_dsn
18------------+------------+------------+-----------------------------------------
19 1955427756 | provider30 | 2019914661 | host=172.72.6.30 port=5432 dbname=lhrdb
20(1 row)
21
22lhrdb=# select * from pg_replication_slots ;
23 slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size
24-----------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------
25(0 rows)
26
27
28lhrdb=#
29lhrdb=# SELECT pglogical.replication_set_add_all_tables('default', ARRAY['public']);
30 replication_set_add_all_tables
31--------------------------------
32 t
33(1 row)

订阅者PG13节点配置

pglogical 可以同步表/序列结构;在创建订阅者 'pglogical.create_subscription' ; 里面参数synchronize_structure - 指定是否将提供者与订阅者之间的结构同步,默认为false。可以同步表/序列/索引,该功能仅限于同版本,不同版本会报错,具体可以根据情况来测试。

 1-- 1、创建节点,在数据库PG13创建订阅者节点
2SELECT pglogical.create_node(
3    node_name := 'subnode31',
4    dsn := 'host=172.72.6.31 port=5432 dbname=lhrdb  user=postgres password=lhr'
5);
6
7-- 查询
8select * from pglogical.node_interface ;
9
10
11-- 2、订阅提供者节点,该订阅将在后台启动同步和复制过程
12SELECT pglogical.create_subscription(
13     subscription_name := 'sub31',
14     provider_dsn := 'host=172.72.6.30 port=5432 dbname=lhrdb  user=postgres password=lhr',
15     synchronize_structure := true
16     synchronize_data := true
17);
18
19-- 查询
20select * from pglogical.node_interface ;
21select * from pglogical.subscription ;
22select * from pglogical.show_subscription_status();
23
24
25-- 创建订阅这个命令执行完成后,会在“发布端”创建一个复制槽,这个很重要,一个订阅者一个复制槽
26select * from pg_replication_slots ;
27
28
29-- 删除: 若要删除,则先删除订阅者,再删除节点
30SELECT pglogical.drop_subscription(subscription_name := 'sub31');
31SELECT pglogical.drop_node(node_name := 'subnode31');

过程:

订阅端:

 1lhrdb=# SELECT pglogical.create_node(
2lhrdb(#     node_name := 'subnode31',
3lhrdb(#     dsn := 'host=172.72.6.31 port=5432 dbname=lhrdb  user=postgres password=lhr'
4lhrdb(# );
5 create_node
6-------------
7  3125886449
8(1 row)
9
10
11lhrdb=#
12lhrdb=# SELECT pglogical.create_subscription(
13lhrdb(#      subscription_name := 'sub31',
14lhrdb(#      provider_dsn := 'host=172.72.6.30 port=5432 dbname=lhrdb  user=postgres password=lhr',
15lhrdb(#      synchronize_structure := true,
16lhrdb(#      synchronize_data := true
17lhrdb(# );
18 create_subscription
19---------------------
20          1804764769
21(1 row)
22
23lhrdb=# select * from pglogical.local_node ;
24  node_id   | node_local_interface
25------------+----------------------
26 3125886449 |            450827623
27(1 row)
28
29
30lhrdb=# select * from pglogical.node ;
31  node_id   | node_name
32------------+------------
33 3125886449 | subnode31
34 2019914661 | provider30
35(2 rows)
36
37
38lhrdb=# select * from pglogical.node_interface ;
39   if_id    |  if_name   | if_nodeid  |                               if_dsn
40------------+------------+------------+---------------------------------------------------------------------
41  450827623 | subnode31  | 3125886449 | host=172.72.6.31 port=5432 dbname=lhrdb  user=postgres password=lhr
42 1955427756 | provider30 | 2019914661 | host=172.72.6.30 port=5432 dbname=lhrdb  user=postgres password=lhr
43(2 rows)
44
45lhrdb=# select * from pglogical.subscription ;
46   sub_id   | sub_name | sub_origin | sub_target | sub_origin_if | sub_target_if | sub_enabled |       sub_slot_name        |         sub_replication_sets          | sub_forward_origins | sub_apply_delay | sub_force_text_transfer
47------------+----------+------------+------------+---------------+---------------+-------------+----------------------------+---------------------------------------+---------------------+-----------------+-------------------------
48 1804764769 | sub31    | 2019914661 | 3125886449 |    1955427756 |     450827623 | t           | pgl_lhrdb_provider30_sub31 | {default,default_insert_only,ddl_sql} | {all}               | 00:00:00        | f
49(1 row)
50
51
52lhrdb=#
53lhrdb=# select * from pglogical.show_subscription_status();
54 subscription_name |   status    | provider_node |                            provider_dsn                             |         slot_name          |           replication_sets            | forward_origins
55-------------------+-------------+---------------+---------------------------------------------------------------------+----------------------------+---------------------------------------+-----------------
56 sub31             | replicating | provider30    | host=172.72.6.30 port=5432 dbname=lhrdb  user=postgres password=lhr | pgl_lhrdb_provider30_sub31 | {default,default_insert_only,ddl_sql} | {all}
57(1 row)

发布端查询:

1lhrdb=# select * from pg_replication_slots ;
2         slot_name          |      plugin      | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size
3----------------------------+------------------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------
4 pgl_lhrdb_provider30_sub31 | pglogical_output | logical   |  16384 | lhrdb    | f         | t      |         80 |      |          489 | 0/166D078   | 0/166D0B0           | reserved   |
5(1 row)

发布端告警日志:

 12022-01-14 20:24:05.544 CST [77] LOG:  logical decoding found consistent point at 0/166D078
22022-01-14 20:24:05.544 CST [77] DETAIL:  There are no running transactions.
32022-01-14 20:24:05.544 CST [77] STATEMENT:  CREATE_REPLICATION_SLOT "pgl_lhrdb_provider30_sub31" LOGICAL pglogical_output
42022-01-14 20:24:05.544 CST [77] LOG:  exported logical decoding snapshot: "00000007-00000002-1" with 0 transaction IDs
52022-01-14 20:24:05.544 CST [77] STATEMENT:  CREATE_REPLICATION_SLOT "pgl_lhrdb_provider30_sub31" LOGICAL pglogical_output
62022-01-14 20:24:06.352 CST [80] LOG:  starting logical decoding for slot "pgl_lhrdb_provider30_sub31"
72022-01-14 20:24:06.352 CST [80] DETAIL:  Streaming transactions committing after 0/166D0B0, reading WAL from 0/166D078.
82022-01-14 20:24:06.352 CST [80] STATEMENT:  START_REPLICATION SLOT "pgl_lhrdb_provider30_sub31" LOGICAL 0/166D0B0 (expected_encoding 'UTF8', min_proto_version '1', max_proto_version '1', startup_params_format '1', "binary.want_internal_basetypes" '1', "binary.want_binary_basetypes" '1', "binary.basetypes_major_version" '1300', "binary.sizeof_datum" '8', "binary.sizeof_int" '4', "binary.sizeof_long" '8', "binary.bigendian" '0', "binary.float4_byval" '0', "binary.float8_byval" '1', "binary.integer_datetimes" '0', "hooks.setup_function" 'pglogical.pglogical_hooks_setup', "pglogical.forward_origins" '"all"', "pglogical.replication_set_names" '"default",default_insert_only,ddl_sql', "relmeta_cache_size" '-1', pg_version '130005', pglogical_version '2.4.1', pglogical_version_num '20401', pglogical_apply_pid '108')
92022-01-14 20:24:06.353 CST [80] LOG:  logical decoding found consistent point at 0/166D078
102022-01-14 20:24:06.353 CST [80] DETAIL:  There are no running transactions.
112022-01-14 20:24:06.353 CST [80] STATEMENT:  START_REPLICATION SLOT "pgl_lhrdb_provider30_sub31" LOGICAL 0/166D0B0 (expected_encoding 'UTF8', min_proto_version '1', max_proto_version '1', startup_params_format '1', "binary.want_internal_basetypes" '1', "binary.want_binary_basetypes" '1', "binary.basetypes_major_version" '1300', "binary.sizeof_datum" '8', "binary.sizeof_int" '4', "binary.sizeof_long" '8', "binary.bigendian" '0', "binary.float4_byval" '0', "binary.float8_byval" '1', "binary.integer_datetimes" '0', "hooks.setup_function" 'pglogical.pglogical_hooks_setup', "pglogical.forward_origins" '"all"', "pglogical.replication_set_names" '"default",default_insert_only,ddl_sql', "relmeta_cache_size" '-1', pg_version '130005', pglogical_version '2.4.1', pglogical_version_num '20401', pglogical_apply_pid '108')

订阅端告警日志:

 12022-01-14 20:23:49.062 CST [74] LOG:  apply worker [74] at slot 2 generation 1 detaching cleanly
22022-01-14 20:23:49.067 CST [97] LOG:  manager worker [97] at slot 1 generation 4 detaching cleanly
32022-01-14 20:23:49.075 CST [98] LOG:  manager worker [98] at slot 1 generation 5 detaching cleanly
42022-01-14 20:23:49.290 CST [67] LOG:  manager worker [67] at slot 0 generation 4 detaching cleanly
52022-01-14 20:23:49.296 CST [99] LOG:  manager worker [99] at slot 1 generation 6 detaching cleanly
62022-01-14 20:23:49.299 CST [100] LOG:  starting pglogical database manager for database lhrdb
72022-01-14 20:23:49.299 CST [100] LOG:  manager worker [100] at slot 0 generation 5 detaching cleanly
82022-01-14 20:23:49.304 CST [101] LOG:  manager worker [101] at slot 1 generation 7 detaching cleanly
92022-01-14 20:23:55.379 CST [102] LOG:  manager worker [102] at slot 0 generation 6 detaching cleanly
102022-01-14 20:23:55.387 CST [103] LOG:  starting pglogical database manager for database lhrdb
112022-01-14 20:23:56.390 CST [104] LOG:  manager worker [104] at slot 1 generation 8 detaching cleanly
122022-01-14 20:24:05.077 CST [107] LOG:  manager worker [107] at slot 1 generation 9 detaching cleanly
132022-01-14 20:24:05.077 CST [108] LOG:  starting apply for subscription sub31
142022-01-14 20:24:05.085 CST [109] LOG:  manager worker [109] at slot 1 generation 10 detaching cleanly

订阅者PG12节点配置

 1-- 1、创建节点,在数据库PG12创建订阅者节点
2SELECT pglogical.create_node(
3    node_name := 'subnode32',
4    dsn := 'host=172.72.6.32 port=5432 dbname=lhrdb  user=postgres password=lhr'
5);
6
7
8-- 查询
9select * from pglogical.node_interface ;
10
11-- 2、订阅提供者节点,该订阅将在后台启动同步和复制过程
12SELECT pglogical.create_subscription(
13     subscription_name := 'sub32',
14     provider_dsn := 'host=172.72.6.30 port=5432 dbname=lhrdb  user=postgres password=lhr'
15);
16
17-- 查询
18select * from pglogical.node_interface ;
19select * from pglogical.subscription ;
20select * from pglogical.show_subscription_status();
21
22
23-- 创建订阅这个命令执行完成后,会在发布端创建一个复制槽,这个很重要,一个订阅者一个复制槽
24select * from pg_replication_slots ;
25
26
27-- 删除: 若要删除,则先删除订阅者,再删除节点
28SELECT pglogical.drop_subscription(subscription_name := 'sub32');
29SELECT pglogical.drop_node(node_name := 'subnode32');

订阅端:

 1lhrdb=# SELECT pglogical.create_node(
2lhrdb(#     node_name := 'subnode32',
3lhrdb(#     dsn := 'host=172.72.6.32 port=5432 dbname=lhrdb  user=postgres password=lhr'
4lhrdb(# );
5 create_node
6-------------
7   235272207
8(1 row)
9
10
11lhrdb=# SELECT pglogical.create_subscription(
12lhrdb(#      subscription_name := 'sub32',
13lhrdb(#      provider_dsn := 'host=172.72.6.30 port=5432 dbname=lhrdb  user=postgres password=lhr'
14lhrdb(# );
15 create_subscription
16---------------------
17           239145690
18(1 row)
19
20
21lhrdb=# select * from pglogical.local_node ;
22  node_id  | node_local_interface
23-----------+----------------------
24 235272207 |           3519183068
25(1 row)
26
27
28lhrdb=# select * from pglogical.local_sync_status ;
29 sync_kind | sync_subid | sync_nspname | sync_relname | sync_status | sync_statuslsn
30-----------+------------+--------------+--------------+-------------+----------------
31 d         |  239145690 |              |              | r           | 0/0
32(1 row)
33
34
35lhrdb=# select * from pglogical.node ;
36  node_id   | node_name
37------------+------------
38  235272207 | subnode32
39 2019914661 | provider30
40(2 rows)
41
42
43lhrdb=# select * from pglogical.node_interface ;
44   if_id    |  if_name   | if_nodeid  |                               if_dsn
45------------+------------+------------+---------------------------------------------------------------------
46 3519183068 | subnode32  |  235272207 | host=172.72.6.32 port=5432 dbname=lhrdb  user=postgres password=lhr
47 1955427756 | provider30 | 2019914661 | host=172.72.6.30 port=5432 dbname=lhrdb  user=postgres password=lhr
48(2 rows)
49
50
51lhrdb=# select * from pglogical.subscription ;
52  sub_id   | sub_name | sub_origin | sub_target | sub_origin_if | sub_target_if | sub_enabled |       sub_slot_name        |         sub_replication_sets          | sub_forward_origins | sub_apply_delay | sub_force_text_transfer
53-----------+----------+------------+------------+---------------+---------------+-------------+----------------------------+---------------------------------------+---------------------+-----------------+-------------------------
54 239145690 | sub32    | 2019914661 |  235272207 |    1955427756 |    3519183068 | t           | pgl_lhrdb_provider30_sub32 | {default,default_insert_only,ddl_sql} | {all}               | 00:00:00        | f
55(1 row)
56
57
58lhrdb=# select * from pglogical.show_subscription_status();
59 subscription_name |   status    | provider_node |                            provider_dsn                             |         slot_name          |           replication_sets            | forward_origins
60-------------------+-------------+---------------+---------------------------------------------------------------------+----------------------------+---------------------------------------+-----------------
61 sub32             | replicating | provider30    | host=172.72.6.30 port=5432 dbname=lhrdb  user=postgres password=lhr | pgl_lhrdb_provider30_sub32 | {default,default_insert_only,ddl_sql} | {all}
62(1 row)

发布端:

1lhrdb=# select * from pg_replication_slots ;
2         slot_name          |      plugin      | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size
3----------------------------+------------------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------
4 pgl_lhrdb_provider30_sub31 | pglogical_output | logical   |  16384 | lhrdb    | f         | t      |         80 |      |          489 | 0/166D0B0   | 0/166D0E8           | reserved   |
5 pgl_lhrdb_provider30_sub32 | pglogical_output | logical   |  16384 | lhrdb    | f         | t      |        105 |      |          489 | 0/166D0B0   | 0/166D0E8           | reserved   |
6(2 rows)

可见,一个订阅端会产生一个复制槽。

发布端告警日志:

 12022-01-14 20:30:26.462 CST [103] LOG:  logical decoding found consistent point at 0/166D0B0
22022-01-14 20:30:26.462 CST [103] DETAIL:  There are no running transactions.
32022-01-14 20:30:26.462 CST [103] STATEMENT:  CREATE_REPLICATION_SLOT "pgl_lhrdb_provider30_sub32" LOGICAL pglogical_output
42022-01-14 20:30:26.462 CST [103] LOG:  exported logical decoding snapshot: "00000008-00000002-1" with 0 transaction IDs
52022-01-14 20:30:26.462 CST [103] STATEMENT:  CREATE_REPLICATION_SLOT "pgl_lhrdb_provider30_sub32" LOGICAL pglogical_output
62022-01-14 20:30:26.583 CST [105] LOG:  starting logical decoding for slot "pgl_lhrdb_provider30_sub32"
72022-01-14 20:30:26.583 CST [105] DETAIL:  Streaming transactions committing after 0/166D0E8, reading WAL from 0/166D0B0.
82022-01-14 20:30:26.583 CST [105] STATEMENT:  START_REPLICATION SLOT "pgl_lhrdb_provider30_sub32" LOGICAL 0/166D0E8 (expected_encoding 'UTF8', min_proto_version '1', max_proto_version '1', startup_params_format '1', "binary.want_internal_basetypes" '1', "binary.want_binary_basetypes" '1', "binary.basetypes_major_version" '1200', "binary.sizeof_datum" '8', "binary.sizeof_int" '4', "binary.sizeof_long" '8', "binary.bigendian" '0', "binary.float4_byval" '1', "binary.float8_byval" '1', "binary.integer_datetimes" '0', "hooks.setup_function" 'pglogical.pglogical_hooks_setup', "pglogical.forward_origins" '"all"', "pglogical.replication_set_names" '"default",default_insert_only,ddl_sql', "relmeta_cache_size" '-1', pg_version '120009', pglogical_version '2.4.1', pglogical_version_num '20401', pglogical_apply_pid '95')
92022-01-14 20:30:26.583 CST [105] LOG:  logical decoding found consistent point at 0/166D0B0
102022-01-14 20:30:26.583 CST [105] DETAIL:  There are no running transactions.
112022-01-14 20:30:26.583 CST [105] STATEMENT:  START_REPLICATION SLOT "pgl_lhrdb_provider30_sub32" LOGICAL 0/166D0E8 (expected_encoding 'UTF8', min_proto_version '1', max_proto_version '1', startup_params_format '1', "binary.want_internal_basetypes" '1', "binary.want_binary_basetypes" '1', "binary.basetypes_major_version" '1200', "binary.sizeof_datum" '8', "binary.sizeof_int" '4', "binary.sizeof_long" '8', "binary.bigendian" '0', "binary.float4_byval" '1', "binary.float8_byval" '1', "binary.integer_datetimes" '0', "hooks.setup_function" 'pglogical.pglogical_hooks_setup', "pglogical.forward_origins" '"all"', "pglogical.replication_set_names" '"default",default_insert_only,ddl_sql', "relmeta_cache_size" '-1', pg_version '120009', pglogical_version '2.4.1', pglogical_version_num '20401', pglogical_apply_pid '95')

订阅端告警日志:

12022-01-14 20:30:19.585 CST [90] LOG:  manager worker [90] at slot 0 generation 3 detaching cleanly
22022-01-14 20:30:19.592 CST [91] LOG:  starting pglogical database manager for database lhrdb
32022-01-14 20:30:20.595 CST [92] LOG:  manager worker [92] at slot 1 generation 1 detaching cleanly
42022-01-14 20:30:25.943 CST [94] LOG:  manager worker [94] at slot 1 generation 2 detaching cleanly
52022-01-14 20:30:25.944 CST [95] LOG:  starting apply for subscription sub32
62022-01-14 20:30:25.951 CST [96] LOG:  manager worker [96] at slot 1 generation 3 detaching cleanly

验证复制

发布端创建表

由于需要验证insert/update/delete/truncate操作是否同步;所以创建的表必须要有主键,若没有主键,则不能同步。

 1-- 发布者
2create table tb_test(id int primary keyname text, reg_time timestamp);
3insert into tb_test select generate_series(1,10000),'xxt',now();
4select count(*) from tb_test;
5
6
7create table tb_test2(id int primary keyname text, reg_time timestamp);
8insert into tb_test2 select generate_series(1,20000),'xxt',now();
9select count(*) from tb_test2;
10
11
12-- 订阅端需要创建表结构
13create table tb_test(id int primary keyname text, reg_time timestamp);
14create table tb_test2(id int primary keyname text, reg_time timestamp);

将表添加对应的复制集

对新建的表;并没有为其分配对应的复制集;需要手动添加。当然可以利用触发器自动添加。

有2种方法:

方法1:将public架构中的所有表添加到default复制集中

1SELECT pglogical.replication_set_add_all_tables('default'ARRAY['public']);

方法2:将表添加到对应的复制集中

1pglogical.replication_set_add_table(set_name name, relation regclass, synchronize_data boolean, columns text [],row_filter text)  

例如:

1lhrdb=# select pglogical.replication_set_add_table(set_name := 'default', relation := 'tb_test',synchronize_data := true);
2 replication_set_add_table 
3---------------------------
4 t
5(1 row)

查询:

1select * from pglogical.replication_set_table ;
2
3select * from pglogical.depend ;
4
5select * from pglogical.tables ;

发布端执行过程:

 1lhrdb=# create table tb_test(id int primary key, name text, reg_time timestamp);
2CREATE TABLE
3lhrdb=#
4lhrdb=# insert into tb_test select generate_series(1,10000),'xxt',now();
5INSERT 0 10000
6lhrdb=# select count(*) from tb_test;
7 count
8-------
9 10000
10(1 row)
11
12
13lhrdb=# select * from pglogical.replication_set_table ;
14 set_id | set_reloid | set_att_list | set_row_filter
15--------+------------+--------------+----------------
16(0 rows)
17
18lhrdb=# SELECT pglogical.replication_set_add_all_tables('default', ARRAY['public']);
19 replication_set_add_all_tables
20--------------------------------
21 t
22(1 row)
23
24
25lhrdb=# select * from pglogical.replication_set_table ;
26  set_id  | set_reloid | set_att_list | set_row_filter
27----------+------------+--------------+----------------
28 48521716 | tb_test    |              |
29(1 row)
30
31lhrdb=# create table tb_test2(id int primary key, name text, reg_time timestamp);
32CREATE TABLE
33lhrdb=# insert into tb_test2 select generate_series(1,20000),'xxt',now();
34INSERT 0 20000
35lhrdb=# select count(*) from tb_test2;
36 count
37-------
38 20000
39(1 row)
40
41
42lhrdb=# \d
43          List of relations
44 Schema |   Name   | Type  |  Owner
45--------+----------+-------+----------
46 public | tb_test  | table | postgres
47 public | tb_test2 | table | postgres
48(2 rows)
49
50
51lhrdb=# select * from pglogical.replication_set_table ;
52  set_id  | set_reloid | set_att_list | set_row_filter
53----------+------------+--------------+----------------
54 48521716 | tb_test    |              |
55(1 row)
56
57lhrdb=# select pglogical.replication_set_add_table(set_name := 'default', relation := 'tb_test2',synchronize_data := true);
58 replication_set_add_table
59---------------------------
60 t
61(1 row)
62
63
64lhrdb=# select * from pglogical.replication_set_table ;
65  set_id  | set_reloid | set_att_list | set_row_filter
66----------+------------+--------------+----------------
67 48521716 | tb_test    |              |
68 48521716 | tb_test2   |              |
69(2 rows)
70
71lhrdb=# select * from pglogical.depend ;
72 classid |   objid    | objsubid | refclassid | refobjid | refobjsubid | deptype
73---------+------------+----------+------------+----------+-------------+---------
74   16491 | 1930385120 |    16550 |       1259 |    16550 |           0 | n
75   16491 | 1930385120 |    16558 |       1259 |    16558 |           0 | n
76(2 rows)
77
78lhrdb=# select * from pglogical.tables ;
79 relid | nspname | relname  | set_name
80-------+---------+----------+----------
81 16558 | public  | tb_test2 | default
82 16550 | public  | tb_test  | default
83(2 rows)

订阅端操作

 1-- 重新同步一个表
2select * from pglogical.alter_subscription_resynchronize_table('sub31','tb_test') ;
3
4
5-- 将所有的表都同步
6select * from pglogical.alter_subscription_synchronize('sub31'true) ;
7
8
9-- 查看同步状态
10select * from  pglogical.local_sync_status;
11
12-- 查看subscriber 节点
13select * from pglogical.show_subscription_table('sub31','tb_test');

PG31操作过程:

 1lhrdb=# select * from  pglogical.local_sync_status;
2 sync_kind | sync_subid | sync_nspname | sync_relname | sync_status | sync_statuslsn
3-----------+------------+--------------+--------------+-------------+----------------
4 f         | 1804764769 |              |              | r           | 0/0
5(1 row)
6
7lhrdb=# select * from pglogical.alter_subscription_synchronize('sub31', true) ;
8 alter_subscription_synchronize
9--------------------------------
10 t
11(1 row)
12lhrdb=# select * from  pglogical.local_sync_status;
13 sync_kind | sync_subid | sync_nspname | sync_relname | sync_status | sync_statuslsn
14-----------+------------+--------------+--------------+-------------+----------------
15 d         | 1804764769 | public       | tb_test      | i           | 0/0
16 f         | 1804764769 |              |              | r           | 0/0
17 d         | 1804764769 | public       | tb_test2     | i           | 0/0
18(3 rows)
19
20
21lhrdb=# \dt
22Did not find any relations.
23lhrdb=#
24lhrdb=# create table tb_test(id int primary key, name text, reg_time timestamp);
25CREATE TABLE
26lhrdb=# create table tb_test2(id int primary key, name text, reg_time timestamp);
27CREATE TABLE
28lhrdb=#  select * from  pglogical.local_sync_status;
29 sync_kind | sync_subid | sync_nspname | sync_relname | sync_status | sync_statuslsn
30-----------+------------+--------------+--------------+-------------+----------------
31 f         | 1804764769 |              |              | r           | 0/0
32 d         | 1804764769 | public       | tb_test      | r           | 0/1AA00D0
33 d         | 1804764769 | public       | tb_test2     | r           | 0/1AA0108
34(3 rows)
35
36
37lhrdb=#
38lhrdb=# select * from pglogical.show_subscription_table('sub31','tb_test');
39 nspname | relname |   status
40---------+---------+-------------
41 public  | tb_test | replicating
42(1 row)
43
44
45lhrdb=# select * from pglogical.show_subscription_table('sub31','tb_test2');
46 nspname | relname  |   status
47---------+----------+-------------
48 public  | tb_test2 | replicating
49(1 row)
50
51
52lhrdb=# select count(*) from tb_test;
53 count
54-------
55 10000
56(1 row)

PG32操作过程:

 1lhrdb=# \dt
2Did not find any relations.
3lhrdb=#
4lhrdb=# create table tb_test(id int primary key, name text, reg_time timestamp);
5CREATE TABLE
6lhrdb=# create table tb_test2(id int primary key, name text, reg_time timestamp);
7CREATE TABLE
8lhrdb=# select * from  pglogical.local_sync_status;
9 sync_kind | sync_subid | sync_nspname | sync_relname | sync_status | sync_statuslsn
10-----------+------------+--------------+--------------+-------------+----------------
11 d         |  239145690 |              |              | r           | 0/0
12(1 row)
13
14
15lhrdb=#
16lhrdb=# select * from pglogical.alter_subscription_synchronize('sub32', true) ;
17 alter_subscription_synchronize
18--------------------------------
19 t
20(1 row)
21
22
23lhrdb=# select * from  pglogical.local_sync_status;
24 sync_kind | sync_subid | sync_nspname | sync_relname | sync_status | sync_statuslsn
25-----------+------------+--------------+--------------+-------------+----------------
26 d         |  239145690 |              |              | r           | 0/0
27 d         |  239145690 | public       | tb_test      | r           | 0/1AA0140
28 d         |  239145690 | public       | tb_test2     | w           | 0/1AA0178
29(3 rows)
30
31
32lhrdb=# select * from pglogical.show_subscription_table('sub32','tb_test');
33 nspname | relname |   status
34---------+---------+-------------
35 public  | tb_test | replicating
36(1 row)
37
38
39lhrdb=# select * from  pglogical.local_sync_status;
40 sync_kind | sync_subid | sync_nspname | sync_relname | sync_status | sync_statuslsn
41-----------+------------+--------------+--------------+-------------+----------------
42 d         |  239145690 |              |              | r           | 0/0
43 d         |  239145690 | public       | tb_test      | r           | 0/1AA0140
44 d         |  239145690 | public       | tb_test2     | r           | 0/1AA0178
45(3 rows)
46
47
48lhrdb=# select count(*) from tb_test;
49 count
50-------
51 10000
52(1 row)

发布端新增表

 1lhrdb=# create table test(id int primary key, name text, reg_time timestamp);
2CREATE TABLE
3lhrdb=# insert into test select generate_series(1,10000),'xxt',now();
4INSERT 0 10000
5lhrdb=# select count(*) from test;
6 count
7-------
8 10000
9(1 row)
10
11lhrdb=# select * from pglogical.tables ;
12 relid | nspname | relname  | set_name
13-------+---------+----------+----------
14 16558 | public  | tb_test2 | default
15 16550 | public  | tb_test  | default
16 16568 | public  | test     |
17(3 rows)
18
19
20lhrdb=# select pglogical.replication_set_add_table(set_name := 'default', relation := 'test',synchronize_data := true);
21 replication_set_add_table
22---------------------------
23 t
24(1 row)
25
26
27lhrdb=# select * from pglogical.tables ;
28 relid | nspname | relname  | set_name
29-------+---------+----------+----------
30 16558 | public  | tb_test2 | default
31 16550 | public  | tb_test  | default
32 16568 | public  | test     | default
33(3 rows)

订阅端:

 1lhrdb=# \dt
2          List of relations
3 Schema |   Name   | Type  |  Owner
4--------+----------+-------+----------
5 public | tb_test  | table | postgres
6 public | tb_test2 | table | postgres
7(2 rows)
8
9
10lhrdb=# select * from pglogical.alter_subscription_resynchronize_table('sub31','test') ;
11ERROR:  relation "test" does not exist
12LINE 1: ...cal.alter_subscription_resynchronize_table('sub31','test') ;
13
14
15lhrdb=# create table test(id int primary key, name text, reg_time timestamp);
16CREATE TABLE
17lhrdb=# select * from  pglogical.local_sync_status;
18 sync_kind | sync_subid | sync_nspname | sync_relname | sync_status | sync_statuslsn
19-----------+------------+--------------+--------------+-------------+----------------
20 f         | 1804764769 |              |              | r           | 0/0
21 d         | 1804764769 | public       | tb_test      | r           | 0/1AA00D0
22 d         | 1804764769 | public       | tb_test2     | r           | 0/1AA0108
23 d         | 1804764769 | public       | test         | r           | 0/1C2B9E8
24(4 rows)
25
26
27lhrdb=# select count(*)  from test;
28 count
29-------
30 10000
31(1 row)

进程

发布端:

 1root@lhrpg30:/# ps -ef|grep post
2postgres     1     0  0 20:14 ?        00:00:04 postgres
3postgres    27     1  0 20:14 ?        00:00:01 postgres: checkpointer
4postgres    28     1  0 20:14 ?        00:00:00 postgres: background writer
5postgres    29     1  0 20:14 ?        00:00:04 postgres: walwriter
6postgres    30     1  0 20:14 ?        00:00:00 postgres: autovacuum launcher
7postgres    31     1  0 20:14 ?        00:00:01 postgres: stats collector
8postgres    32     1  0 20:14 ?        00:00:00 postgres: pglogical supervisor
9postgres    33     1  0 20:14 ?        00:00:00 postgres: logical replication launcher
10postgres    37     1  0 20:15 ?        00:00:00 postgres: postgres lhrdb 192.168.66.64(1444) idle
11postgres    51     1  0 20:17 ?        00:00:00 postgres: pglogical manager 16384
12postgres    80     1  0 20:24 ?        00:00:02 postgres: walsender postgres 172.72.6.31(55390) idle
13postgres   105     1  0 20:30 ?        00:00:02 postgres: walsender postgres 172.72.6.32(41266) idle
14root      3354    38  0 21:00 pts/0    00:00:00 grep post

订阅端:

 1root@lhrpg31:/# ps -ef| grep post
2postgres     1     0  0 20:14 ?        00:00:01 postgres
3postgres    27     1  0 20:14 ?        00:00:00 postgres: checkpointer
4postgres    28     1  0 20:14 ?        00:00:00 postgres: background writer
5postgres    29     1  0 20:14 ?        00:00:03 postgres: walwriter
6postgres    30     1  0 20:14 ?        00:00:00 postgres: autovacuum launcher
7postgres    31     1  0 20:14 ?        00:00:01 postgres: stats collector
8postgres    32     1  0 20:14 ?        00:00:00 postgres: pglogical supervisor
9postgres    33     1  0 20:14 ?        00:00:00 postgres: logical replication launcher
10postgres    37     1  0 20:15 ?        00:00:00 postgres: postgres lhrdb 192.168.66.64(1452) idle
11postgres   103     1  0 20:23 ?        00:00:00 postgres: pglogical manager 16384
12postgres   108     1  0 20:24 ?        00:00:03 postgres: pglogical apply 16384:1804764769
13root      1205    38  0 21:08 pts/0    00:00:00 grep post

示例二:2个发布端,1个订阅端

现有实验环境

数据库版本操作系统IP数据库角色
PostgreSQL 13.4Debian GNU/Linux 11172.72.6.30lhrdb2provider
PostgreSQL 13.4Debian GNU/Linux 11172.72.6.31lhrdb2provider
PostgreSQL 12.8Debian GNU/Linux 11172.72.6.32lhrdb2subscriber

可从多个上游服务器,做数据的聚集和合并;

发布者跟订阅者的关系:一个发布者可以被多个订阅者订阅。多个发布者可以被同一个订阅者订阅。

环境

3个节点都操作:

1create database lhrdb2;
2\c lhrdb2
3create EXTENSION pglogical;

3个节点都创建测试表; 订阅者创建的表可以无主键;若订阅者有主键,可利用序列自增来解决冲突。(例如:本例是两个发布者,则发布者1可取奇数;发布者二可取偶数)。若无主键;数据不受影响。

 1-- 6.30
2create table test(id int primary key,name text);
3CREATE SEQUENCE seq_id INCREMENT BY 2 START WITH 1;
4
5-- 6.31
6create table test(id int primary key,name text);
7CREATE SEQUENCE seq_id INCREMENT BY 2 START WITH 2;
8
9-- 6.32
10create table test(id int primary key,name text);

发布端

在6.30和6.31创建节点,注意修改IP地址:

 1-- 1、创建节点:在数据库PG13里创建提供者节点
2-- SELECT pglogical.drop_node(node_name := 'provider30');
3SELECT pglogical.create_node(
4    node_name := 'provider30',
5    dsn := 'host=172.72.6.30 port=5432 dbname=lhrdb2'
6);
7
8-- 查看
9select * from pglogical.node_interface ;
10
11-- 2、将表加入同步
12select pglogical.replication_set_add_table( set_name := 'default', relation := 'test',synchronize_data := true);
13
14
15-- 查询
16select * from pglogical.tables ;

订阅端

订阅端的数据来自于2个发布端,所以,需要在相应的数据库下创建1个订阅者节点和2个订阅。

 1-- 1、创建节点,在数据库PG12创建订阅者节点
2SELECT pglogical.create_node(
3    node_name := 'subnode32',
4    dsn := 'host=172.72.6.32 port=5432 dbname=lhrdb2  user=postgres password=lhr'
5);
6
7
8-- 查询
9select * from pglogical.node_interface ;
10
11
12-- 2、订阅提供者节点,该订阅将在后台启动同步和复制过程,这里需要2个提供者
13SELECT pglogical.create_subscription(
14     subscription_name := 'sub32from30',
15     provider_dsn := 'host=172.72.6.30 port=5432 dbname=lhrdb2  user=postgres password=lhr'
16);
17
18SELECT pglogical.create_subscription(
19     subscription_name := 'sub32from31',
20     provider_dsn := 'host=172.72.6.31 port=5432 dbname=lhrdb2  user=postgres password=lhr'
21);
22
23-- 查询
24select * from pglogical.node_interface ;
25select * from pglogical.subscription ;
26select * from pglogical.show_subscription_status();
27
28
29-- 创建订阅这个命令执行完成后,会在发布端创建一个复制槽,这个很重要,一个订阅者一个复制槽
30select * from pg_replication_slots ;
31
32
33-- 删除: 若要删除,则先删除订阅者,再删除节点
34SELECT pglogical.drop_subscription(subscription_name := 'sub32from30');
35SELECT pglogical.drop_node(node_name := 'subnode32');

测试数据

 1-- 60.30
2insert into test select nextval('seq_id'),'xxt' || generate_series(1,10,2);
3
4-- 60.31
5insert into test select nextval('seq_id'),'xxt' || generate_series(2,10,2);
6
7
8-- 订阅端查询
9lhrdb2=# select * from test;
10 id | name
11----+------
12  1 | xxt1
13  3 | xxt3
14  5 | xxt5
15  7 | xxt7
16  9 | xxt9
17  2 | xxt2
18  4 | xxt4
19  6 | xxt6
20  8 | xxt8
21 10 | xxt10
22(10 rows)

示例三:云环境RDS中配置pglogical(未成功)

国内的华为云、阿里云和腾讯云的RDS for PG都已经内置了pglogical插件!

华为云支持的插件列表可以参考:https://support.huaweicloud.com/usermanual-rds/rds_09_0045.html

购买2台华为云rds环境:

image-20220115142958399
数据库版本环境内网IPEIP数据库角色
PostgreSQL 12.6华为云RDS10.0.0.73119.3.169.211lhrdb发布者
PostgreSQL 12.6华为云RDS10.0.0.74139.9.129.179lhrdb订阅者

环境

2个节点都操作:

 1create database lhrdb;
2\c lhrdb
3
4
5
6
7select * from pg_settings where name in ('wal_level','max_worker_processes','max_replication_slots','max_wal_senders','shared_preload_libraries');
8
9
10
11lhrdb=> select * from pg_available_extensions where name like 'pglogical%';
12       name       | default_version | installed_version |                              comment
13------------------+-----------------+-------------------+--------------------------------------------------------------------
14 pglogical        | 2.3.3           |                   | PostgreSQL Logical Replication
15 pglogical_origin | 1.0.0           |                   | Dummy extension for compatibility when upgrading from Postgres 9.4
16(2 rows)
17
18lhrdb=> create EXTENSION pglogical;
19ERROR:  pglogical is not in shared_preload_libraries
20
21lhrdb=> show shared_preload_libraries;
22                   shared_preload_libraries
23---------------------------------------------------------------
24 passwordcheck.so, pg_stat_statements, pg_sql_history, pgaudit
25(1 row)
26
27
28-- 这里由于没有权限修改参数,使用命令和界面都不能修改,必须提工单,由华为的人后台进行修改这个参数才可以!!!!
29
30-- 修改完之后继续
31
32lhrdb=>  show shared_preload_libraries;
33                         shared_preload_libraries
34--------------------------------------------------------------------------
35 passwordcheck.so, pg_stat_statements, pg_sql_history, pgaudit, pglogical
36(1 row)
37
38lhrdb=> create EXTENSION pglogical;
39CREATE EXTENSION
40lhrdb=>
41lhrdb=>
42lhrdb=>
43lhrdb=>
44lhrdb=> \dx
45                   List of installed extensions
46   Name    | Version |   Schema   |          Description
47-----------+---------+------------+--------------------------------
48 pglogical | 2.3.3   | pglogical  | PostgreSQL Logical Replication
49 plpgsql   | 1.0     | pg_catalog | PL/pgSQL procedural language
50(2 rows)

2个节点都创建测试表:

1-- 6.30
2create table test(id int primary key,name text);
3CREATE SEQUENCE seq_id INCREMENT BY 2 START WITH 1;
4
5
6-- 6.32
7create table test(id int primary key,name text);

发布端

 1-- 1、创建节点:在数据库PG13里创建提供者节点
2-- SELECT pglogical.drop_node(node_name := 'provider30');
3SELECT pglogical.create_node(
4    node_name := 'provider',
5    dsn := 'host=127.0.0.1 port=5432 dbname=lhrdb'
6);
7
8-- 查看
9select * from pglogical.node_interface ;
10
11-- 2、将表加入同步
12select pglogical.replication_set_add_table( set_name := 'default', relation := 'test',synchronize_data := true);
13
14
15-- 查询
16select * from pglogical.tables ;

订阅端

 1-- 1、创建节点,在数据库PG12创建订阅者节点
2SELECT pglogical.create_node(
3    node_name := 'subnode',
4    dsn := 'host=127.0.0.1 port=5432 dbname=lhrdb  user=root password=lhr@xxt123'
5);
6
7
8-- 查询
9select * from pglogical.node_interface ;
10
11
12-- 2、订阅提供者节点,该订阅将在后台启动同步和复制过程
13SELECT pglogical.create_subscription(
14     subscription_name := 'sub32from30',
15     provider_dsn := 'host=10.0.0.73 port=5432 dbname=lhrdb  user=root password=lhr@xxt123'
16);
17
18
19
20-- 查询
21select * from pglogical.node_interface ;
22select * from pglogical.subscription ;
23select * from pglogical.show_subscription_status();
24
25
26-- 创建订阅这个命令执行完成后,会在发布端创建一个复制槽,这个很重要,一个订阅者一个复制槽
27select * from pg_replication_slots ;

这里发现华为云rds的源端并没有生成复制槽,所以,数据并不会同步!!

排错

failed: fe_sendauth: no password supplied

在创建订阅端的时候报错:

1postgres=# SELECT pglogical.create_subscription(
2postgres(# subscription_name :='subscription1',
3postgres(# provider_dsn := 'host=172.72.7.20 dbname=postgres user=postgres password=postgres'
4postgres(# );
5ERROR:  could not connect to the postgresql server: connection to server at "172.72.7.20", port 5432 failed: fe_sendauth: no password supplied
6
7DETAIL:  dsn was:  host=172.72.7.20 port=5432 dbname=postgres

解决:

 1SELECT pglogical.create_node(
2    node_name := 'provider1',
3    dsn := 'host=172.72.7.10 port=5432 dbname=test user=root password=postgres'
4);
5
6
7
8SELECT pglogical.create_subscription(
9subscription_name := 'sub1',
10provider_dsn := 'host=172.72.7.20 port=5432 dbname=postgres user=postgres password=postgres'
11);

密码应该在node和subscription里都提供。

总结

1、经过多次试验,发现创建表的操作往往不能成功!!!若发布端新增表,那么目标端最好也需要新建表。

2、创建节点和订阅时,一定要注意参数的值,例如:dbname、user和host

3、创建一个发布者节点,会同时在后台启动一个进程,叫“postgres: pglogical manager 16384”

4、创建一个订阅者,会在发布端后台启动一个进程,叫“postgres: walsender postgres 172.72.6.32(50340) idle”;同时会在订阅节点后台启动一个进程,叫“postgres: pglogical apply 16760:810137120”

4、排错请查看发布端和订阅端的数据库告警日志内容。

5、pglogical实现的这些功能,完全可以使用OGG来替代,具体可以参考:https://www.xmmup.com/shiyongogg-for-pgweifuwukuaisushuangxiangtongburdsshujukushuangzhu.html

6、在华为云端rds中,虽然有pglogical插件,但是我自己没有配置成功,创建订阅的时候不能使用公网,使用内网IP后,虽然创建订阅成功了,但是源端也不能生成复制槽,导致数据并不能同步到目标端。其它云没有测试过!!


文章转载自DB宝,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论