pgoutput: PostgreSQL's built-in logical replication output plugin

digoal 2019-09-19

Author

digoal

Date

2019-09-19

Tags

PostgreSQL , pgoutput


Background

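In 2017 (PostgreSQL 10), PostgreSQL gained a built-in WAL-decoding output plugin for logical replication: pgoutput. Its callbacks are declared near the top of pgoutput.c: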

    PG_MODULE_MAGIC;

    extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);

    static void pgoutput_startup(LogicalDecodingContext *ctx,
                                 OutputPluginOptions *opt, bool is_init);
    static void pgoutput_shutdown(LogicalDecodingContext *ctx);
    static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
                                   ReorderBufferTXN *txn);
    static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
                                    ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
    static void pgoutput_change(LogicalDecodingContext *ctx,
                                ReorderBufferTXN *txn, Relation rel,
                                ReorderBufferChange *change);
    static void pgoutput_truncate(LogicalDecodingContext *ctx,
                                  ReorderBufferTXN *txn, int nrelations, Relation relations[],
                                  ReorderBufferChange *change);
    static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
                                       RepOriginId origin_id);

    static bool publications_valid;

    static List *LoadPublications(List *pubnames);
    static void publication_invalidation_cb(Datum arg, int cacheid,
                                            uint32 hashvalue);

Examples

1. Using pgoutput

```
select pg_create_logical_replication_slot('out','pgoutput');

create table aa(id int primary key, info text);

do language plpgsql $$
declare
begin
for i in 1..100000 loop
insert into aa values (i, 'test'||i);
commit;
end loop;
end;
$$;

postgres=# \set VERBOSITY verbose
postgres=# select * from pg_logical_slot_get_changes('out', pg_current_wal_lsn(), 10);
psql: ERROR: 0A000: client sent proto_version=0 but we only support protocol 1 or higher
CONTEXT: slot "out", output plugin "pgoutput", in the startup callback
LOCATION: pgoutput_startup, pgoutput.c:188
```

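Cause of the error: `pg_logical_slot_get_changes` was called without any plugin options, so `proto_version` was left at 0 and failed the minimum-version check in `pgoutput_startup` (the `LOCATION` line above points into that `ereport`). The same function also rejects an empty `publication_names` list. Both options can be supplied as trailing key/value arguments, for example `'proto_version', '1', 'publication_names', '<publication name>'`. Because pgoutput declares binary output, its changes have to be read with the binary variants (`pg_logical_slot_get_binary_changes` or `pg_logical_slot_peek_binary_changes`) rather than the text-mode function used here. The startup callback: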

    /*
     * Initialize this plugin
     */
    static void
    pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
                     bool is_init)
    {
        PGOutputData *data = palloc0(sizeof(PGOutputData));

        /* Create our memory context for private allocations. */
        data->context = AllocSetContextCreate(ctx->context,
                                              "logical replication output context",
                                              ALLOCSET_DEFAULT_SIZES);

        ctx->output_plugin_private = data;

        /* This plugin uses binary protocol. */
        opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;

        /*
         * This is replication start and not slot initialization.
         *
         * Parse and validate options passed by the client.
         */
        if (!is_init)
        {
            /* Parse the params and ERROR if we see any we don't recognize */
            parse_output_parameters(ctx->output_plugin_options,
                                    &data->protocol_version,
                                    &data->publication_names);

            /* Check if we support requested protocol */
            if (data->protocol_version > LOGICALREP_PROTO_VERSION_NUM)
                ereport(ERROR,
                        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                         errmsg("client sent proto_version=%d but we only support protocol %d or lower",
                                data->protocol_version, LOGICALREP_PROTO_VERSION_NUM)));

            if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
                ereport(ERROR,
                        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                         errmsg("client sent proto_version=%d but we only support protocol %d or higher",
                                data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));

            if (list_length(data->publication_names) < 1)
                ereport(ERROR,
                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                         errmsg("publication_names parameter missing")));

            /* Init publication state. */
            data->publications = NIL;
            publications_valid = false;
            CacheRegisterSyscacheCallback(PUBLICATIONOID,
                                          publication_invalidation_cb,
                                          (Datum) 0);

            /* Initialize relation schema cache. */
            init_rel_sync_cache(CacheMemoryContext);
        }
    }
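
The order of the checks in `pgoutput_startup` can be reproduced outside the server as a minimal standalone sketch. It assumes the minimum and maximum supported protocol versions are both 1: the minimum is taken from the "protocol 1 or higher" error above, the maximum is an assumption for this illustration. `check_startup_options` and `startup_result` are hypothetical names, not PostgreSQL APIs.

```c
#include <stddef.h>

/* Assumed values for this sketch: the error message above shows that the
 * minimum is 1; the maximum is assumed to be 1 as well. */
#define PROTO_MIN_VERSION 1u
#define PROTO_MAX_VERSION 1u

typedef enum {
    STARTUP_OK,
    STARTUP_VERSION_TOO_NEW,       /* "... or lower" error */
    STARTUP_VERSION_TOO_OLD,       /* "... or higher" error */
    STARTUP_PUBLICATIONS_MISSING   /* "publication_names parameter missing" */
} startup_result;

/* Hypothetical helper that applies the same three checks, in the same
 * order, as the listing above. */
startup_result
check_startup_options(unsigned int proto_version, size_t n_publications)
{
    if (proto_version > PROTO_MAX_VERSION)
        return STARTUP_VERSION_TOO_NEW;
    if (proto_version < PROTO_MIN_VERSION)
        return STARTUP_VERSION_TOO_OLD;
    if (n_publications < 1)
        return STARTUP_PUBLICATIONS_MISSING;
    return STARTUP_OK;
}
```

A call with no options corresponds to `check_startup_options(0, 0)`, which stops at the version check, matching the error shown above. Supplying only a valid version would still fail on the missing publication list.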

2. Using test_decoding

```
postgres=# select pg_create_logical_replication_slot('test','test_decoding');
 pg_create_logical_replication_slot
------------------------------------
 (test,265/E1481948)
(1 row)

postgres=# do language plpgsql $$
declare
begin
for i in 100001..200000 loop
insert into aa values (i, 'test'||i);
commit;
end loop;
end;
$$;
DO
postgres=# select * from pg_logical_slot_get_changes('test', pg_current_wal_lsn(), 10);
lsn | xid | data
--------------+-----------+---------------------------------------------------------------------
265/E1481948 | 306864514 | BEGIN 306864514
265/E1481948 | 306864514 | table public.aa: INSERT: id[integer]:100001 info[text]:'test100001'
265/E1481E18 | 306864514 | COMMIT 306864514
265/E1481E18 | 306864515 | BEGIN 306864515
265/E1481E18 | 306864515 | table public.aa: INSERT: id[integer]:100002 info[text]:'test100002'
265/E1481ED0 | 306864515 | COMMIT 306864515
265/E1481ED0 | 306864516 | BEGIN 306864516
265/E1481ED0 | 306864516 | table public.aa: INSERT: id[integer]:100003 info[text]:'test100003'
265/E1481F88 | 306864516 | COMMIT 306864516
265/E1481F88 | 306864517 | BEGIN 306864517
265/E1481F88 | 306864517 | table public.aa: INSERT: id[integer]:100004 info[text]:'test100004'
265/E1482058 | 306864517 | COMMIT 306864517
(12 rows)

postgres=# select * from pg_logical_slot_get_changes('test', pg_current_wal_lsn(), 10);
lsn | xid | data
--------------+-----------+---------------------------------------------------------------------
265/E1482058 | 306864518 | BEGIN 306864518
265/E1482058 | 306864518 | table public.aa: INSERT: id[integer]:100005 info[text]:'test100005'
265/E1482110 | 306864518 | COMMIT 306864518
265/E1482110 | 306864519 | BEGIN 306864519
265/E1482110 | 306864519 | table public.aa: INSERT: id[integer]:100006 info[text]:'test100006'
265/E14821C8 | 306864519 | COMMIT 306864519
265/E14821C8 | 306864520 | BEGIN 306864520
265/E14821C8 | 306864520 | table public.aa: INSERT: id[integer]:100007 info[text]:'test100007'
265/E1482280 | 306864520 | COMMIT 306864520
265/E1482280 | 306864521 | BEGIN 306864521
265/E1482280 | 306864521 | table public.aa: INSERT: id[integer]:100008 info[text]:'test100008'
265/E1482338 | 306864521 | COMMIT 306864521
(12 rows)
```
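
The `lsn` column prints each WAL position as two hexadecimal numbers separated by a slash: the high and low 32 bits of a 64-bit byte position. The following is a minimal sketch of turning that text form into a number so that positions can be compared or subtracted. `parse_lsn` is a hypothetical helper name, not a PostgreSQL function.

```c
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

/* Parse the textual "hi/lo" form, e.g. "265/E1481948", into a 64-bit
 * WAL byte position. Returns 0 on success, -1 on malformed input. */
int
parse_lsn(const char *text, uint64_t *lsn)
{
    uint32_t hi, lo;
    char     trailing;

    /* The final %c only matches if something follows the second number,
     * so a clean parse converts exactly two fields. */
    if (sscanf(text, "%" SCNx32 "/%" SCNx32 "%c", &hi, &lo, &trailing) != 2)
        return -1;

    *lsn = ((uint64_t) hi << 32) | lo;
    return 0;
}
```

With this helper, the first BEGIN position above (`265/E1481948`) and the first COMMIT position (`265/E1481E18`) differ by 0x4D0, that is 1232 bytes.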

pgoutput is also the plugin used by PostgreSQL's built-in logical replication (PUBLICATION / SUBSCRIPTION):

```
postgres=# \h create publication
Command:     CREATE PUBLICATION
Description: define a new publication
Syntax:
CREATE PUBLICATION name
    [ FOR TABLE [ ONLY ] table_name [ * ] [, ...]
      | FOR ALL TABLES ]
    [ WITH ( publication_parameter [= value] [, ... ] ) ]

URL: https://www.postgresql.org/docs/12/sql-createpublication.html

postgres=# \h create subscription
Command:     CREATE SUBSCRIPTION
Description: define a new subscription
Syntax:
CREATE SUBSCRIPTION subscription_name
    CONNECTION 'conninfo'
    PUBLICATION publication_name [, ...]
    [ WITH ( subscription_parameter [= value] [, ... ] ) ]

URL: https://www.postgresql.org/docs/12/sql-createsubscription.html
```

https://git.postgresql.org/gitweb/?p=postgresql.git;a=commit;h=039eb6e92f20499ac36cc74f8a5cef7430b706f6

```
Logical replication support for TRUNCATE

Update the built-in logical replication system to make use of the
previously added logical decoding for TRUNCATE support. Add the
required truncate callback to pgoutput and a new logical replication
protocol message.

Publications get a new attribute to determine whether to replicate
truncate actions. When updating a publication via pg_dump from an older
version, this is not set, thus preserving the previous behavior.

Author: Simon Riggs <simon@2ndquadrant.com>
Author: Marco Nenciarini <marco.nenciarini@2ndquadrant.it>
Author: Peter Eisentraut <peter.eisentraut@2ndquadrant.com>
Reviewed-by: Petr Jelinek <petr.jelinek@2ndquadrant.com>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Alvaro Herrera <alvherre@alvh.no-ip.org>
```

References

src/backend/replication/pgoutput/pgoutput.c

https://git.postgresql.org/gitweb/?p=postgresql.git;a=blob;f=src/backend/replication/pgoutput/pgoutput.c;h=9c08757fcaf264f663de098c4b85563b45e92a5c;hb=48770492c3b796b251112fa9b74534f087c9f471

https://www.postgresql.org/docs/devel/protocol-replication.html

https://www.postgresql.org/docs/devel/test-decoding.html

https://git.postgresql.org/gitweb/?p=postgresql.git;a=commit;h=039eb6e92f20499ac36cc74f8a5cef7430b706f6
