暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

如何设置和运行PostgreSQL变更数据捕获

原创 eternity 2022-07-18
1064

现代web应用程序的架构由几个软件组件组成,如仪表板、分析、数据库、数据湖、缓存、搜索等。

数据库通常是任何应用程序的核心部分。实时数据更新使不同的数据系统保持连续同步,并快速响应新信息。那么,如何保持应用程序生态系统的同步呢?这些其他组件如何获取有关数据库中更改的信息?变更数据捕获或CDC是指识别新数据或变更数据的任何解决方案。

这篇文章是关于PostgreSQL CDC和实现方法的。

更改数据捕获(CDC)是一种数据集成方法,用于检测、捕获和交付对数据库数据源所做的更改。

通常,基于CDC的数据集成包括以下步骤:

  • 在源数据库中捕获更改数据。

  • 将更改后的数据转换为消费者可以接受的格式。

  • 将数据发布到消费者或目标数据库。

  • PostgreSQL提供了两种内置方式来实现CDC:

从事务日志中,PostgreSQL WAL,也称为预写日志。

数据库触发器。

让我们简要讨论一下使用事务日志(WAL)和触发器来捕获数据更改的优缺点。

触发器

基于触发器的方法涉及在数据库上创建审计触发器,以捕获与插入、更新和删除方法相关的所有事件。

触发器可以附加到表(分区或非分区)或视图。

触发器也可以为TRUNCATE语句触发。如果发生触发器事件,则会在适当的时间调用触发器的函数来处理事件。

这种方法最重要的优点是,与事务日志不同,所有这些都可以在SQL级别完成。

然而,触发器的使用对源数据库的性能有重大影响,因为当对数据进行更改时,这些触发器需要在应用程序数据库上运行。

事务日志

另一方面,对于现代DBMS,事务日志(用于PostgreSQL的WAL)通常用于事务日志记录和复制。

在PostgreSQL中,所有事务(如INSERT、UPDATE和DELETE)都会在客户端收到事务结果之前写入WAL。

  • 这种方法的优点是,它不会以任何方式影响数据库的性能。

  • 它也不需要修改DB表或应用程序。不需要在源数据库中创建其他表。

  • 基于日志的CDC通常被认为是适用于所有可能场景(包括具有极高事务量的系统)的更改数据捕获的最佳方法。

请注意,目前大多数DDL语句(如CREATE、DROP、ALTER)都没有被跟踪。但是,TRUNCATE命令位于逻辑复制流中。

如果您希望在Postgres数据发生更改时逐行流式处理,则需要逻辑解码或Postgres逻辑复制功能。

使用Postgres逻辑解码

逻辑解码是PostgreSQL基于日志的CDC(逻辑复制)的正式名称。

逻辑解码使用PostgreSQL预写日志的内容来存储数据库中发生的所有活动。预写日志是一种内部日志,用于描述存储级别上的数据库更改。

1.使用逻辑解码的第一步是在Postgres配置“postgresql”中设置以下参数。

wal_level = logical
max_replication_slots = 5
max_wal_senders = 10
  • 将wal_level设置为logical允许wal记录逻辑解码所需的信息。

  • 确保您的max\u replication\u slots值等于或高于使用WAL的PostgreSQL连接器的数量加上数据库使用的其他复制插槽的数量。

  • 确保max\u wal\u senders参数(指定wal的最大并发连接数)至少是逻辑复制插槽数的两倍。例如,如果数据库总共使用5个复制插槽,则max\u wal\u senders值必须为10或更大。

重新启动Postgres服务器以应用更改。

2.第二步是使用输出插件test_decoding设置逻辑复制

通过运行以下命令,为要同步的数据库创建逻辑复制槽。

SELECT pg_create_logical_replication_slot('replication_slot', 'test_decoding');

注意:每个复制槽都有一个名称,可以包含小写字母、数字和下划线。

要验证插槽是否已成功创建,请运行以下命令。

SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;

3.在下一步中,为所有表或仅特定表创建发布。如果指定表,则可以在以后从发布中添加或删除表。

CREATE PUBLICATION pub FOR ALL TABLES;

或者

CREATE PUBLICATION pub FOR TABLE table1, table2, table3;

您可以选择要包含在发布中的操作。例如,以下出版物仅包括表1的插入和更新操作。

CREATE PUBLICATION insert_update_only_pub FOR TABLE table1 WITH (publish = 'INSERT, UPDATE');

4.验证您选择的表格是否在出版物中。

