1 流复制

2 逻辑复制

3 流复制和逻辑复制对比
主要区别:
流复制主库上的事务提交不需要等待备库接收到WAL文件后的确认,逻辑复制相反。 流复制要求主备库的大版本一致,逻辑复制可以跨大版本的数据同步,也可以实现异构数据库的数据同步。 流复制的主库可读写,从库只允许读,逻辑复制的目标端数据库要求可读写。 流复制是对实例级别的复制(整个postgresql数据库集簇),逻辑复制是选择性的复制一些表,所以是对表级别的复制。 流复制有主库的DDL、DML操作,逻辑复制只有DML操作。
4 逻辑复制家族

5 核心技术:logical decoding

6 社区插件的原理及功能
6.1 test_decoding plugin
test_decoding是Postgres现有的一个plugin,它的主要作用是将筛选过后的wal日志,转化为人们可以理解的形式。现在PG内部有两种方法可以使用plugin。如下是对这两种情况的简单说明。详情请查看官方文档。
6.1.1 内置SQL函数使用
--创建复制槽test=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');slot_name | lsn-----------------+-----------regression_slot | 0/16569D8(1 row)-- 查看复制槽test=# select * from pg_replication_slots ;--插入一条数据到test01表中test=# insert into test01 values(1,'test_decoding use by SQL');INSERT 0 1--查看解析的sqltest=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);lsn | xid | data------------+------+-----------------------------------------------------------------------------------------------0/340001F8 | 1673 | BEGIN 16730/340001F8 | 1673 | table public.test01: INSERT: id[integer]:1 name[character varying]:'test_decoding use by SQL'0/34000628 | 1673 | COMMIT 1673

test_decoding 流程图
文档地址:http://www.postgres.cn/docs/12/app-pgrecvlogical.html

