MogDB 5.0.8在逻辑解码的过程中,解码插件支持解析DDL操作。
1.环境准备
初始化两个cluster
$ gs_initdb -D data_pub --nodename=pub
$ gs_initdb -D data_sub --nodename=sub
修改发布端配置参数
$ vi data_pub/postgresql.conf
文件尾部增加如下参数:
listen_addresses = '*' port=3000 wal_level = logical enable_ddl_logical_record=on max_process_memory = 2GB shared_buffers = 128MB cstore_buffers = 16MB
修改订阅端配置参数
$ vi data_sub/postgresql.conf
文件尾部增加如下参数:
listen_addresses = '*' port=5000 wal_level = logical enable_ddl_logical_record=on max_process_memory = 2GB shared_buffers = 128MB cstore_buffers = 16MB
发布和订阅端pg_hba.conf文件增加如下设置:
host replication all 0/0 sha256
分别启动发布端和订阅端的数据库服务
$ gs_ctl start -D data_pub/
$ gs_ctl start -D data_sub/
发布和订阅端同时创建repuser如下:
CREATE USER repuser REPLICATION SYSADMIN PASSWORD 'XXX';
2.建立发布与订阅关系
gsql进入发布端
$ gsql -r -p 3000 -U mog postgres
创建测试表及发布
CREATE TABLE public.tab1(id int primary key,info varchar(100));
CREATE PUBLICATION mypub FOR TABLE public.tab1;
创建订阅端key文件
$ gs_guc generate -S XXX -D $GAUSSHOME/bin -o subscription
gsql进入订阅端
$ gsql -r -p 5000 -U mog postgres
创建测试表及订阅
CREATE TABLE public.tab1(id int primary key,info varchar(100));
CREATE SUBSCRIPTION mysub
CONNECTION 'host=x.x.x.x port=3001 user=repuser password=XXX dbname=postgres'
PUBLICATION mypub ;
注意:这里订阅端的表结构还是需要手工提前创建,因为默认的解码插件pgoutput还不支持DDL。
目前发布订阅语法并不支持设置解码插件,也就是不支持手工创建复制槽进行指定。
3.DDL解码插件支持
MogDB 5.0.8在逻辑解码过程中,wal2json/mppdb_decoding/test_decoding支持如下DDL操作:
- CREATE/DROP TABLE|TABLE PARTITION
- CREATE/DROP INDEX
- TRUNCATE TABLE
- ALTER TABLE ADD COLUMN [CONSTRAINT]
- ALTER TABLE DROP COLUMN
- ALTER TABLE ALTER COLUMN [TYPE|SET NOT NULL|DROP NOT NULL|SET DEFAULT|DROP DEFAULT]
- ALTER TABLE [DROP|ADD|TRUNCATE] PARTITION
- ALTER TABLE MODIFY COLUMN data_type [ON UPDATE]
- ALTER TABLE MODIFY COLUMN [NOT] NULL
- ALTER TABLE ADD COLUMN [AFTER|FIRST]
4.test_decoding插件测试
SELECT 'init' FROM pg_create_logical_replication_slot('slot1','test_decoding');
CREATE TABLE logical_tab1(col1 boolean[],col2 boolean);
DROP TABLE logical_tab1;
CREATE TABLE range_sales
(
product_id INT4 NOT NULL,
customer_id INT4 PRIMARY KEY,
time_id DATE,
channel_id CHAR(1),
type_id INT4,
quantity_sold NUMERIC(3),
amount_sold NUMERIC(10,2)
)
PARTITION BY RANGE (time_id)
(
PARTITION time_2008 VALUES LESS THAN ('2009-01-01'),
PARTITION time_2009 VALUES LESS THAN ('2010-01-01'),
PARTITION time_2010 VALUES LESS THAN ('2011-01-01'),
PARTITION time_2011 VALUES LESS THAN ('2012-01-01')
);
CREATE INDEX range_sales_idx1 ON range_sales(product_id) LOCAL;
CREATE INDEX range_sales_idx2 ON range_sales(time_id) GLOBAL;
DROP INDEX range_sales_idx1 ;
DROP INDEX range_sales_idx2 ;
DROP TABLE range_sales;
SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL);
SELECT 'stop' FROM pg_drop_replication_slot('slot1');
5.mppdb_decoding插件测试
SELECT 'init' FROM pg_create_logical_replication_slot('slot2','mppdb_decoding');
CREATE TABLE logical_tab1(col1 boolean[],col2 boolean);
DROP TABLE logical_tab1;
CREATE TABLE range_sales
(
product_id INT4 NOT NULL,
customer_id INT4 PRIMARY KEY,
time_id DATE,
channel_id CHAR(1),
type_id INT4,
quantity_sold NUMERIC(3),
amount_sold NUMERIC(10,2)
)
PARTITION BY RANGE (time_id)
(
PARTITION time_2008 VALUES LESS THAN ('2009-01-01'),
PARTITION time_2009 VALUES LESS THAN ('2010-01-01'),
PARTITION time_2010 VALUES LESS THAN ('2011-01-01'),
PARTITION time_2011 VALUES LESS THAN ('2012-01-01')
);
CREATE INDEX range_sales_idx1 ON range_sales(product_id) LOCAL;
CREATE INDEX range_sales_idx2 ON range_sales(time_id) GLOBAL;
DROP INDEX range_sales_idx1 ;
DROP INDEX range_sales_idx2 ;
DROP TABLE range_sales;
SELECT data FROM pg_logical_slot_get_changes('slot2', NULL, NULL);
SELECT 'stop' FROM pg_drop_replication_slot('slot2');
6.wal2json插件测试
wal2json插件只支持format-version1,不支持format-version2。
SELECT 'init' FROM pg_create_logical_replication_slot('slot3','wal2json');
CREATE TABLE logical_tab1(col1 boolean[],col2 boolean);
DROP TABLE logical_tab1;
CREATE TABLE range_sales
(
product_id INT4 NOT NULL,
customer_id INT4 PRIMARY KEY,
time_id DATE,
channel_id CHAR(1),
type_id INT4,
quantity_sold NUMERIC(3),
amount_sold NUMERIC(10,2)
)
PARTITION BY RANGE (time_id)
(
PARTITION time_2008 VALUES LESS THAN ('2009-01-01'),
PARTITION time_2009 VALUES LESS THAN ('2010-01-01'),
PARTITION time_2010 VALUES LESS THAN ('2011-01-01'),
PARTITION time_2011 VALUES LESS THAN ('2012-01-01')
);
CREATE INDEX range_sales_idx1 ON range_sales(product_id) LOCAL;
CREATE INDEX range_sales_idx2 ON range_sales(time_id) GLOBAL;
DROP INDEX range_sales_idx1 ;
DROP INDEX range_sales_idx2 ;
DROP TABLE range_sales;
SELECT data FROM pg_logical_slot_get_changes('slot3', NULL, NULL, 'format-version', '1');
SELECT 'stop' FROM pg_drop_replication_slot('slot3');
最后修改时间:2024-08-09 09:26:09
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