psql-stream=# SELECT * FROM pg_publication_tables WHERE pubname='pub';
Output
pubname | schemaname | tablename
---------+------------+-----------
pub     | public     | table1
pub     | public     | table2
pub     | public     | table3
(3 rows)

从那时起,我们的发布pub跟踪psql流数据库中所有表的更改。

5.让我们创建一个抽象表t,并用一些记录填充它。

create table t (id int, name text);
INSERT INTO t(id, name) SELECT g.id, k.name FROM generate_series(1, 10) as g(id), substr(md5(random()::text), 0, 25) as k(name);

因此,表t中有10条记录。

psql-stream=# SELECT count(*) FROM t;
count
-------
10
(1 row)

6.最后,是时候检查我们的逻辑复制是否有效了。

在PostgreSQL控制台中运行以下命令以查看Postgres WAL条目。

SELECT * FROM pg_logical_slot_get_changes('replication_slot', NULL, NULL);

因此,您会得到如下结果:

    lsn    | xid  |                          data                          
-----------+------+--------------------------------------------------------
 0/19EA2C0 | 1045 | BEGIN 1045
 0/19EA2C0 | 1045 | table public.t: INSERT: id[integer]:1 name[text]:51459cbc211647e7b31c8720
 0/19EA300 | 1045 | table public.t: INSERT: id[integer]:2 name[text]:51459cbc211647e7b31c8720
 0/19EA340 | 1045 | table public.t: INSERT: id[integer]:3 name[text]:51459cbc211647e7b31c8720
 0/19EA380 | 1045 | table public.t: INSERT: id[integer]:4 name[text]:51459cbc211647e7b31c8720
 0/19EA3C0 | 1045 | table public.t: INSERT: id[integer]:5 name[text]:51459cbc211647e7b31c8720
 0/19EA400 | 1045 | table public.t: INSERT: id[integer]:6 name[text]:51459cbc211647e7b31c8720
 0/19EA440 | 1045 | table public.t: INSERT: id[integer]:7 name[text]:51459cbc211647e7b31c8720
 0/19EA480 | 1045 | table public.t: INSERT: id[integer]:8 name[text]:51459cbc211647e7b31c8720
 0/19EA4C0 | 1045 | table public.t: INSERT: id[integer]:9 name[text]:51459cbc211647e7b31c8720
 0/19EA500 | 1045 | table public.t: INSERT: id[integer]:10 name[text]:51459cbc211647e7b31c8720
 0/19EA5B0 | 1045 | COMMIT 1045
(13 rows)

pg_logical_slot_peek_changes是另一个PostgreSQL命令,用于从WAL条目中查看更改,而不使用它们。因此多次调用pg_logical_slot_peek_changes每次返回相同的结果。

另一方面,pg_logical_slot_peek_changes只在第一次返回结果。以下对pg_logical_slot_peek_changes的调用返回空结果集。这意味着当执行get命令时,结果将被提供和删除,这大大增强了我们编写逻辑的能力,以便使用这些事件创建表的副本。

7.记住销毁不再需要阻止其消费的插槽

SELECT pg_drop_replication_slot('replication_slot');

输出插件

我们已经讨论了Postgres 9.4+上可用的test_解码输出插件。虽然它是作为输出插件的示例创建的,但如果您的消费者支持它,它仍然很有用。

PostgreSQL自带了另一个pgoutput插件和test_解码插件。pgoutput从Postgres 10开始提供。一些消费者支持它进行解码(例如Debezium)。

运行以下命令以基于pgoutput创建插件,如上面的步骤2所示。

SELECT * FROM pg_create_logical_replication_slot('replication_slot', 'pgoutput');

以下命令使用与步骤6中描述的类似的数据更改。

psql-stream=# SELECT * FROM pg_logical_slot_peek_binary_changes('replication_slot', null, null, 'proto_version', '1', 'publication_names', 'pub');
    lsn    | xid  |                                           data                                           
-----------+------+------------------------------------------------------------------------------------------
 0/19A15F8 | 1038 | \x4200000000019a1d9000027de20a91a0ea0000040e
 0/19A15F8 | 1038 | \x52000080387075626c69630074006400020169640000000017ffffffff006e616d650000000019ffffffff
 0/19A15F8 | 1038 | \x49000080384e0002740000000234306e
 0/19A1890 | 1038 | \x49000080384e0002740000000234316e
 0/19A1910 | 1038 | \x49000080384e0002740000000234326e
 0/19A1990 | 1038 | \x49000080384e0002740000000234336e
 0/19A1A10 | 1038 | \x49000080384e0002740000000234346e
 0/19A1A90 | 1038 | \x49000080384e0002740000000234356e
 0/19A1B10 | 1038 | \x49000080384e0002740000000234366e
 0/19A1B90 | 1038 | \x49000080384e0002740000000234376e
 0/19A1C10 | 1038 | \x49000080384e0002740000000234386e
 0/19A1C90 | 1038 | \x49000080384e0002740000000234396e
 0/19A1DC0 | 1038 | \x430000000000019a1d9000000000019a1dc000027de20a91a0ea