pg_recvlogical 流程图
# 在test库中创建wal2json类型的复制槽 test_slot_wal2json$ pg_recvlogical -d test --slot test_slot_wal2json --create-slot -P wal2json# 输出逻辑解码的信息到当前目录下的 ld_wal2json.out 文件中$ pg_recvlogical -d test --slot test_slot_wal2json --start -o pretty-print=1 -f ld_wal2json.out &#从test库中插入一条数据$ psql -d test -c "insert into test01 values(2,'wal2json use by pg_recvlogical');"查看 ld_wal2json.out 结果{"change": [{"kind": "insert","schema": "public","table": "test01","columnnames": ["id", "name"],"columntypes": ["integer", "character varying"],"columnvalues": [2, "wal2json use by pg_recvlogical"]}]}
6.4 pglogical plugin
pglogical 文档:https://www.2ndquadrant.com/en/resources/pglogical/pglogical-docs/pglogical下载地址:http://packages.2ndquadrant.com/pglogical/tarballs/
6.5 其他逻辑解码插件
pgoutput plugin
标准的 pg12 默认的逻辑解码插件。
decoderbufs
一个PostgreSQL逻辑解码器输出插件,用于将数据作为协议缓冲区传送。
decoder_raw
重新构造应用更改的查询。
ali_decoding
阿里巴巴研发的开源的解码插件。
BDR
专门设计用于分布在不同地理位置的集群的双向复制,区别与使用触发器进行双向复制的如SymmetricDS 的双主数据库。如果一个特定的数据集只在一个节点上修改,BDR工作的最好。BDR支持地理上分布的集群,不受距离的限制,并包含了地理围栏的能力。它的设计目的是最小化节点间的延迟。
7 Logical Replication 逻辑复制演示
7.1 发布端配置
# 1.postgresql.conflisten_addresses = '*'wal_level = logicalmax_replication_slots = 8max_wal_senders = 10# 2.pg_hba.confhost replication qqq 192.168.5.23/24 md5# 3.pg_ctl reload
# 1.postgresql.conflisten_addresses = '*'wal_level = logicalmax_replication_slots = 8max_logical_replication_workers = 8# 2.pg_ctl reload
7.3 发布端配置
# 1.创建复制用户:create user qqq replication login password 'qqq';# 2.模拟造数:create database pubaaa;\c pubaaa qqqcreate table zzz (id int primary key,sex character varying(2),name character varying(10),now_address character varying(100),address character varying(100));insert into zzz values(generate_series(1,10000),repeat( chr(int4(random()*26)+65),1),repeat( chr(int4(random()*26)+65),6),repeat( chr(int4(random()*26)+65),60),repeat( chr(int4(random()*26)+65),50));create index on test(id,sex);# 3.用 postgres 用户创建发布节点\c pubaaa postgrescreate publication pub01 for table zzz;select * from pg_publication;# 4.发布节点为复制用户授权\c pubaaa postgresgrant connect on database pubaaa to qqq;grant usage on schema public to qqq;grant select on zzz to qqq;select slot_name,plugin,slot_type,database,active,restart_lsn from pg_replication_slots where slot_name='sub01';
7.4 订阅端配置
# 1.创建复制用户:create user qqq replication login password 'qqq';# 2.订阅节点创建接收表create database subaaa;\c subaaa qqqcreate table zzz (id int primary key,sex character varying(2),name character varying(10),now_address character varying(100),address character varying(100));# 3.用 postgres 用户创建订阅节点\c subaaa postgrescreate subscription sub01 connection 'host=192.168.6.51 port=5432 dbname=pubaaa user=qqq password=qqq' publication pub01;select * from pg_subscription;
# 1.发布节点创建表结构\c pubaaa postgrescreate table test01(id int primary key,addr varchar(100));# 2.订阅节点创建表结构\c subaaa postgrescreate table test01(id int primary key,addr varchar(100));# 3.添加新表到发布节点\c pubaaa postgresgrant select on test01 to qqq;alter publication pub01 add table test01;insert into test01 values(generate_series(1,10000),repeat( chr(int4(random()*26)+65),88));--while [ true ];do echo `psql -d pubaaa -c "insert into test01 values((random()*(1000^3))::integer,repeat( chr(int4(random()*26)+65),1))"`;sleep 0.5;echo -e '\n';done# 4.发布节点查看发布列表中的表名select * from pg_publication_tables;
\c subaaa postgres;alter subscription sub01 disable;ALTER SUBSCRIPTION sub01 SET (slot_name = NONE);drop subscription sub01 ;
8 Logical Replication 逻辑订阅处理流程解析
8.1 表同步阶段处理流程概述
订阅端执行CREATE SUBSCRIPTION后,在后台进行表数据同步。每个表的数据同步状态记录在pg_subscription_rel.srsubstate中,一共有4种状态码。
'i':初始化(SUBREL_STATE_INIT)
'd':正在copy数据(SUBREL_STATE_DATASYNC)
's':已同步(SUBREL_STATE_SYNCDONE)
'r':准备好 (普通复制)(SUBREL_STATE_READY)
subaaa=# select * from pg_subscription_rel;srsubid | srrelid | srsubstate | srsublsn---------+---------+------------+------------29912 | 29904 | r | 0/3502AA90(1 row)
设置每个表的为srsubstate中 'i'(SUBREL_STATE_INIT); logical replication launcher 进程启动一个 logical replication apply worker 进程; logical replication apply worker 进程连接到订阅端开始接受订阅消息,此时表尚未完成初始同步(状态为 i 或 d ),跳过所有 insert、update 和 delete 消息的处理; logical replication apply worker 进程为每个未同步的表启动 logical replication sync worker 进程(每个订阅最多同时启动max_sync_workers_per_subscription个 sync worker); logical replication sync worker 进程连接到订阅端并同步初始数据 5.1 创建临时复制槽,并记录快照位置; 5.2 设置表同步状态为 'd'(SUBREL_STATE_DATASYNC) 5.3 copy 表数据 5.4 设置表同步状态为SUBREL_STATE_SYNCWAIT(内部状态),并等待 apply worker 更新状态为SUBREL_STATE_CATCHUP(内部状态) logical replication apply worker 进程更新表同步状态为SUBREL_STATE_CATCHUP(内部状态),记录最新lsn,并等待 sync worker 更新状态为SUBREL_STATE_SYNCDONE logical replication sync worker 进程完成初始数据同步 7.1 检查 apply worker 当前处理的订阅消息位置是否已经走到了快照位置前面,如果是从订阅端接受消息并处理直到追上 apply worker 。 7.2 设置表同步状态为 's'(SUBREL_STATE_SYNCDONE) 7.3 进程退出 logical replication apply worker 进程继续接受订阅消息并处理 8.1 接受到 insert、update 和 delete 消息,如果是同步点(进入's' 或 'r' 状态时的 lsn 位置)之后的消息进行应用。 8.2 接受到 commit 消息 8.3 暂时没有新的消息处理
8.2 表同步后的持续逻辑复制
订阅表进入同步状态(状态码是 ‘s’ 或 'r' )后,发布端的变更都会通过消息通知订阅端;订阅端 apply worker 按照订阅消息的接受顺序(即发布端事务提交顺序)对每个表 apply 变更,并反馈 apply 位置,用于监视复制延迟。
通过调试,确认发布端发生更新时,发送给订阅端的数据包。
# 插入订阅表insert into test01 values(100,'insert 1 条');
发布端修改订阅表时,在事务提交时,发布端依次发送下面的消息到订阅端
B(BEGIN) R(RELATION) I(INSERT) C(COMMIT)更新复制源状态表 pg_replication_origin_status
中的remote_lsn
和local_lsn
,该位点对应于每个订阅表最后一次事务提交的位置。k(KEEPALIVE) k(KEEPALIVE)2个 keepalive 消息,会更新统计表中的位置:
pg_stat_replication:
write_lsn,
flush_lsn,
replay_lsnselect usesysid,usename,application_name,sent_lsn,write_lsn,flush_lsn,replay_lsn,sync_state from pg_stat_replication where usename = 'rep';
pg_get_replication_slots():
confirmed_flush_lsnselect slot_name,plugin,active_pid,restart_lsn,confirmed_flush_lsn from pg_get_replication_slots() where slot_name = 'sub1';
pg_stat_subscription:
latest_end_lsnselect subid,subname,received_lsn,latest_end_lsn from pg_stat_subscription;
更多精彩内容欢迎关注微信公众号





