暂无图片
暂无图片
1
暂无图片
暂无图片
暂无图片

如何解决PostgreSQL逻辑复制冲突

前言

逻辑复制过程中往往会出现冲突的情况。冲突的情况多种多样,我们今天就来研究一下怎么处理这些冲突。

根据PostgreSQL官方文档介绍,解决冲突的办法有两种:

  • 手工修改订阅库上的数据,使发布端传过来的SQL可以在订阅端上执行。
  • 通过pg_replication_origin_advance()
    函数跳过当前的事务。

手工解决冲突

我们来通过实际案例来模拟一下问题。

--发布端和订阅端都创建T1表,id为主键hr=# create table t1 (id int primary key,name text);--发布端创建发布指定t1表CREATE PUBLICATION testpub FOR TABLE t1;--创建订阅create subscription testsub connection 'host=192.168.56.119 port=5432 dbname=hr user=replication' publication testpub;

主库插入一些数据。

hr=# insert into t1 values(1,'aaa');INSERT 0 1hr=# insert into t1 values(2,'aaa');INSERT 0 1

此时订阅端会顺利的把这些数据同步过来。

hr=# select * from t1; id | name ----+------  1 | aaa  2 | aaa(2 rows)

接下来我们在订阅端插入id号为3的记录。

hr=# insert into t1 values(3,'aaa');INSERT 0 1

此时我们在发布端在插入同样的id号为3的记录,当它复制过来的时候,就会出现主键冲突的情况。我们来测试一下。

hr=# insert into t1 values(3,'aaa');INSERT 0 1

顺利插入之后,在从库的后台日志上,我们看到了冲突的产生。

2021-03-29 16:52:29.901 CST [2141] ERROR:  duplicate key value violates unique constraint "t1_pkey"2021-03-29 16:52:29.901 CST [2141] DETAIL:  Key (id)=(3) already exists.2021-03-29 16:52:29.902 CST [1470] LOG:  background worker "logical replication worker" (PID 2141) exited with exit code 12021-03-29 16:52:29.905 CST [2156] LOG:  logical replication apply worker for subscription "testsub" has started2021-03-29 16:52:29.911 CST [2156] ERROR:  duplicate key value violates unique constraint "t1_pkey"2021-03-29 16:52:29.911 CST [2156] DETAIL:  Key (id)=(3) already exists.2021-03-29 16:52:29.911 CST [1470] LOG:  background worker "logical replication worker" (PID 2156) exited with exit code 1

错误显示的比较明显,那么解决问题的第一种方法就是我们把订阅先disable一下,删除相应的记录,在enable订阅。

hr=# alter subscription testsub disable;ALTER SUBSCRIPTIONhr=# delete from t1 where id=3;DELETE 1hr=# alter subscription testsub enable;ALTER SUBSCRIPTION

执行完上述操作后,后台日志没有再报错。冲突已经解决。查看订阅侧的t1表数据已经被复制过来了。

hr=# select * from t1; id | name ----+------  1 | aaa  2 | aaa  3 | aaa(3 rows)

使用pg_replication_origin_advance跳过事务

接下来我们在订阅端插入id号为4的记录。

hr=#  insert into t1 values(4,'aaa');INSERT 0 1

然后发布端同样插入id号为4的记录。

hr=# insert into t1 values(4,'aaa');INSERT 0 1

此时就产生了事务冲突。那么接下来我们需要找到主库冲突的的LSN。找到这个LSN是比较麻烦的,假设我们的发布端上一直有程序在执行插入。你会发现当前的LSN一直在往前增加。

insert into t1 values(5,'aaa');insert into t1 values(6,'aaa');hr=# SELECT pg_current_wal_lsn() ; pg_current_wal_lsn -------------------- 0/4A905B98(1 row)hr=# insert into t1 values(7,'aaa');INSERT 0 1hr=# SELECT pg_current_wal_lsn() ;   pg_current_wal_lsn -------------------- 0/4A905C48(1 row)

此时你要跳跃的LSN位置就不知道从哪开始了?那么简单的做法是通过读取逻辑插槽来判断。

hr=#  select * from  pg_replication_slots;-[ RECORD 1 ]-------+-----------slot_name           | testsubplugin              | pgoutputslot_type           | logicaldatoid              | 99550database            | hrtemporary           | factive              | factive_pid          | xmin                | catalog_xmin        | 29946restart_lsn         | 0/4A903990confirmed_flush_lsn | 0/4A9039C8wal_status          | reservedsafe_wal_size       | 

逻辑插槽记录了复制的位置。我们要查看逻辑插槽中的WAL内容。要查看WAL内容先需要复制一个逻辑插槽。然后再进行查看。

注:这里之所以要复制一个,是因为你没有办法直接查看当前再使用的逻辑插槽。复制一个就可以查看复制后的插槽。

hr=# SELECT * from pg_logical_slot_peek_changes('testsub',null,null);ERROR:  replication slot "testsub" is active for PID 3137hr=# SELECT pg_copy_logical_replication_slot('testsub', 'test_sub_copy', true, 'test_decoding'); pg_copy_logical_replication_slot ---------------------------------- (test_sub_copy,0/4A9039C8)(1 row)hr=# SELECT * from pg_logical_slot_peek_changes('test_sub_copy', NULL, NULL);    lsn     |  xid  |                          data                           ------------+-------+--------------------------------------------------------- 0/4A9039C8 | 29946 | BEGIN 29946 0/4A9039C8 | 29946 | table public.t1: INSERT: id[integer]:4 name[text]:'aaa' 0/4A903B98 | 29946 | COMMIT 29946   -->  一般在commit位置 0/4A903BD0 | 29947 | BEGIN 29947 0/4A903BD0 | 29947 | table public.t1: INSERT: id[integer]:5 name[text]:'aaa' 0/4A903C80 | 29947 | COMMIT 29947 0/4A905920 | 29948 | BEGIN 29948 0/4A905920 | 29948 | table public.t1: INSERT: id[integer]:6 name[text]:'aaa' 0/4A905B60 | 29948 | COMMIT 29948 0/4A905B98 | 29949 | BEGIN 29949 0/4A905B98 | 29949 | table public.t1: INSERT: id[integer]:7 name[text]:'aaa' 0/4A905C48 | 29949 | COMMIT 29949(12 rows)

我们对复制的插槽进行了分析,发现一共有4条记录,那么究竟在哪个位置出现了冲突。一般是第一个记录的commit这里。也就是0/4A903B98

找到这个lsn后,我们在订阅侧就可以执行pg_replication_origin_advance
函数来跳过事务了,第一个参数是表的外部节点标识符。可以从pg_replication_origin_status
视图中获取,第二个参数就是刚刚我们查的0/4A903B98

hr=#  SELECT * FROM pg_replication_origin_status; local_id | external_id | remote_lsn | local_lsn  ----------+-------------+------------+------------        1 | pg_24618    | 0/4A9038A8 | 0/7913EAC8

这里的external_id,就是表的外部节点标识符。

SELECT pg_replication_origin_advance ('pg_24618''0/4A903B98'::pg_lsn);

执行完成之后再次查看jobs。冲突的事务已经跳过。数据已经复制过来了。

hr=# select * from t1; id | name ----+------  1 | aaa  2 | aaa  5 | aaa  6 | aaa  7 | aaa  3 | aaa  4 | aaa(7 rows)

后记

以上就是解决PostgreSQL逻辑复制冲突的办法,您学会了吗?

励志成为PostgreSQL大神

长按关注吧

文章转载自励志成为postgresql大神,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论