(13 rows)

在这里,您可以注意到结果是以二进制格式返回的。pgoutput插件生成二进制输出。

wal2json是另一种用于逻辑解码的流行输出插件。

以下是wal2json插件的样例输出

{
      "change":[
         {
            "kind":"insert",
            "schema":"public",
            "table":"t",
            "columnnames":[
               "id",
               "name"
            ],
            "columntypes":[
               "integer",
               "character varying(255)"
            ],
            "columnvalues":[
               1,
               ""
            ]
         }
      ]
   }
   {
      "change":[
         {
            "kind":"update",
            "schema":"public",
            "table":"t",
            "columnnames":[
               "id",
               "name"
            ],
            "columntypes":[
               "integer",
               "character varying(255)"
            ],
            "columnvalues":[
               1,
               "New Value"
            ],
            "oldkeys":{
               "keynames":[
                  "id"
               ],
               "keytypes":[
                  "integer"
               ],
               "keyvalues":[
                  1
               ]
            }
         }
      ]
   }
   {
      "change":[
         {
            "kind":"delete",
            "schema":"public",
            "table":"t",
            "oldkeys":{
               "keynames":[
                  "id"
               ],
               "keytypes":[
                  "integer"
               ],
               "keyvalues":[
                  1
               ]
            }
         }
      ]
   }

关于插槽的重要提示

使用插槽时请记住以下几点:

  • 每个插槽只有一个输出插件(您可以选择哪一个)。

  • 每个插槽仅提供来自一个数据库的更改。

  • 一个数据库可以有多个插槽。

  • 每个数据更改通常在每个插槽中发出一次。

  • 但是,当Postgres实例重新启动时,插槽可能会重新发出更改。消费者必须应对这种情况。

  • 未使用的插槽对Postgres实例的可用性构成威胁。Postgres将为这些未使用的更改保存所有WAL文件。这可能会导致存储溢出。

PostgreSQL WAL消费者

消费者是任何可以接收Postgres逻辑解码流的应用程序。pg_Recvlogic是一个PostgreSQL应用程序,可以管理插槽并使用其中的流。它包含在Postgres发行版中,因此可能已经与PostgreSQL一起安装。

Golang示例代码

下面的Golang代码示例演示了如何开始创建自己的Postgress WAL消费者。它使用PostgreSQL-10。x逻辑复制到源数据库的流数据库更改(解码的WAL消息)。

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "strings"
    "time"

    "github.com/jackc/pgconn"
    "github.com/jackc/pglogrepl"
    "github.com/jackc/pgproto3/v2"
)

// Note that runtime parameter "replication=database" in connection string is obligatory
// replicaiton slot will not be created if replication=database is omitted

const CONN = "postgres://postgres:postgres@localhost/psql-streamer?replication=database"
const SLOT_NAME = "replication_slot"
const OUTPUT_PLUGIN = "pgoutput"
const INSERT_TEMPLATE = "create table t (id int, name text);"

var Event = struct {
    Relation string
    Columns  []string
}{}

func main() {
    ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
    defer cancel()
    conn, err := pgconn.Connect(ctx, CONN)
    if err != nil {
        panic(err)
    }
    defer conn.Close(ctx)

    // 1. Create table
    if _, err := conn.Exec(ctx, INSERT_TEMPLATE).ReadAll(); err != nil {
        fmt.Errorf("failed to create table: %v", err)
    }

    // 2. ensure publication exists
    if _, err := conn.Exec(ctx, "DROP PUBLICATION IF EXISTS pub;").ReadAll(); err != nil {
        fmt.Errorf("failed to drop publication: %v", err)
    }

    if _, err := conn.Exec(ctx, "CREATE PUBLICATION pub FOR ALL TABLES;").ReadAll(); err != nil {
        fmt.Errorf("failed to create publication: %v", err)
    }

    // 3. create temproary replication slot server
    if _, err = pglogrepl.CreateReplicationSlot(ctx, conn, SLOT_NAME, OUTPUT_PLUGIN, pglogrepl.CreateReplicationSlotOptions{Temporary: true}); err != nil {
        fmt.Errorf("failed to create a replication slot: %v", err)
    }

    var msgPointer pglogrepl.LSN
    pluginArguments := []string{"proto_version '1'", "publication_names 'pub'"}

    // 4. establish connection
    err = pglogrepl.StartReplication(ctx, conn, SLOT_NAME, msgPointer, pglogrepl.StartReplicationOptions{PluginArgs: pluginArguments})
    if err != nil {
        fmt.Errorf("failed to establish start replication: %v", err)
    }

    var pingTime time.Time
    for ctx.Err() != context.Canceled {
        if time.Now().After(pingTime) {
            if err = pglogrepl.SendStandbyStatusUpdate(ctx, conn, pglogrepl.StandbyStatusUpdate{WALWritePosition: msgPointer}); err != nil {
                fmt.Errorf("failed to send standby update: %v", err)
            }
            pingTime = time.Now().Add(10 * time.Second)
            //fmt.Println("client: please standby")
        }

        ctx, cancel := context.WithTimeout(ctx, time.Second*10)
        defer cancel()

        msg, err := conn.ReceiveMessage(ctx)
        if pgconn.Timeout(err) {
            continue
        }
        if err != nil {
            fmt.Errorf("something went wrong while listening for message: %v", err)
        }

        switch msg := msg.(type) {
        case *pgproto3.CopyData:
            switch msg.Data[0] {
            case pglogrepl.PrimaryKeepaliveMessageByteID:
            //    fmt.Println("server: confirmed standby")

            case pglogrepl.XLogDataByteID:
                walLog, err := pglogrepl.ParseXLogData(msg.Data[1:])
                if err != nil {
                    fmt.Errorf("failed to parse logical WAL log: %v", err)
                }

                var msg pglogrepl.Message
                if msg, err = pglogrepl.Parse(walLog.WALData); err != nil {
                    fmt.Errorf("failed to parse logical replication message: %v", err)
                }
                switch m := msg.(type) {
                case *pglogrepl.RelationMessage:
                    Event.Columns = []string{}
                    for _, col := range m.Columns {
                        Event.Columns = append(Event.Columns, col.Name)
                    }
                    Event.Relation = m.RelationName
                case *pglogrepl.InsertMessage:
                    var sb strings.Builder
                    sb.WriteString(fmt.Sprintf("INSERT %s(", Event.Relation))
                    for i := 0; i < len(Event.Columns); i++ {
                        sb.WriteString(fmt.Sprintf("%s: %s ", Event.Columns[i], string(m.Tuple.Columns[i].Data)))
                    }
                    sb.WriteString(")")
                    fmt.Println(sb.String())
                case *pglogrepl.UpdateMessage:
                    var sb strings.Builder
                    sb.WriteString(fmt.Sprintf("UPDATE %s(", Event.Relation))
                    for i := 0; i < len(Event.Columns); i++ {
                        sb.WriteString(fmt.Sprintf("%s: %s ", Event.Columns[i], string(m.NewTuple.Columns[i].Data)))
                    }
                    sb.WriteString(")")
                    fmt.Println(sb.String())
                case *pglogrepl.DeleteMessage:
                    var sb strings.Builder
                    sb.WriteString(fmt.Sprintf("DELETE %s(", Event.Relation))
                    for i := 0; i < len(Event.Columns); i++ {
                        sb.WriteString(fmt.Sprintf("%s: %s ", Event.Columns[i], string(m.OldTuple.Columns[i].Data)))
                    }
                    sb.WriteString(")")
                    fmt.Println(sb.String())
                case *pglogrepl.TruncateMessage:
                    fmt.Println("ALL GONE (TRUNCATE)")
                }
            }
        default:
            fmt.Printf("received unexpected message: %T", msg)
        }
    }
}

这段代码只记录传入事件,但在生产环境中,您可以轻松地将它们发送到消息队列或目标数据库。

结论

PostgreSQL中的逻辑解码为其他应用程序组件提供了一种有效的方法,以跟上Postgres数据库中的数据更改。

传统上,使用拉通知模型,其中每个应用程序组件以一定的间隔查询Postgre。逻辑编码使用推送通知模型,Postgres在每次更改发生时立即通知应用程序的其他部分。

数据更改事件现在可以在毫秒内发送给消费者,而无需查询数据库。通过逻辑解码,PostgreSQL数据库成为现代动态实时应用程序的核心部分。

原文标题:How to Set Up and Run PostgreSQL Change Data Capture
原文作者:Dmitry Narizhnykh
原文链接:https://dzone.com/articles/postgresql-change-data-capture

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论