关键字:
电科金仓、KingbaseFlySync、KFS、同步程序、replicator、数据同步、数据解析、增量同步、实时同步、值过滤、过滤DML
1.功能介绍
实时同步支持基于字段值对过滤DML进行过滤。
1、本过滤器和数据类型相关,根据操作对象、字段进行匹配并且根据字段类型、字段值及判定条件进行处理;
2、适用数据库字段类型:数值类型、字符类型、时间类型;
数值类型:
MySQL:INT、TINYINT、SMALLINT、MEDIUMINT、BIGINT、FLOAT、DOUBLE、DECIMAL
Oracle:INT、SMALLINT、FLOAT、REAL、DOUBLE、NUMBER、NUMERIC、DECIMAL
KingbaseES/PG:INTEGER、TINYINT、SMALLINT、BIGINT、NUMERIC、REAL、FLOAT、DOUBLE
MSSQL:INT、TINYINT、SMALLINT、BIGINT、REAL、FLOAT、NUMERIC、DECIMAL
字符类型:
MySQL:CHAR、VARCHAR
Oracle:CHAR、NCHAR、VARCHAR2、NVARCHAR2
KingbaseES/PG:CHAR、VARCHAR
MSSQL:CHAR、NCHAR、VARCHAR2、NVARCHAR
时间类型:
MySQL:DATE、TIMESTAMP
Oracle:DATE、TIME、DATETIME、TIMESTAMP
KingbaseES/PG:DATE、TIME、DATETIME、TIMESTAMP
MSSQL:SMALLDATETIME、DATETIME、DATETIME2、DATE、TIME
3、支持的判定规则:
1)数值类型:大于(>)、大于等于(>=)、等于(==)、小于(<)、小于等于(<=);
2)字符类型:相等(equals)、忽略大小写相等(equalsIgnoreCase)、字符串是否以指定的前缀开始(startsWith)、字符串是否以指定的后缀结束(endsWith)、字符串是否包含指定的内容(contains);
3)时间类型:与数值类型判断规则一致,如:在某个时间前(<)、在某个时间后(>)
2.使用场景
KingbaseFlySync在实时同步过程中默认不会对同步的数据进行处理,源端写入的数据是什么样的,就会同步什么样的数据。在特定场景下,用户希望对实时同步中的增量数据按照指定的值进行过滤。
3.配置使用方式介绍
3.1.配置方式
需要在源端同步服务或者是目标端同步服务配置文件flysync.ini添加对应的过滤参数并指定值过滤规则文件,具体配置参数如下所示
在源端同步服务(master)配置文件flysync.ini中添加如下参数:
svc-extractor-filters = skipeventbyvalue
property=replicator.filter.skipeventbyvalue.definitionsFile=${replicator.home.dir}/support/filters-config/skipeventbyvalue.json
或
在目标端同步服务(salve)配置文件flysync.ini中添加如下参数:
svc-remote-filters = skipeventbyvalue
property=replicator.filter.skipeventbyvalue.definitionsFile=${replicator.home.dir}/support/filters-config/skipeventbyvalue.json
注意:KFS源端目标端同步服务均支持配置该参数,可根据需要选配。
3.2.指定值过滤规则文件配置方法
指定值过滤规则文件skipeventbyvalue.json的配置示例如下:
1、场景一:数值类型
{
/* 等效于 WHERE AGE > 18 */
"SCHEMA_NAME_1.TABLE_NAME_1": {
"condition": {
"column": "AGE",
"function": ">",
"value": 18,
"operator": "",
"conditions": []
}
},
/* 等效于 WHERE AGE >= 18 */
"SCHEMA_NAME_1.TABLE_NAME_2": {
"condition": {
"column": "AGE",
"function": ">=",
"value": 18,
"operator": "",
"conditions": []
}
},
/* 等效于 WHERE AGE = 18 */
"SCHEMA_NAME_1.TABLE_NAME_3": {
"condition": {
"column": "AGE",
"function": "==",
"value": 18,
"operator": "",
"conditions": []
}
},
/* 等效于 WHERE AGE <= 18 */
"SCHEMA_NAME_1.TABLE_NAME_4": {
"condition": {
"column": "AGE",
"function": "<=",
"value": 18,
"operator": "",
"conditions": []
}
},
/* 等效于 WHERE AGE < 18 */
"SCHEMA_NAME_1.TABLE_NAME_5": {
"condition": {
"column": "AGE",
"function": "<",
"value": 18,
"operator": "",
"conditions": []
}
},
/* 等效于 WHERE AGE <> 18 */
"SCHEMA_NAME_1.TABLE_NAME_5": {
"condition": {
"column": "AGE",
"function": "<>",
"value": 18,
"operator": "",
"conditions": []
}
}
}
2、场景二:字符类型
{
/* 等效于 WHERE SEX = '女' */
"SCHEMA_NAME_1.TABLE_NAME_1": {
"condition": {
"column": "SEX",
"function": "equals",
"value": "女",
"operator": "",
"conditions": []
}
},
/* 等效于 WHERE upper(SEX) = upper('女') */
"SCHEMA_NAME_1.TABLE_NAME_2": {
"condition": {
"column": "SEX",
"function": "equalsIgnoreCase",
"value": "女",
"operator": "",
"conditions": []
}
},
/* 等效于 WHERE SEX LIKE '女%' */
"SCHEMA_NAME_1.TABLE_NAME_3": {
"condition": {
"column": "SEX",
"function": "startsWith",
"value": "女",
"operator": "",
"conditions": []
}
},
/* 等效于 WHERE SEX LIKE '%女' */
"SCHEMA_NAME_1.TABLE_NAME_3": {
"condition": {
"column": "SEX",
"function": "endsWith",
"value": "女",
"operator": "",
"conditions": []
}
},
/* 等效于 WHERE SEX LIKE '%女%' */
"SCHEMA_NAME_1.TABLE_NAME_4": {
"condition": {
"column": "SEX",
"function": "contains",
"value": "女",
"operator": "",
"conditions": []
}
}
}
3、场景三:时间类型
{
/* 等效于 WHERE BIRTH > '2008-08-08' */
"SCHEMA_NAME_1.TABLE_NAME_1": {
"condition": {
"column": "BIRTH",
"function": ">",
"value": "2008-08-08"
"operator": ""
"conditions": []
}
},
/* 等效于 WHERE BIRTH < '2008-08-08' */
"SCHEMA_NAME_1.TABLE_NAME_1": {
"condition": {
"column": "BIRTH",
"function": "<",
"value": "2008-08-08"
"operator": ""
"conditions": []
}
},
/* 等效于 WHERE BIRTH > '2008-08-08' AND BIRTH < '2008-09-08' */
"SCHEMA_NAME_1.TABLE_NAME_1": {
"condition": {
"column": "",
"function": "",
"value": ""
"operator": ""
"conditions": [ {
"column": "BIRTH",
"function": ">",
"value": "2008-08-08"
"operator": ""
"conditions": []
},
{
"column": "BIRTH",
"function": "<",
"value": "2008-09-08"
"operator": "AND"
"conditions": []
} ]
}
}
}
4、场景四:单个字段多条件组合方式
{
/* 等效于 WHERE AGE > 18 AND AGE < 35 */
"SCHEMA_NAME_1.TABLE_NAME_1": {
"condition": {
"column": "",
"function": "",
"value": ""
"operator": ""
"conditions": [ {
"column": "AGE",
"function": ">",
"value": 18
"operator": "",
"conditions": []
},
{
"column": "AGE",
"function": "<",
"value": 35
"operator": "AND",
"conditions": []
} ]
}
}
}
5、场景五:多字段单条件组合方式
{
/* 等效于 WHERE AGE > 18 AND SEX = '女' */
"SCHEMA_NAME_1.TABLE_NAME_1": {
"condition": {
"column": "",
"function": "",
"value": ""
"operator": ""
"conditions": [ {
"column": "AGE",
"function": ">",
"value": 18
"operator": "",
"conditions": []
},
{
"column": "SEX",
"function": "equals",
"value": "女"
"operator": "AND"
"conditions": []
} ]
}
}
}
4.示例
4.1.前置条件
1)完成安装KFS同步程序前的环境准备;
2)源端目标端数据库含有以下表结构:
create table mytest.test4(id int, c1 int, c2 varchar2(100),c3 timestamp);
create table mytest.test5(id int, c1 int, c2 varchar2(100),c3 timestamp);
create table mytest.test6(id int, c1 int, c2 varchar2(100),c3 timestamp);
4.2.操作步骤
1)配置flysync.ini文件,在目标端的flysync.ini添加以下参数
[default]
...
[target]
...
svc-remote-filters = skipeventbyvalue #添加指定值过滤器
property=replicator.filter.skipeventbyvalue.definitionsFile=/home/hes/skipeventbyvalue.json
#指定值过滤处理规则文件
2)值过滤处理规则文件的skipeventbyvalue.json文件,假定筛选条件如下:
- 过滤掉mytest模式下test4表c1列的值大于200且c2列的值等于“test”的数据
- 过滤掉mytest模式下test5表c3列小于2023-10-10 00:00:00的数据
对应的skipeventbyvalue.json配置文件如下:
{
"mytest.test4": {
"condition": {
"column": "",
"function": "",
"value": "",
"operator": "",
"conditions": [ {
"column": "c1",
"function": ">",
"value": 200,
"operator": "",
"conditions": []},
{
"column": "c2",
"function": "equals",
"value": "test",
"operator": "AND",
"conditions": []}
]
}
},
"mytest.test5": {
"condition": {
"column": "c3",
"function": "<",
"value": "2023-10-10 00:00:00",
"operator": "",
"conditions": []}
}
}
注意:配置文件区分大小写,若配置在源端,配置文件中的对象名称要和源端数据库中的对象名称大小写保持一致;若配置在目标端需要和目标端kufl中对象名称的大小写保持一致。
3)安装同步程序并启动。具体的安装步骤参见《Kingbase FlySync 安装部署手册》
4)在源端数据库执行以下操作:
insert into mytest.test4 values(1,100,'test_001',to_timestamp('2008-01-01 00:00:00.0', 'yyyy-mm-dd hh24:mi:ss.ff'));
insert into mytest.test4 values(2,101,'test',to_timestamp('2013-11-01 00:00:00.0', 'yyyy-mm-dd hh24:mi:ss.ff'));
insert into mytest.test4 values(3,201,'test',to_timestamp('2023-11-01 00:00:00.0', 'yyyy-mm-dd hh24:mi:ss.ff'));
insert into mytest.test4 values(4,202,'ceshi_004',to_timestamp('2023-11-01 00:00:00.0', 'yyyy-mm-dd hh24:mi:ss.ff'));
源端数据库的中数据如下所示:

查看目标端的数据,test4表c1列的值大于200且c2列的值等于“test”的数据都被过滤了:

5) 在源端数据库执行以下操作:
insert into mytest.test5 values(1,100,'test_001',to_timestamp('2008-01-01 00:00:00.0', 'yyyy-mm-dd hh24:mi:ss.ff'));
insert into mytest.test5 values(2,101,'test',to_timestamp('2013-11-01 00:00:00.0', 'yyyy-mm-dd hh24:mi:ss.ff'));
insert into mytest.test5 values(3,201,'test',to_timestamp('2023-11-01 00:00:00.0', 'yyyy-mm-dd hh24:mi:ss.ff'));
insert into mytest.test5 values(4,202,'ceshi_004',to_timestamp('2023-11-01 00:00:00.0', 'yyyy-mm-dd hh24:mi:ss.ff'));
源端数据库的中数据如下所示:

查看目标端的数据,test5表c3列小于2023-10-10 00:00:00的数据都被过滤了:

5)在源端数据库执行以下操作:
insert into mytest.test6 values(1,100,'test_001',to_timestamp('2008-01-01 00:00:00.0', 'yyyy-mm-dd hh24:mi:ss.ff'));
insert into mytest.test6 values(2,101,'test',to_timestamp('2013-11-01 00:00:00.0', 'yyyy-mm-dd hh24:mi:ss.ff'));
insert into mytest.test6 values(3,201,'test',to_timestamp('2023-11-01 00:00:00.0', 'yyyy-mm-dd hh24:mi:ss.ff'));
insert into mytest.test6 values(4,202,'ceshi_004',to_timestamp('2023-11-01 00:00:00.0', 'yyyy-mm-dd hh24:mi:ss.ff'));
源端数据库的中数据如下所示:

查看目标端的数据,test6表的数据均没有被过滤:





