一、前言
前段时间我公司一客户需要将Sybase ASE数据库替换成PostgreSQL,其数据库数据约为30G。其中数据表数量为900多张,索引为300多个,存储过程为40多个(约4万行代码)。

二、数据表迁移
2.1 数据类型比对
Sybase ASE的数据类型与PostgreSQL数据类型存在一些差异,在迁移表之前,我们需要调研好数据类型对应关系才能建立好正确的表结构。
| 数据类型 | |||
|---|---|---|---|
| Sybase Adaptive Server | Description | PostgreSQL | Description |
| char | 一致 | CHAR | 一致 |
| integer | 整型 | INTEGER | 一致 |
| datetime | 年月日时分秒 毫秒 | timestamp without time zone 或者 timestamp | 年月日时分秒 毫秒 纳秒 |
| varchar | 一致 | varchar | 一致 |
| nvarchar | Unicode数据类型的字符,它所有的字符都用两个字节表示 | varchar | 中文两字节,英文一字节 |
| SMALLINT | 一致 | SMALLINT | 一致 |
| decimal | 一致 | decimal | 一致 |
| binary | 二进制数据类型 | bytea | 二进制数据类型 |
| tinyint | 保存整型数据,范围为0到255。最大长度为1字节。 | SMALLINT | 保存整型数据,-32768 到 +32767。长度为2字节。 |
2.2 通过外部表访问Sybase数据库
PostgreSQL有外部表插件支持众多数据库,对Sybase而言,可以通过PostgreSQL的TDS_fdw(安装略)工具访问Sybase ASE数据库中的表。如此,我们可以利用该特性,通过PostgreSQL函数实现迁移工具的制作。
例如我们先在Sybase数据库中创建一个表,并插入一些数据
1> use master 2> go 1> create table test_mg(id int,name varchar(20),other tinyint) 2> go 1> create table test_mg(id int,name varchar(20),other tinyint,price decimal(6,2)) 2> go 1> insert into test_mg select 1,'商品一',20,20.18 insert into test_mg select 2,'商品二',30,36.68 insert into test_mg select 3,'商品三',40,45.98 2> 3> 4> go (1 row affected) (1 row affected) (1 row affected)
为了在PostgreSQL中访问test_mg表,需要做如下操作:
1、创建服务器
CREATE SERVER serverFOREIGN DATA WRAPPER tds_fdw OPTIONS (servername '192.168.1.164', port '4114');
2、创建用户映射
CREATE USER MAPPING FOR postgresSERVER serverOPTIONS (username 'sa', password '123456');
3、创建外部表
CREATE FOREIGN TABLE test_mg ( id int , name varchar(20) , other smallint, price decimal(6,2) )SERVER serverOPTIONS (query 'select * from test_mg');
在PG中查看该外部表
postgres=# select * from test_mg ;NOTICE: tds_fdw: Query executed correctly NOTICE: tds_fdw: Getting results id | name | other | price ----+------+-------+------- 1 | Ʒһ | 20 | 20.18 2 | Ʒ | 30 | 36.68 3 | Ʒ | 40 | 45.98 (3 rows)
从结果的name字段我们发现是乱码,这是因为PostgreSQL使用的是UTF8编码,而Sybase使用的是GBK,解决该问题可以使用PG中的convert_from函数
postgres=# select id,convert_from(name::bytea,'GB18030'),other ,price from test_mg ;NOTICE: tds_fdw: Query executed correctly NOTICE: tds_fdw: Getting results id | convert_from | other | price ----+--------------+-------+------- 1 | 商品一 | 20 | 20.18 2 | 商品二 | 30 | 36.68 3 | 商品三 | 40 | 45.98 (3 rows)
那么通过上述步骤我们可以看出,如果想以PostgreSQL函数实现表自动化迁移,难点是要获取到被迁移表的表结构及处理好对应数据类型及长度。至于字符编码处理相对较容易写,我们可以使用convert_from函数在对每个字段进行一个处理即可。
2.3 利用函数实现迁移
我们可以充分利用PostgreSQL外部表,先建立一个我们所需要的Sybase系统表关于被迁移表信息的外部表,从而得到被迁移表的结构。
CREATE FOREIGN TABLE tb_status ( colname varchar, typename varchar, collength int, scale int, prec int, allow_null int ) SERVER server OPTIONS (query ' select a.name as colname, b.name as typename, a.length as collength,a.scale as scale, a.prec as prec , convert(bit,(a.status & 8)) as allow_null from master..syscolumns a,master..systypes b , master..sysobjects c where a.usertype=b.usertype and a.id=c.id and c.id =object_id("master..test_mg") order by colid');
在PG中查询该外部表:
postgres=# select * from tb_status; NOTICE: tds_fdw: Query executed correctly NOTICE: tds_fdw: Getting results colname | typename | collength | scale | prec | allow_null ---------+----------+-----------+-------+------+------------ id | int | 4 | | | 0 name | varchar | 20 | | | 0 other | tinyint | 1 | | | 0 price | decimal | 4 | 2 | 6 | 0 (4 rows)
在PostgreSQL中创建临时表结构数据同外部表tb_status,以便处理数据类型对应:
postgres=# create temp table tb_status_pg as select * from tb_status ;NOTICE: tds_fdw: Query executed correctly NOTICE: tds_fdw: Getting resultsSELECT 4
加入一列序列,以便之后作为排序依据,防止之后建表语句中字段排列顺序错位
alter TABLE tb_status_pg add seq serial;
至此,我们可以删除tb_status外部表并且开始处理数量类型对应关系了
drop FOREIGN TABLE tb_status; update tb_status_pg set typename = (case when lower(typename) = 'datetime' then 'timestamp' when lower(typename) = 'tinyint' then 'smallint' when lower(typename) = 'nvarchar' then 'varchar' when lower(typename) = 'binary' then 'bytea' when lower(typename) = 'image' then 'bytea' when lower(typename) = 'longsysname' then 'varchar' else typename end);
这个基本已经是我们想要的结果了
postgres=# select * from tb_status_pg; colname | typename | collength | scale | prec | allow_null | seq ---------+----------+-----------+-------+------+------------+----- id | int | 4 | | | 0 | 1 name | varchar | 20 | | | 0 | 2 other | smallint | 1 | | | 0 | 3 price | decimal | 4 | 2 | 6 | 0 | 4 (4 rows)
进一步优化结果:
update tb_status_pg set collength = '('||collength||')' where scale is null and prec is null ;update tb_status_pg set collength = '('||prec::varchar||','||scale::varchar||')' where scale is not null and prec is not null ;update tb_status_pg set collength = '' where typename = 'timestamp' or lower(typename) = 'integer' or lower(typename) = 'smallint' or lower(typename) = 'int' or lower(typename) = 'bytea' or lower(typename) = 'real' or lower(typename) = 'text' or lower(typename) = 'bigint';
查询pg_status_pg:
postgres=# select * from tb_status_pg; colname | typename | collength | scale | prec | allow_null | seq ---------+----------+-----------+-------+------+------------+----- name | varchar | (20) | | | 0 | 2 price | decimal | (6,2) | 2 | 6 | 0 | 4 id | int | | | | 0 | 1 other | smallint | | | | 0 | 3 (4 rows)
那么此时我们就可以利用上述临时表在函数中通过循环来拼接出迁移外部表的建表语句了。
以下函数为实现PostgreSQL外部表迁移主要函数:
CREATE OR REPLACE FUNCTION migration_func(schname varchar,tbname varchar,encoding varchar default 'None' ,al_null int default 0)returns voidas$$declarev_sql varchar;v_sql2 varchar = ''; v_sqlsy varchar; v_sqlsy2 varchar = ''; v_sql_insert varchar; v_sql_insert2 varchar = '';begin v_sql = 'create schema if not exists '||$1; execute v_sql; --创建外部表收集被迁移表的表结构信息,从Sybase系统表中查得 v_sql = ' CREATE FOREIGN TABLE tb_status ( colname varchar, typename varchar, collength int, scale int, prec int, allow_null int ) SERVER mig_server OPTIONS (query '' select a.name as colname, b.name as typename, a.length as collength,a.scale as scale, a.prec as prec , convert(bit,(a.status & 8)) as allow_null from '||$1||'..syscolumns a,'||$1||'..systypes b , '||$1||'..sysobjects c where a.usertype=b.usertype and a.id=c.id and c.id =object_id("'||$1||'..'||$2||'") order by colid'')'; execute v_sql; --将所收集到的被迁移表的外部表信息转换成本地临时表,以便可修改 create temp table tb_status_pg on commit drop as select * from tb_status ; alter TABLE tb_status_pg add seq serial; drop FOREIGN TABLE tb_status; --更新数据类型差异对应逻辑 update tb_status_pg set typename = (case when lower(typename) = 'datetime' then 'timestamp' when lower(typename) = 'tinyint' then 'smallint' when lower(typename) = 'nvarchar' then 'varchar' when lower(typename) = 'binary' then 'bytea' when lower(typename) = 'image' then 'bytea' when lower(typename) = 'longsysname' then 'varchar' else typename end); alter table tb_status_pg alter COLUMN collength type varchar,alter COLUMN allow_null type varchar; --是否允许非空约束逻辑 if al_null = 0 then update tb_status_pg set allow_null = (case allow_null when '0' then 'not null' when '1' then 'null' end); else update tb_status_pg set allow_null ='null'; end if; --数据类型长度逻辑处理 update tb_status_pg set collength = '('||collength||')' where scale is null and prec is null ; update tb_status_pg set collength = '('||prec::varchar||','||scale::varchar||')' where scale is not null and prec is not null ; update tb_status_pg set collength = '' where typename = 'timestamp' or lower(typename) = 'integer' or lower(typename) = 'smallint' or lower(ty pename) = 'int' or lower(typename) = 'bytea' or lower(typename) = 'real' or lower(typename) = 'text' or lower(typename) = 'bigint'; --将数据类型、长度、是否非空循环拼接成所需的建表(被迁移外部表所在PG的结构)语句,并加入编码转换功能 for i in 0..(select count(1) from tb_status_pg) loop select concat('"',colname,'"',' ',typename,collength,' ',allow_null,',') into v_sql from tb_status_pg order by seq limit 1 offset i ; select concat(colname,',') into v_sqlsy from tb_status_pg order by seq limit 1 offset i; select concat('convert_from(replace("'||colname||'"::varchar,''\'',''\\'')::bytea, '''||encoding||''')::'||typename||collength||',') from tb_status_pg into v_sql_insert order by seq limit 1 offset i; v_sql2 = concat(v_sql2,v_sql); v_sqlsy2 = concat(v_sqlsy2,v_sqlsy); v_sql_insert2 = concat(v_sql_insert2,v_sql_insert); end loop; v_sql2 = substring(v_sql2,1,length(v_sql2)-1); v_sqlsy2 = substring(v_sqlsy2,1,length(v_sqlsy2)-1); v_sql_insert2 = substring(v_sql_insert2,1,length(v_sql_insert2)-1); v_sql = 'CREATE FOREIGN TABLE tmp_tb('||v_sql2||') SERVER mig_server OPTIONS (query ''select '||v_sqlsy2||' from '||$1||'..'||$2||''')'; execute v_sql; v_sql = 'create table '||$1||'.'||$2||'('||lower(v_sql2)||')'; execute v_sql; --依据第三参数判断是否需要转换编码 if $3 = 'None' then v_sql = 'insert into '||$1||'.'||$2||' select * from tmp_tb'; else v_sql = 'insert into '||$1||'.'||$2||' select '||v_sql_insert2||' from tmp_tb'; end if; execute v_sql; drop FOREIGN TABLE tmp_tb; drop table tb_status_pg; end; $$ LANGUAGE PLPGSQL;
通过该函数,我们可以实现只输入schema + 表名即可轻松实现Sybase的单表迁移,同理也能实现索引、约束迁移,如需要整库迁移,可在此函数上层编写一个库级迁移函数来循环调用此函数来实现。当然,也可以通过其他语言编写GUI调用改函数实现迁移。

扫码关注了解更多





