暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

PostgreSQL 逻辑复制模块介绍

ClickHouse周边 2021-07-19
2215

1 流复制

        大家都知道Streaming Replication已经成为PostgreSQL的一部分,并且通常用于pg的高可用和读写分离,流复制是基于 WAL 日志的物理复制,适用于整个数据库集簇的复制,并且备库是只读的。
        流复制(物理复制)是一个更为传统数据同步方式,在 pg10 之前流复制承载了pg主备之间数据同步的功能,它的实现方式是将wal日志中记录的内容按照确切的块地址逐字节的拷贝到备库,因此主备之间数据分布是一致的,这意味着在主备机器上,同一条记录的ctid是相同的。

2 逻辑复制

        逻辑复制也是基于WAL文件,由逻辑解析模块对wal日志进行初步的解析,它的解析结果为ReorderBufferChange(可以简单理解为HeapTupleData),再由pgoutput plugin对中间结果进行过滤和消息化拼接后,然后将其发送到订阅端,订阅端根据接收到的HeapTupleData重新对其执行insert、delete、update的操作。这里要注意,流复制是将数据从walrecord拷贝到数据页,逻辑复制是将数据重新执行一次insert、update或delete。
        通俗的说:在逻辑复制中把主库称为源端库,备库称为目标端数据库,源端数据库根据预先指定好的逻辑解析规则对WAL文件进行解析,把DML操作解析成一定的逻辑变化信息(标准SQL语句),源端数据库把标准SQL语句发给目标端数据库,目标端数据库接收到之后进行应用,从而实现数据同步。

3 流复制和逻辑复制对比

        逻辑复制与流复制最直观的不同就是复制粒度,逻辑复制支持表级复制,流复制支持整库复制。

主要区别:

  • 流复制主库上的事务提交不需要等待备库接收到WAL文件后的确认,逻辑复制相反。
  • 流复制要求主备库的大版本一致,逻辑复制可以跨大版本的数据同步,也可以实现异构数据库的数据同步。
  • 流复制的主库可读写,从库只允许读,逻辑复制的目标端数据库要求可读写。
  • 流复制是对实例级别的复制(整个postgresql数据库集簇),逻辑复制是选择性的复制一些表,所以是对表级别的复制。
  • 流复制有主库的DDL、DML操作,逻辑复制只有DML操作。

4 逻辑复制家族

        logical decoding是实现所有logical功能的核心技术,下面有对它的详细解释。跟这个核心技术一起加入PG内核的还有一个demo级别的功能test_decoding,但是 test_decoding 的输出结果看起来没有那么容易被二次开发使用,wal2json 以一个 contrib 的形式出现,它的输出结果对二次开发更加友好。BDR 和 pglogical 是实现了逻辑复制的插件,BDR 特点是支持多主机、DDL 复制、全局序列、全局 DDL 锁, pglogical 特点是灵活性高、支持级联逻辑复制,在他们的基础上PG内核引入了 logical replication ,它实现了逻辑复制的功能,从使用方法来看 logical replication 与 pglogical 更为一致。

5 核心技术:logical decoding

        walsender 进程不断的从自己的复制槽中获取新产生的 wal record ,并通过内核中的 LogicalDecodingProcessRecord() 函数进行 wal record 的初步过滤和解析,解析结果为一个 ReorderBufferChange 结构的数据(对于 DML 语句而言这个结构里面主要的信息为 oldtuple 和 newtuple ),并将这个解析结果放置到当前事务 ID 对应的解析 buf 中去。当某个事务发生了提交,这个事务对应的 buf 里面的 ReorderBufferChange 会被指定的 plugin 做二次处理,并将处理结果按照指定的途径输出。

6 社区插件的原理及功能

6.1 test_decoding plugin

        test_decoding是Postgres现有的一个plugin,它的主要作用是将筛选过后的wal日志,转化为人们可以理解的形式。现在PG内部有两种方法可以使用plugin。如下是对这两种情况的简单说明。详情请查看官方文档。

