暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

MogDB 5.0.8 逻辑解码支持DDL操作

原创 彭冲 2024-08-04
244

MogDB 5.0.8在逻辑解码的过程中,解码插件支持解析DDL操作。

1.环境准备

初始化两个cluster

$ gs_initdb -D data_pub --nodename=pub $ gs_initdb -D data_sub --nodename=sub

修改发布端配置参数

$ vi data_pub/postgresql.conf

文件尾部增加如下参数:

listen_addresses = '*' port=3000 wal_level = logical enable_ddl_logical_record=on max_process_memory = 2GB shared_buffers = 128MB cstore_buffers = 16MB

修改订阅端配置参数

$ vi data_sub/postgresql.conf

文件尾部增加如下参数:

listen_addresses = '*' port=5000 wal_level = logical enable_ddl_logical_record=on max_process_memory = 2GB shared_buffers = 128MB cstore_buffers = 16MB

发布和订阅端pg_hba.conf文件增加如下设置:

host  replication  all  0/0  sha256

分别启动发布端和订阅端的数据库服务

$ gs_ctl start -D data_pub/ $ gs_ctl start -D data_sub/

发布和订阅端同时创建repuser如下:

CREATE USER repuser REPLICATION SYSADMIN PASSWORD 'XXX';

2.建立发布与订阅关系

gsql进入发布端

$ gsql -r -p 3000 -U mog postgres

创建测试表及发布

CREATE TABLE public.tab1(id int primary key,info varchar(100)); CREATE PUBLICATION mypub FOR TABLE public.tab1;

创建订阅端key文件

$ gs_guc generate -S XXX -D $GAUSSHOME/bin -o subscription

gsql进入订阅端

$ gsql -r -p 5000 -U mog postgres

创建测试表及订阅

CREATE TABLE public.tab1(id int primary key,info varchar(100)); CREATE SUBSCRIPTION mysub CONNECTION 'host=x.x.x.x port=3001 user=repuser password=XXX dbname=postgres' PUBLICATION mypub ;

注意:这里订阅端的表结构还是需要手工提前创建,因为默认的解码插件pgoutput还不支持DDL。

目前发布订阅语法并不支持设置解码插件,也就是不支持手工创建复制槽进行指定。

3.DDL解码插件支持

MogDB 5.0.8在逻辑解码过程中,wal2json/mppdb_decoding/test_decoding支持如下DDL操作:

  • CREATE/DROP TABLE|TABLE PARTITION
  • CREATE/DROP INDEX
  • TRUNCATE TABLE
  • ALTER TABLE ADD COLUMN [CONSTRAINT]
  • ALTER TABLE DROP COLUMN
  • ALTER TABLE ALTER COLUMN [TYPE|SET NOT NULL|DROP NOT NULL|SET DEFAULT|DROP DEFAULT]
  • ALTER TABLE [DROP|ADD|TRUNCATE] PARTITION
  • ALTER TABLE MODIFY COLUMN data_type [ON UPDATE]
  • ALTER TABLE MODIFY COLUMN [NOT] NULL
  • ALTER TABLE ADD COLUMN [AFTER|FIRST]

4.test_decoding插件测试

SELECT 'init' FROM pg_create_logical_replication_slot('slot1','test_decoding'); CREATE TABLE logical_tab1(col1 boolean[],col2 boolean); DROP TABLE logical_tab1; CREATE TABLE range_sales ( product_id INT4 NOT NULL, customer_id INT4 PRIMARY KEY, time_id DATE, channel_id CHAR(1), type_id INT4, quantity_sold NUMERIC(3), amount_sold NUMERIC(10,2) ) PARTITION BY RANGE (time_id) ( PARTITION time_2008 VALUES LESS THAN ('2009-01-01'), PARTITION time_2009 VALUES LESS THAN ('2010-01-01'), PARTITION time_2010 VALUES LESS THAN ('2011-01-01'), PARTITION time_2011 VALUES LESS THAN ('2012-01-01') ); CREATE INDEX range_sales_idx1 ON range_sales(product_id) LOCAL; CREATE INDEX range_sales_idx2 ON range_sales(time_id) GLOBAL; DROP INDEX range_sales_idx1 ; DROP INDEX range_sales_idx2 ; DROP TABLE range_sales; SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL); SELECT 'stop' FROM pg_drop_replication_slot('slot1');

5.mppdb_decoding插件测试

SELECT 'init' FROM pg_create_logical_replication_slot('slot2','mppdb_decoding'); CREATE TABLE logical_tab1(col1 boolean[],col2 boolean); DROP TABLE logical_tab1; CREATE TABLE range_sales ( product_id INT4 NOT NULL, customer_id INT4 PRIMARY KEY, time_id DATE, channel_id CHAR(1), type_id INT4, quantity_sold NUMERIC(3), amount_sold NUMERIC(10,2) ) PARTITION BY RANGE (time_id) ( PARTITION time_2008 VALUES LESS THAN ('2009-01-01'), PARTITION time_2009 VALUES LESS THAN ('2010-01-01'), PARTITION time_2010 VALUES LESS THAN ('2011-01-01'), PARTITION time_2011 VALUES LESS THAN ('2012-01-01') ); CREATE INDEX range_sales_idx1 ON range_sales(product_id) LOCAL; CREATE INDEX range_sales_idx2 ON range_sales(time_id) GLOBAL; DROP INDEX range_sales_idx1 ; DROP INDEX range_sales_idx2 ; DROP TABLE range_sales; SELECT data FROM pg_logical_slot_get_changes('slot2', NULL, NULL); SELECT 'stop' FROM pg_drop_replication_slot('slot2');

6.wal2json插件测试

wal2json插件只支持format-version1,不支持format-version2。

SELECT 'init' FROM pg_create_logical_replication_slot('slot3','wal2json'); CREATE TABLE logical_tab1(col1 boolean[],col2 boolean); DROP TABLE logical_tab1; CREATE TABLE range_sales ( product_id INT4 NOT NULL, customer_id INT4 PRIMARY KEY, time_id DATE, channel_id CHAR(1), type_id INT4, quantity_sold NUMERIC(3), amount_sold NUMERIC(10,2) ) PARTITION BY RANGE (time_id) ( PARTITION time_2008 VALUES LESS THAN ('2009-01-01'), PARTITION time_2009 VALUES LESS THAN ('2010-01-01'), PARTITION time_2010 VALUES LESS THAN ('2011-01-01'), PARTITION time_2011 VALUES LESS THAN ('2012-01-01') ); CREATE INDEX range_sales_idx1 ON range_sales(product_id) LOCAL; CREATE INDEX range_sales_idx2 ON range_sales(time_id) GLOBAL; DROP INDEX range_sales_idx1 ; DROP INDEX range_sales_idx2 ; DROP TABLE range_sales; SELECT data FROM pg_logical_slot_get_changes('slot3', NULL, NULL, 'format-version', '1'); SELECT 'stop' FROM pg_drop_replication_slot('slot3');
最后修改时间:2024-08-09 09:26:09
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论