作者
digoal
日期
2019-09-19
标签
PostgreSQL , pgoutput
背景
2017年内置了logical 复制的wal 解析plugin.
30 PG_MODULE_MAGIC;
31
32 extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
33
34 static void pgoutput_startup(LogicalDecodingContext *ctx,
35 OutputPluginOptions *opt, bool is_init);
36 static void pgoutput_shutdown(LogicalDecodingContext *ctx);
37 static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
38 ReorderBufferTXN *txn);
39 static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
40 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
41 static void pgoutput_change(LogicalDecodingContext *ctx,
42 ReorderBufferTXN *txn, Relation rel,
43 ReorderBufferChange *change);
44 static void pgoutput_truncate(LogicalDecodingContext *ctx,
45 ReorderBufferTXN *txn, int nrelations, Relation relations[],
46 ReorderBufferChange *change);
47 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
48 RepOriginId origin_id);
49
50 static bool publications_valid;
51
52 static List *LoadPublications(List *pubnames);
53 static void publication_invalidation_cb(Datum arg, int cacheid,
54 uint32 hashvalue);
例子
1、使用pgoutput
```
select pg_create_logical_replication_slot('out','pgoutput');
create table aa(id int primary key, info text);
do language plpgsql $$
declare
begin
for i in 1..100000 loop
insert into aa values (i, 'test'||i);
commit;
end loop;
end;
$$;
postgres=# \set VERBOSITY verbose
postgres=# select * from pg_logical_slot_get_changes('out', pg_current_wal_lsn(), 10);
psql: ERROR: 0A000: client sent proto_version=0 but we only support protocol 1 or higher
CONTEXT: slot "out", output plugin "pgoutput", in the startup callback
LOCATION: pgoutput_startup, pgoutput.c:188
```
报错原因:
/*
149 * Initialize this plugin
150 */
151 static void
152 pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
153 bool is_init)
154 {
155 PGOutputData *data = palloc0(sizeof(PGOutputData));
156
157 /* Create our memory context for private allocations. */
158 data->context = AllocSetContextCreate(ctx->context,
159 "logical replication output context",
160 ALLOCSET_DEFAULT_SIZES);
161
162 ctx->output_plugin_private = data;
163
164 /* This plugin uses binary protocol. */
165 opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
166
167 /*
168 * This is replication start and not slot initialization.
169 *
170 * Parse and validate options passed by the client.
171 */
172 if (!is_init)
173 {
174 /* Parse the params and ERROR if we see any we don't recognize */
175 parse_output_parameters(ctx->output_plugin_options,
176 &data->protocol_version,
177 &data->publication_names);
178
179 /* Check if we support requested protocol */
180 if (data->protocol_version > LOGICALREP_PROTO_VERSION_NUM)
181 ereport(ERROR,
182 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
183 errmsg("client sent proto_version=%d but we only support protocol %d or lower",
184 data->protocol_version, LOGICALREP_PROTO_VERSION_NUM)));
185
186 if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
187 ereport(ERROR,
188 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
189 errmsg("client sent proto_version=%d but we only support protocol %d or higher",
190 data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
191
192 if (list_length(data->publication_names) < 1)
193 ereport(ERROR,
194 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
195 errmsg("publication_names parameter missing")));
196
197 /* Init publication state. */
198 data->publications = NIL;
199 publications_valid = false;
200 CacheRegisterSyscacheCallback(PUBLICATIONOID,
201 publication_invalidation_cb,
202 (Datum) 0);
203
204 /* Initialize relation schema cache. */
205 init_rel_sync_cache(CacheMemoryContext);
206 }
207 }
2、使用test_decoding
```
postgres=# select pg_create_logical_replication_slot('test','test_decoding');
pg_create_logical_replication_slot
(test,265/E1481948)
(1 row)
postgres=# do language plpgsql $$
declare
begin
for i in 100001..200000 loop
insert into aa values (i, 'test'||i);
commit;
end loop;
end;
$$;
DO
postgres=# select * from pg_logical_slot_get_changes('test', pg_current_wal_lsn(), 10);
lsn | xid | data
--------------+-----------+---------------------------------------------------------------------
265/E1481948 | 306864514 | BEGIN 306864514
265/E1481948 | 306864514 | table public.aa: INSERT: id[integer]:100001 info[text]:'test100001'
265/E1481E18 | 306864514 | COMMIT 306864514
265/E1481E18 | 306864515 | BEGIN 306864515
265/E1481E18 | 306864515 | table public.aa: INSERT: id[integer]:100002 info[text]:'test100002'
265/E1481ED0 | 306864515 | COMMIT 306864515
265/E1481ED0 | 306864516 | BEGIN 306864516
265/E1481ED0 | 306864516 | table public.aa: INSERT: id[integer]:100003 info[text]:'test100003'
265/E1481F88 | 306864516 | COMMIT 306864516
265/E1481F88 | 306864517 | BEGIN 306864517
265/E1481F88 | 306864517 | table public.aa: INSERT: id[integer]:100004 info[text]:'test100004'
265/E1482058 | 306864517 | COMMIT 306864517
(12 rows)
postgres=# select * from pg_logical_slot_get_changes('test', pg_current_wal_lsn(), 10);
lsn | xid | data
--------------+-----------+---------------------------------------------------------------------
265/E1482058 | 306864518 | BEGIN 306864518
265/E1482058 | 306864518 | table public.aa: INSERT: id[integer]:100005 info[text]:'test100005'
265/E1482110 | 306864518 | COMMIT 306864518
265/E1482110 | 306864519 | BEGIN 306864519
265/E1482110 | 306864519 | table public.aa: INSERT: id[integer]:100006 info[text]:'test100006'
265/E14821C8 | 306864519 | COMMIT 306864519
265/E14821C8 | 306864520 | BEGIN 306864520
265/E14821C8 | 306864520 | table public.aa: INSERT: id[integer]:100007 info[text]:'test100007'
265/E1482280 | 306864520 | COMMIT 306864520
265/E1482280 | 306864521 | BEGIN 306864521
265/E1482280 | 306864521 | table public.aa: INSERT: id[integer]:100008 info[text]:'test100008'
265/E1482338 | 306864521 | COMMIT 306864521
(12 rows)
```
pgoutput也被用于PG内置的逻辑复制:PUBLICATION SUBSCRIPTION
``` postgres=# \h create publication Command: CREATE PUBLICATION Description: define a new publication Syntax: CREATE PUBLICATION name [ FOR TABLE [ ONLY ] table_name [ * ] [, ...] | FOR ALL TABLES ] [ WITH ( publication_parameter [= value] [, ... ] ) ]
URL: https://www.postgresql.org/docs/12/sql-createpublication.html
postgres=# \h create subscription Command: CREATE SUBSCRIPTION Description: define a new subscription Syntax: CREATE SUBSCRIPTION subscription_name CONNECTION 'conninfo' PUBLICATION publication_name [, ...] [ WITH ( subscription_parameter [= value] [, ... ] ) ]
URL: https://www.postgresql.org/docs/12/sql-createsubscription.html ```
https://git.postgresql.org/gitweb/?p=postgresql.git;a=commit;h=039eb6e92f20499ac36cc74f8a5cef7430b706f6
``` Logical replication support for TRUNCATE
Update the built-in logical replication system to make use of the previously added logical decoding for TRUNCATE support. Add the required truncate callback to pgoutput and a new logical replication protocol message.
Publications get a new attribute to determine whether to replicate truncate actions. When updating a publication via pg_dump from an older version, this is not set, thus preserving the previous behavior.
Author: Simon Riggs simon@2ndquadrant.com Author: Marco Nenciarini marco.nenciarini@2ndquadrant.it Author: Peter Eisentraut peter.eisentraut@2ndquadrant.com Reviewed-by: Petr Jelinek petr.jelinek@2ndquadrant.com Reviewed-by: Andres Freund andres@anarazel.de Reviewed-by: Alvaro Herrera alvherre@alvh.no-ip.org ```
参考
src/backend/replication/pgoutput/pgoutput.c
https://git.postgresql.org/gitweb/?p=postgresql.git;a=blob;f=src/backend/replication/pgoutput/pgoutput.c;h=9c08757fcaf264f663de098c4b85563b45e92a5c;hb=48770492c3b796b251112fa9b74534f087c9f471
https://www.postgresql.org/docs/devel/protocol-replication.html
https://www.postgresql.org/docs/devel/test-decoding.html
https://git.postgresql.org/gitweb/?p=postgresql.git;a=commit;h=039eb6e92f20499ac36cc74f8a5cef7430b706f6
PostgreSQL 许愿链接
您的愿望将传达给PG kernel hacker、数据库厂商等, 帮助提高数据库产品质量和功能, 说不定下一个PG版本就有您提出的功能点. 针对非常好的提议,奖励限量版PG文化衫、纪念品、贴纸、PG热门书籍等,奖品丰富,快来许愿。开不开森.
9.9元购买3个月阿里云RDS PostgreSQL实例
PostgreSQL 解决方案集合
德哥 / digoal's github - 公益是一辈子的事.





