暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

PostgreSQL 逻辑复制模块(二)

ClickHouse周边 2022-08-23
1124

前言

      本文主要介绍PostgreSQL 逻辑复制社区插件原理与功能、逻辑订阅处理流程解析、适应场景,了解PostgreSQL 逻辑复制家族、核心技术、同步过程及同步原理等等,点击PostgreSQL 逻辑复制模块(一)

4 社区插件的原理及功能

4.1 test_decoding plugin

      test_decoding是Postgres现有的一个plugin,它的主要作用是将筛选过后的wal日志,转化为人们可以理解的形式。现在PG内部有两种方法可以使用plugin。如下是对这两种情况的简单说明。

官方文档:https://www.postgresql.org/docs/12/logicaldecoding-example.htm

4.1.1 内置SQL函数使用

    --创建复制槽
    test=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
    slot_name | lsn
    -----------------+-----------
    regression_slot | 0/16569D8
    (1 row)


    -- 查看复制槽
    test=# select * from pg_replication_slots ;


    --插入一条数据到test01表中
    test=# insert into test01 values(1,'test_decoding use by SQL');
    INSERT 0 1


    --查看解析的sql
    test=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
    lsn | xid | data
    ------------+------+-----------------------------------------------------------------------------------------------
    0/340001F8 | 1673 | BEGIN 1673
    0/340001F8 | 1673 | table public.test01: INSERT: id[integer]:1 name[character varying]:'test_decoding use by SQL'
    0/34000628 | 1673 | COMMIT 1673

    test_decoding 原理图

    4.1.2 pg_recvlogical使用

    文档地址:http://www.postgres.cn/docs/12/app-pgrecvlogical.html

      # 在test库中创建默认test_decoding类型的复制槽 test2
      [postgres@bogon ~]$ pg_recvlogical -d test --slot=test2 --create-slot


      # 输出逻辑解码的信息到当前目录下的 ld.out 文件中
      [postgres@bogon ~]$ pg_recvlogical -d test --slot=test2 --start -f ld.out &
      [1] 19807


      #从test库中插入一条数据
      [postgres@bogon ~]$ psql -d test -c "insert into test01 values(2,'test_decoding use by pg_recvlogical');"
      INSERT 0 1


      #查看ld.out 中的信息
      [postgres@bogon ~]$ cat ld.out
      BEGIN 1674
      table public.test01: INSERT: id[integer]:2 name[character varying]:'test_decoding use by pg_recvlogical'
      COMMIT 1674

      pg_recvlogical 命令流程图

      4.2 wal2json plugin

             wal2json plugin 是 test_decoding 的一个升级版,它优化了输出结果的结构,使之更容易被应用,输出的是 json 格式的数据。

        # 在test库中创建wal2json类型的复制槽 test_slot_wal2json
        [postgres@bogon ~]$ pg_recvlogical -d test --slot test_slot_wal2json --create-slot -P wal2json


        # 输出逻辑解码的信息到当前目录下的 ld_wal2json.out 文件中
        [postgres@bogon ~]$ pg_recvlogical -d test --slot test_slot_wal2json --start -o pretty-print=1 -f ld_wal2json.out &


        #从test库中插入一条数据
        [postgres@bogon ~]$ psql -d test -c "insert into test01 values(2,'wal2json use by pg_recvlogical');"
        INSERT 0 1

               查看 ld_wal2json.out 结果

          {
          "change": [
          {
          "kind": "insert",
          "schema": "public",
          "table": "test01",
          "columnnames": ["id", "name"],
          "columntypes": ["integer", "character varying"],
          "columnvalues": [2, "wal2json use by pg_recvlogical"]
          }
          ]
          }

          4.3 logical replication

                 pger 可以使用test_decoding去认识逻辑复制,但是并不能真正的应用test_decoding来做什么,除非在test_decoding的基础上写代码去实现它的后续。logical replication真正的让用户可以在Postgres上体验另外一种不同的数据同步方式。换句话说,用户无法直接使用test_decoding功能用于生产环境,除非你使用第三方插件才能使用test_decoding完成数据同步的功能,而logical replication使得logical decoding技术更容易的用于生产环境。

          4.4 pglogical plugin

                 pglogical 是 PostgreSQL 的拓展模块, 为 PostgreSQL 数据库提供了逻辑流复制发布和订阅的功能。pglogical 重用了 BDR 项目中的一部分相关技术。pglogical 是一个完全作为 PostgreSQL 扩展实现的逻辑复制系统。完全集成,它不需要触发器或外部程序。这种物理复制的替代方法是使用发布/订阅模型复制数据以进行选择性复制的一种高效方法。提供比 Slony、Bucardo 或 Londiste 更快的复制速度,以及跨版本升级。

                 pglogical和logical replicate的原理是相同的,只不过pglogical有更加强大的冲突处理能力。

          pglogical 文档:

          https://www.2ndquadrant.com/en/resources/pglogical/pglogical-docs/

          pglogical下载地址:

          http://packages.2ndquadrant.com/pglogical/tarballs/

          4.5 其他逻辑解码插件

          • pgoutput  plugin

                   标准的 pg12 默认的逻辑解码插件。

          • decoderbufs

                   一个PostgreSQL逻辑解码器输出插件,用于将数据作为协议缓冲区传送。

          • decoder_raw

                   重新构造应用更改的查询。

          • ali_decoding

                   阿里巴巴研发的开源的解码插件。

          • BDR

                   专门设计用于分布在不同地理位置的集群的双向复制,区别与使用触发器进行双向复制的如SymmetricDS 的双主数据库。如果一个特定的数据集只在一个节点上修改,BDR工作的最好。BDR支持地理上分布的集群,不受距离的限制,并包含了地理围栏的能力。它的设计目的是最小化节点间的延迟。

          5 Logical Replication 逻辑订阅处理流程解析

          5.1 表同步阶段处理流程概述

                 订阅端执行CREATE SUBSCRIPTION后,在后台进行表数据同步。每个表的数据同步状态记录在pg_subscription_rel.srsubstate中,一共有4种状态码。

          • 'i':初始化(SUBREL_STATE_INIT)

          • 'd':正在copy数据(SUBREL_STATE_DATASYNC)

          • 's':已同步(SUBREL_STATE_SYNCDONE)

          • 'r':准备好 (普通复制)(SUBREL_STATE_READY)

            subdb=# select * from pg_subscription_rel;
            srsubid | srrelid | srsubstate | srsublsn
            ---------+---------+------------+------------
            29912 | 29904 | r | 0/3502AA90
            (1 row)

                   从执行CREATE SUBSCRIPTION开始订阅端的相关处理流程概述如下:

            1. 设置每个表的为srsubstate中 'i'(SUBREL_STATE_INIT);

            2. logical replication launcher 进程启动一个 logical replication apply worker 进程;

            3. logical replication apply worker 进程连接到订阅端开始接受订阅消息,此时表尚未完成初始同步(状态为 i 或 d ),跳过所有 insert、update 和 delete 消息的处理;

            4. logical replication apply worker 进程为每个未同步的表启动 logical replication sync worker 进程(每个订阅最多同时启动max_sync_workers_per_subscription个 sync worker);

            5. logical replication sync worker 进程连接到订阅端并同步初始数据;

              • 创建临时复制槽,并记录快照位置

              • 设置表同步状态为 'd'(SUBREL_STATE_DATASYNC)

              • copy 表数据

              • 设置表同步状态为SUBREL_STATE_SYNCWAIT(内部状态),并等待 apply worker 更新状态为SUBREL_STATE_CATCHUP(内部状态)

            6. logical replication apply worker 进程更新表同步状态为SUBREL_STATE_CATCHUP(内部状态),记录最新lsn,并等待 sync worker 更新状态为SUBREL_STATE_SYNCDONE;

            7. logical replication sync worker 进程完成初始数据同步;

              • 检查 apply worker 当前处理的订阅消息位置是否已经走到了快照位置前面,如果是从订阅端接受消息并处理直到追上 apply worker 

              • 设置表同步状态为 's'(SUBREL_STATE_SYNCDONE)

              • 进程退出

            8. logical replication apply worker 进程继续接受订阅消息并处理。

              • 接受到 insert、update 和 delete 消息,如果是同步点(进入's' 或 'r' 状态时的 lsn 位置)之后的消息进行应用

              • 接受到 commit 消息

              • 暂时没有新的消息处理

                  1.向发布端发送订阅位置反馈

                 2.如果不在事务块里,同步表状态。将所有处于 's'(SUBREL_STATE_SYNCDONE)同步状态的表更新为 'r'(SUBREL_STATE_READY)

            5.2 表同步后的持续逻辑复制

                   订阅表进入同步状态(状态码是 ‘s’ 或 'r' )后,发布端的变更都会通过消息通知订阅端;订阅端 apply worker 按照订阅消息的接受顺序(即发布端事务提交顺序)对每个表 apply 变更,并反馈 apply 位置,用于监视复制延迟。

                   插入订阅表

              insert into test01 values(100,'insert 1 条');

                     发布端修改订阅表时,在事务提交时,发布端依次发送下面的消息到订阅端

              • B(BEGIN)

              • R(RELATION)

              • I(INSERT)

              • C(COMMIT)

                更新复制源状态表pg_replication_origin_status中的remote_lsn和local_lsn,该位点对应于每个订阅表最后一次事务提交的位置。

              • k(KEEPALIVE)

              • k(KEEPALIVE)

                2个 keepalive 消息,会更新统计表中的位置:

                • 发布端pg_stat_replication:write_lsn,flush_lsn,replay_lsn

                  pubdb=# select usesysid,usename,application_name,sent_lsn,write_lsn,flush_lsn,replay_lsn,sync_state from pg_stat_replication where usename = 'rep';
                  • 发布端pg_get_replication_slots():confirmed_flush_lsn

                    pubdb=# select slot_name,plugin,active_pid,restart_lsn,confirmed_flush_lsn from pg_get_replication_slots() where slot_name = 'sub1';
                    • 订阅端更新pg_stat_subscription:latest_end_lsn

                      subdb=# select subid,subname,received_lsn,latest_end_lsn from pg_stat_subscription;

                    5.3 异常处理

                    5.3.1 sync worker 进程

                    1. SQL错误(如主键冲突):worker 进程异常退出,之后 apply worker 进程创建一个新的 sync worker 重试。错误解除前每5秒重试一次

                    2. 表被锁:等待

                    3. 更新或删除的记录不存在:正常执行,检测不到错误,也么没有日志输出(输出一条DEBUG1级别的日志)

                    5.3.2 apply worker 进程

                    1. SQL错误(如主键冲突):apply worker 进程异常退出,之后 logical replication launcher 进程创建一个新的apply worker 重试。错误解除前每5秒重试一次

                    2. 表被锁:等待

                    3. 更新或删除的记录不存在:正常执行,检测不到错误,也么没有日志输出(输出一条DEBUG1级别的日志)。

                    6 逻辑复制适用场景

                    • 可从多个上游服务器,做数据的聚集和合并

                             发布者跟订阅者的关系:一个发布者可以被多个订阅者订阅。多个发布者可以被同一个订阅者订阅。

                    • PostgreSQL大版本升级,数据直接同步到高版本

                             pglogical 对 PostgreSQL 版本升级是一个很实用的工具。能实现以几乎为零的停机时间迁移和升级PostgreSQL。局限性在于pglogical支持的 PostgreSQL 版本。

                    • 将多个数据库实例的数据,同步到一个目标数据库。例如多个数据库同步到一个大的数据仓库

                    • 满足业务上需求,实现某些指定表的数据同步

                    7 逻辑复制注意事项

                    • 发布节点的WAL_LEVEL参数需要设置成LOGICAL

                    • 发布节点上逻辑复制用户至少需要REPLICATION角色权限

                    • 支持一次发布一个数据库中的所有表

                    • 发布节点上需要发布的表如果需要将UPDATE/DELETE操作同步到订阅节点,需要给发布配置复制标识(复制标识默认为主键,如果没有主键也可以是唯一索引)

                    • 发布表上的DDL操作不会自动同步到订阅节点,如果发布节点上发布的表执行了DDL操作,需手工给订阅节点的相应表执行DDL,之前如果有没有同步的数据会自动同步

                    • 当发布者添加新表时,订阅者不能自动的获知,需要将表的SELECT权限赋给逻辑复制用户 (发布端执行 )

                    • 订阅者列多于发布者(订阅者的列包含发布者的列)时,发布者插入的数据会同步到订阅者,多的字段内容为 NULL

                    • 建立逻辑复制后,由于冲突或者表结构变更,导致逻辑复制关系挂起后,通过解决冲突和问题后,逻辑复制关系会恢复,并会同步此时发布者的数据

                    • 发布者表结构变更后,插入数据(1、字段为之前数据类型,2、字段为变更后的数据类型),逻辑复制挂起,均不能进行数据同步,当解决冲突后,逻辑复制恢复,同步数据


                    声明
                           因小编个人水平有限,专栏中难免存在错漏之处,请勿直接复制文档中的参数、命令或方法应用于线上环境中操作。



                    近期文章推荐
                    PostgreSQL 逻辑复制模块(一)
                    PostgreSQL体系结构(上)
                    PostgreSQL体系结构(下)


                    文章转载自ClickHouse周边,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                    评论