
前言
逻辑复制过程中往往会出现冲突的情况。冲突的情况多种多样,我们今天就来研究一下怎么处理这些冲突。
根据PostgreSQL官方文档介绍,解决冲突的办法有两种:
手工修改订阅库上的数据,使发布端传过来的SQL可以在订阅端上执行。 通过 pg_replication_origin_advance()
函数跳过当前的事务。
手工解决冲突
我们来通过实际案例来模拟一下问题。
--发布端和订阅端都创建T1表,id为主键hr=# create table t1 (id int primary key,name text);--发布端创建发布指定t1表CREATE PUBLICATION testpub FOR TABLE t1;--创建订阅create subscription testsub connection 'host=192.168.56.119 port=5432 dbname=hr user=replication' publication testpub;
主库插入一些数据。
hr=# insert into t1 values(1,'aaa');INSERT 0 1hr=# insert into t1 values(2,'aaa');INSERT 0 1
此时订阅端会顺利的把这些数据同步过来。
hr=# select * from t1; id | name ----+------ 1 | aaa 2 | aaa(2 rows)
接下来我们在订阅端插入id号为3的记录。
hr=# insert into t1 values(3,'aaa');INSERT 0 1
此时我们在发布端在插入同样的id号为3的记录,当它复制过来的时候,就会出现主键冲突的情况。我们来测试一下。
hr=# insert into t1 values(3,'aaa');INSERT 0 1
顺利插入之后,在从库的后台日志上,我们看到了冲突的产生。
2021-03-29 16:52:29.901 CST [2141] ERROR: duplicate key value violates unique constraint "t1_pkey"2021-03-29 16:52:29.901 CST [2141] DETAIL: Key (id)=(3) already exists.2021-03-29 16:52:29.902 CST [1470] LOG: background worker "logical replication worker" (PID 2141) exited with exit code 12021-03-29 16:52:29.905 CST [2156] LOG: logical replication apply worker for subscription "testsub" has started2021-03-29 16:52:29.911 CST [2156] ERROR: duplicate key value violates unique constraint "t1_pkey"2021-03-29 16:52:29.911 CST [2156] DETAIL: Key (id)=(3) already exists.2021-03-29 16:52:29.911 CST [1470] LOG: background worker "logical replication worker" (PID 2156) exited with exit code 1
错误显示的比较明显,那么解决问题的第一种方法就是我们把订阅先disable一下,删除相应的记录,在enable订阅。
hr=# alter subscription testsub disable;ALTER SUBSCRIPTIONhr=# delete from t1 where id=3;DELETE 1hr=# alter subscription testsub enable;ALTER SUBSCRIPTION
执行完上述操作后,后台日志没有再报错。冲突已经解决。查看订阅侧的t1表数据已经被复制过来了。
hr=# select * from t1; id | name ----+------ 1 | aaa 2 | aaa 3 | aaa(3 rows)
使用pg_replication_origin_advance跳过事务
接下来我们在订阅端插入id号为4的记录。
hr=# insert into t1 values(4,'aaa');INSERT 0 1
然后发布端同样插入id号为4的记录。
hr=# insert into t1 values(4,'aaa');INSERT 0 1
此时就产生了事务冲突。那么接下来我们需要找到主库冲突的的LSN。找到这个LSN是比较麻烦的,假设我们的发布端上一直有程序在执行插入。你会发现当前的LSN一直在往前增加。
insert into t1 values(5,'aaa');insert into t1 values(6,'aaa');hr=# SELECT pg_current_wal_lsn() ; pg_current_wal_lsn -------------------- 0/4A905B98(1 row)hr=# insert into t1 values(7,'aaa');INSERT 0 1hr=# SELECT pg_current_wal_lsn() ; pg_current_wal_lsn -------------------- 0/4A905C48(1 row)
此时你要跳跃的LSN位置就不知道从哪开始了?那么简单的做法是通过读取逻辑插槽来判断。
hr=# select * from pg_replication_slots;-[ RECORD 1 ]-------+-----------slot_name | testsubplugin | pgoutputslot_type | logicaldatoid | 99550database | hrtemporary | factive | factive_pid | xmin | catalog_xmin | 29946restart_lsn | 0/4A903990confirmed_flush_lsn | 0/4A9039C8wal_status | reservedsafe_wal_size |
逻辑插槽记录了复制的位置。我们要查看逻辑插槽中的WAL内容。要查看WAL内容先需要复制一个逻辑插槽。然后再进行查看。
注:这里之所以要复制一个,是因为你没有办法直接查看当前再使用的逻辑插槽。复制一个就可以查看复制后的插槽。
hr=# SELECT * from pg_logical_slot_peek_changes('testsub',null,null);ERROR: replication slot "testsub" is active for PID 3137hr=# SELECT pg_copy_logical_replication_slot('testsub', 'test_sub_copy', true, 'test_decoding'); pg_copy_logical_replication_slot ---------------------------------- (test_sub_copy,0/4A9039C8)(1 row)hr=# SELECT * from pg_logical_slot_peek_changes('test_sub_copy', NULL, NULL); lsn | xid | data ------------+-------+--------------------------------------------------------- 0/4A9039C8 | 29946 | BEGIN 29946 0/4A9039C8 | 29946 | table public.t1: INSERT: id[integer]:4 name[text]:'aaa' 0/4A903B98 | 29946 | COMMIT 29946 --> 一般在commit位置 0/4A903BD0 | 29947 | BEGIN 29947 0/4A903BD0 | 29947 | table public.t1: INSERT: id[integer]:5 name[text]:'aaa' 0/4A903C80 | 29947 | COMMIT 29947 0/4A905920 | 29948 | BEGIN 29948 0/4A905920 | 29948 | table public.t1: INSERT: id[integer]:6 name[text]:'aaa' 0/4A905B60 | 29948 | COMMIT 29948 0/4A905B98 | 29949 | BEGIN 29949 0/4A905B98 | 29949 | table public.t1: INSERT: id[integer]:7 name[text]:'aaa' 0/4A905C48 | 29949 | COMMIT 29949(12 rows)
我们对复制的插槽进行了分析,发现一共有4条记录,那么究竟在哪个位置出现了冲突。一般是第一个记录的commit这里。也就是0/4A903B98
找到这个lsn后,我们在订阅侧就可以执行pg_replication_origin_advance
函数来跳过事务了,第一个参数是表的外部节点标识符。可以从pg_replication_origin_status
视图中获取,第二个参数就是刚刚我们查的0/4A903B98
。
hr=# SELECT * FROM pg_replication_origin_status; local_id | external_id | remote_lsn | local_lsn ----------+-------------+------------+------------ 1 | pg_24618 | 0/4A9038A8 | 0/7913EAC8
这里的external_id,就是表的外部节点标识符。
SELECT pg_replication_origin_advance ('pg_24618', '0/4A903B98'::pg_lsn);
执行完成之后再次查看jobs。冲突的事务已经跳过。数据已经复制过来了。
hr=# select * from t1; id | name ----+------ 1 | aaa 2 | aaa 5 | aaa 6 | aaa 7 | aaa 3 | aaa 4 | aaa(7 rows)
后记
以上就是解决PostgreSQL逻辑复制冲突的办法,您学会了吗?


励志成为PostgreSQL大神
长按关注吧