6.1.1 内置SQL函数使用

    --创建复制槽
    test=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
    slot_name | lsn
    -----------------+-----------
    regression_slot | 0/16569D8
    (1 row)


    -- 查看复制槽
    test=# select * from pg_replication_slots ;


    --插入一条数据到test01表中
    test=# insert into test01 values(1,'test_decoding use by SQL');
    INSERT 0 1


    --查看解析的sql
    test=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
    lsn | xid | data
    ------------+------+-----------------------------------------------------------------------------------------------
    0/340001F8 | 1673 | BEGIN 1673
    0/340001F8 | 1673 | table public.test01: INSERT: id[integer]:1 name[character varying]:'test_decoding use by SQL'
    0/34000628 | 1673 | COMMIT 1673

    test_decoding 流程图
    6.1.2 pg_recvlogical使用
      文档地址:http://www.postgres.cn/docs/12/app-pgrecvlogical.html

      pg_recvlogical 流程图
      6.2 wal2json plugin
              wal2json plugin 是 test_decoding 的一个升级版,它优化了输出结果的结构,使之更容易被应用,输出的是 json 格式的数据。
        # 在test库中创建wal2json类型的复制槽 test_slot_wal2json
        $ pg_recvlogical -d test --slot test_slot_wal2json --create-slot -P wal2json


        # 输出逻辑解码的信息到当前目录下的 ld_wal2json.out 文件中
        $ pg_recvlogical -d test --slot test_slot_wal2json --start -o pretty-print=1 -f ld_wal2json.out &


        #从test库中插入一条数据
        $ psql -d test -c "insert into test01 values(2,'wal2json use by pg_recvlogical');"




        查看 ld_wal2json.out 结果
        {
        "change": [
        {
        "kind": "insert",
        "schema": "public",
        "table": "test01",
        "columnnames": ["id", "name"],
        "columntypes": ["integer", "character varying"],
        "columnvalues": [2, "wal2json use by pg_recvlogical"]
        }
        ]
        }
        6.3 logical replication
                pger 可以使用test_decoding去认识逻辑复制,但是并不能真正的应用test_decoding来做什么,除非在test_decoding的基础上写代码去实现它的后续。logical replication真正的让用户可以在Postgres上体验另外一种不同的数据同步方式。换句话说,用户无法直接使用test_decoding功能用于生产环境,除非你使用第三方插件才能使用test_decoding完成数据同步的功能,而logical replication使得logical decoding技术更容易的用于生产环境。

        6.4 pglogical plugin

                pglogical 是 PostgreSQL 的拓展模块, 为 PostgreSQL 数据库提供了逻辑流复制发布和订阅的功能。pglogical 重用了 BDR 项目中的一部分相关技术。pglogical 是一个完全作为 PostgreSQL 扩展实现的逻辑复制系统。完全集成,它不需要触发器或外部程序。这种物理复制的替代方法是使用发布/订阅模型复制数据以进行选择性复制的一种高效方法。提供比 Slony、Bucardo 或 Londiste 更快的复制速度,以及跨版本升级。
                pglogical和logical replicate的原理是相同的,只不过pglogical有更加强大的冲突处理能力。
          pglogical 文档:
          https://www.2ndquadrant.com/en/resources/pglogical/pglogical-docs/


          pglogical下载地址:
          http://packages.2ndquadrant.com/pglogical/tarballs/

          6.5 其他逻辑解码插件

          • pgoutput  plugin

                    标准的 pg12 默认的逻辑解码插件。

          • decoderbufs

                    一个PostgreSQL逻辑解码器输出插件,用于将数据作为协议缓冲区传送。

          • decoder_raw

                    重新构造应用更改的查询。

          • ali_decoding

                    阿里巴巴研发的开源的解码插件。

          • BDR

                    专门设计用于分布在不同地理位置的集群的双向复制,区别与使用触发器进行双向复制的如SymmetricDS 的双主数据库。如果一个特定的数据集只在一个节点上修改,BDR工作的最好。BDR支持地理上分布的集群,不受距离的限制,并包含了地理围栏的能力。它的设计目的是最小化节点间的延迟。

          7 Logical Replication 逻辑复制演示

          7.1 发布端配置

            # 1.postgresql.conf
            listen_addresses = '*'
            wal_level = logical
            max_replication_slots = 8
            max_wal_senders = 10


            # 2.pg_hba.conf
            host replication qqq 192.168.5.23/24 md5


            # 3.pg_ctl reload
            7.2 订阅端配置
              # 1.postgresql.conf
              listen_addresses = '*'
              wal_level = logical
              max_replication_slots = 8
              max_logical_replication_workers = 8


              # 2.pg_ctl reload

              7.3 发布端配置

                # 1.创建复制用户:
                create user qqq replication login password 'qqq';


                # 2.模拟造数:
                create database pubaaa;
                \c pubaaa qqq
                create table zzz (id int primary key,sex character varying(2),name character varying(10),now_address character varying(100),address character varying(100));
                insert into zzz values(generate_series(1,10000),repeat( chr(int4(random()*26)+65),1),repeat( chr(int4(random()*26)+65),6),repeat( chr(int4(random()*26)+65),60),repeat( chr(int4(random()*26)+65),50));
                create index on test(id,sex);


                # 3.用 postgres 用户创建发布节点
                \c pubaaa postgres
                create publication pub01 for table zzz;
                select * from pg_publication;


                # 4.发布节点为复制用户授权
                \c pubaaa postgres
                grant connect on database pubaaa to qqq;
                  grant usage on schema public to qqq;
                grant select on zzz to qqq;
                  select slot_name,plugin,slot_type,database,active,restart_lsn from pg_replication_slots where slot_name='sub01';

                7.4 订阅端配置

                  # 1.创建复制用户:
                  create user qqq replication login password 'qqq';


                  # 2.订阅节点创建接收表
                  create database subaaa;
                  \c subaaa qqq
                  create table zzz (id int primary key,sex character varying(2),name character varying(10),now_address character varying(100),address character varying(100));


                  # 3.用 postgres 用户创建订阅节点
                  \c subaaa postgres
                  create subscription sub01 connection 'host=192.168.6.51 port=5432 dbname=pubaaa user=qqq password=qqq' publication pub01;
                    select * from pg_subscription;
                  7.5 新加复制表
                    # 1.发布节点创建表结构
                    \c pubaaa postgres
                    create table test01(id int primary key,addr varchar(100));


                    # 2.订阅节点创建表结构
                    \c subaaa postgres
                    create table test01(id int primary key,addr varchar(100));


                    # 3.添加新表到发布节点
                    \c pubaaa postgres
                    grant select on test01 to qqq;
                    alter publication pub01 add table test01;
                      insert into test01 values(generate_series(1,10000),repeat( chr(int4(random()*26)+65),88));
                    --while [ true ];do echo `psql -d pubaaa -c "insert into test01 values((random()*(1000^3))::integer,repeat( chr(int4(random()*26)+65),1))"`;sleep 0.5;echo -e '\n';done


                    # 4.发布节点查看发布列表中的表名
                    select * from pg_publication_tables;
                    7.6 摘除订阅端
                      \c subaaa postgres;
                      alter subscription sub01 disable;
                      ALTER SUBSCRIPTION sub01 SET (slot_name = NONE);
                      drop subscription sub01 ;

                      8 Logical Replication 逻辑订阅处理流程解析

                      8.1 表同步阶段处理流程概述

                              订阅端执行CREATE SUBSCRIPTION后,在后台进行表数据同步。每个表的数据同步状态记录在pg_subscription_rel.srsubstate中,一共有4种状态码。

                      • 'i':初始化(SUBREL_STATE_INIT)

                      • 'd':正在copy数据(SUBREL_STATE_DATASYNC)

                      • 's':已同步(SUBREL_STATE_SYNCDONE)

                      • 'r':准备好 (普通复制)(SUBREL_STATE_READY)

                        subaaa=# select * from pg_subscription_rel;
                        srsubid | srrelid | srsubstate | srsublsn
                        ---------+---------+------------+------------
                        29912 | 29904 | r | 0/3502AA90
                        (1 row)
                        从执行CREATE SUBSCRIPTION开始订阅端的相关处理流程概述如下:
                        1. 设置每个表的为srsubstate中 'i'(SUBREL_STATE_INIT);
                        2. logical replication launcher 进程启动一个 logical replication apply worker 进程;
                        3. logical replication apply worker 进程连接到订阅端开始接受订阅消息,此时表尚未完成初始同步(状态为 i 或 d ),跳过所有 insert、update 和 delete 消息的处理;
                        4. logical replication apply worker 进程为每个未同步的表启动 logical replication sync worker 进程(每个订阅最多同时启动max_sync_workers_per_subscription个 sync worker);
                        5. logical replication sync worker 进程连接到订阅端并同步初始数据
                          • 5.1 创建临时复制槽,并记录快照位置;
                          • 5.2 设置表同步状态为 'd'(SUBREL_STATE_DATASYNC)
                          • 5.3 copy 表数据
                          • 5.4 设置表同步状态为SUBREL_STATE_SYNCWAIT(内部状态),并等待 apply worker 更新状态为SUBREL_STATE_CATCHUP(内部状态)
                        6. logical replication apply worker 进程更新表同步状态为SUBREL_STATE_CATCHUP(内部状态),记录最新lsn,并等待 sync worker 更新状态为SUBREL_STATE_SYNCDONE
                        7. logical replication sync worker 进程完成初始数据同步
                          • 7.1 检查 apply worker 当前处理的订阅消息位置是否已经走到了快照位置前面,如果是从订阅端接受消息并处理直到追上 apply worker 。
                          • 7.2 设置表同步状态为 's'(SUBREL_STATE_SYNCDONE)
                          • 7.3 进程退出
                        8. logical replication apply worker 进程继续接受订阅消息并处理
                          • 8.1 接受到 insert、update 和 delete 消息,如果是同步点(进入's' 或 'r' 状态时的 lsn 位置)之后的消息进行应用。
                          • 8.2 接受到 commit 消息
                          • 8.3 暂时没有新的消息处理

                        8.2 表同步后的持续逻辑复制

                                订阅表进入同步状态(状态码是 ‘s’ 或 'r' )后,发布端的变更都会通过消息通知订阅端;订阅端 apply worker 按照订阅消息的接受顺序(即发布端事务提交顺序)对每个表 apply 变更,并反馈 apply 位置,用于监视复制延迟。

                                通过调试,确认发布端发生更新时,发送给订阅端的数据包。

                          # 插入订阅表
                          insert into test01 values(100,'insert 1 条');

                          发布端修改订阅表时,在事务提交时,发布端依次发送下面的消息到订阅端

                          • B(BEGIN)
                          • R(RELATION)
                          • I(INSERT)
                          • C(COMMIT)更新复制源状态表pg_replication_origin_status
                            中的remote_lsn
                            local_lsn
                            ,该位点对应于每个订阅表最后一次事务提交的位置。
                          • k(KEEPALIVE)
                          • k(KEEPALIVE)2个 keepalive 消息,会更新统计表中的位置:
                          • 发布端pg_stat_replication
                            :write_lsn
                            ,flush_lsn
                            ,replay_lsn
                              select usesysid,usename,application_name,sent_lsn,write_lsn,flush_lsn,replay_lsn,sync_state from pg_stat_replication where usename = 'rep';
                              发布端pg_get_replication_slots()
                              :confirmed_flush_lsn
                                select slot_name,plugin,active_pid,restart_lsn,confirmed_flush_lsn from pg_get_replication_slots() where slot_name = 'sub1';
                                订阅端更新pg_stat_subscription
                                :latest_end_lsn
                                  select subid,subname,received_lsn,latest_end_lsn from pg_stat_subscription;




                                更多精彩内容欢迎关注微信公众号


                                文章转载自ClickHouse周边,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                评论