暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

阿里云RDS PostgreSQL OSS 外部表实践 - (dblink异步调用封装并行) 从OSS并行导入数据

digoal 2018-04-27
250

作者

digoal

日期

2018-04-27

标签

PostgreSQL , oss对象存储 , 阿里云RDS PG , 并行写 , dblink , 异步调用 , 异步任务监控 , OSS外部表 , 数据传输


背景

《阿里云RDS PostgreSQL OSS 外部表实践 - (dblink异步调用封装并行) 数据并行导出到OSS》

本文为从OSS并行导入数据到数据库中。

请先阅读:

RDS PG OSS 外部表文档1

RDS PG OSS 外部表文档2

原文

https://www.atatech.org/articles/98990

一.准备工作

首先,创建我们要用到的插件。

create extension dblink; create extension oss_fdw;

二.创建异步存储过程

异步数据装载的准备工作,获取oss文件列表

```
CREATE OR REPLACE FUNCTION rds_oss_fdw_load_data_prepare(t_from text, t_to text)
RETURNS bool AS
$BODY$
DECLARE
t_exist int;
curs1 refcursor;
r record;
filepath text;
fileindex int8;
s1 text;
s2 text;
s3 text;
c int = 0;
s4 text;
s5 text;
ss4 text;
ss5 text;
sql text;
BEGIN
create table if not exists oss_fdw_load_status(id BIGSERIAL primary key, filename text, size int8, rows int8 default 0, status int default 0);

select count(*) into t_exist from oss_fdw_load_status;

if t_exist != 0 then  
    RAISE NOTICE 'oss_fdw_load_status not empty';  
    return false;  
end if;

-- 通过 oss_fdw_list_file 函数,把外部表 t_from 匹配的 OSS 中的文件列到表中  
insert into oss_fdw_load_status (filename, size) select name,size from oss_fdw_list_file(t_from) order by size desc;

select count(*) into t_exist from oss_fdw_load_status;  
if t_exist = 0 then  
    RAISE NOTICE 'oss_fdw_load_status empty,not task found';  
    return false;  
end if;

return true;

END;
$BODY$
LANGUAGE plpgsql;
```

数据装载的工作函数

```
CREATE OR REPLACE FUNCTION rds_oss_fdw_load_data_execute(t_from text, t_to text, num_work int, pass text)
RETURNS setof text AS
$BODY$
DECLARE
t_exist int;
curs1 refcursor;
r record;
filepath text;
fileindex int8;
s1 text;
s2 text;
s3 text;
c int = 0;
s4 text;
s5 text;
ss4 text;
ss5 text;
sql text;
db text;
user text;
BEGIN
select count(*) into t_exist from oss_fdw_load_status;
if t_exist = 0 then
RAISE NOTICE 'oss_fdw_load_status empty';
return next 'false';
end if;

s4 = 'oss_loader';  
s5 = 'idle';  
ss4 = '''' || s4 ||'''';  
ss5 = '''' || s5 ||'''';  
sql = 'select count(*) from pg_stat_activity where application_name = ' || ss4 || ' and state != ' || ss5;

select current_database() into db;  
select current_user into user;

-- 通过游标,不断获取单个任务  
OPEN curs1 FOR SELECT id, filename FROM oss_fdw_load_status order by id;  
loop  
    fetch curs1 into r;  
    if not found then  
        exit;  
    end if;  
    fileindex = r.id;  
    filepath = r.filename;

    s1 = '''' || t_from ||'''';  
    s2 = '''' || t_to ||'''';  
    s3 = '''' || filepath ||'''';

    LOOP  
        -- 查看当前正在工作的任务数,过达到并发数就在这里等待  
        select a into c from dblink('hostaddr=127.0.0.1 port='||current_setting('port')||' dbname='||db ||' user='||user || ' password='||pass ,sql)as t(a int);  
        IF c < num_work THEN  
            EXIT;  
        END IF;  
        RAISE NOTICE 'current runing % loader', c;  
        perform pg_sleep(1);  
    END LOOP;

    -- 通过 DBLINK 创建异步任务  
    perform dis_conn('oss_loader_'||fileindex);  
    perform dblink_connect('oss_loader_'||fileindex, 'hostaddr=127.0.0.1 port='||current_setting('port')||' dbname='||db ||' user='||user || ' application_name=oss_loader' || ' password='||pass);  
    perform dblink_send_query('oss_loader_'||fileindex, format('  
        select rds_oss_fdw_load_single_file(%s,%s,%s,%s);  
        '
        , fileindex, s1, s2, s3)  
    );  
    RAISE NOTICE 'runing loader task % filename %', fileindex, filepath;  
end loop;  
close curs1;

-- 任务分配完成,等待所有任务完成  
LOOP  
    select a into c from dblink('hostaddr=127.0.0.1 port='||current_setting('port')||' dbname='||db ||' user='||user || ' password='||pass ,sql)as t(a int);  
    IF c = 0 THEN  
        EXIT;  
    END IF;  
    RAISE NOTICE 'current runing % loader', c;  
    perform pg_sleep(1);  
END LOOP;

-- 获取异步调用结果
OPEN curs1 FOR SELECT id, filename FROM oss_fdw_load_status order by id;  
loop  
    fetch curs1 into r;  
    if not found then  
        exit;  
    end if;  
    fileindex = r.id;

    return query select fileindex||' - '||res from dblink_get_result('oss_loader_'||fileindex) as t(res text);   
end loop;  
close curs1;

return next 'true';
return;

END;
$BODY$
LANGUAGE plpgsql;
```

单个文件的数据装载函数(设置不同的会话参数oss_fdw.rds_read_one_file,可以读取不同的OSS文件,用完重置)

```
CREATE OR REPLACE FUNCTION rds_oss_fdw_load_single_file(taskid int8, t_from text, t_to text, filepath text)
RETURNS void AS
$BODY$
DECLARE
rowscount int8 = 0;
current text;
sql text;
BEGIN
-- 配置 GUC 参数,指定要导入的 OSS 上的文件
perform set_config('oss_fdw.rds_read_one_file',filepath,true);
select current_setting('oss_fdw.rds_read_one_file') into current;
RAISE NOTICE 'begin load %', current;

-- 通过动态 SQL 导入数据  
EXECUTE 'insert into '|| t_to || ' select * from ' || t_from;  
-- 如果写入需要做一些转换,可以使用format函数拼接,例如某些多值列需要加大括号进行格式化。或者需要对来源的多个字段做geometry化
-- 例如OSS外部表c2字段格式为逗号分隔的字符串1,2,3,4,234 , 本地目标表的类型为int[]数组。
-- 例如OSS外部表col_x,col_y两个字段分别表示精度和纬度,  本地目标表使用pos 一个字段表示geometry类型 
-- 转换如下
-- execute format($_$ insert into %s (c1,c2,pos) select c1,('{'||c2||'}')::int[],st_setsrid(st_makepoint(col_x,col_y),4326) from %s $_$, t_to, t_from);
GET DIAGNOSTICS rowscount = ROW_COUNT;

-- 导入完成后,把结果保存到状态表中  
RAISE NOTICE 'end load id % % to % % rows', taskid, filepath, t_to, rowscount;  
update oss_fdw_load_status set rows = rowscount,status = 1 where id = taskid;  
return;

-- EXCEPTION
-- when others then
-- RAISE 'run rds_oss_fdw_load_single_file with error';
END;
$BODY$
LANGUAGE plpgsql;
```

关闭连接不报错函数

create or replace function dis_conn(name) returns void as $$ declare begin perform dblink_disconnect($1); return; exception when others then return; end; $$ language plpgsql strict;

三.使用函数装载数据

1、 创建本地表(目标表)结构

2、 将包含数据的文件写入OSS

3、 在RDS PG中创建OSS外部表

4、 准备需要并行导入的列表

select rds_oss_fdw_load_data_prepare('oss_table','lineitem');

执行后,会看到表 oss_fdw_load_status 中,保存了准备导入的所有文件列表,用户可以做适当的删减定制。

4、 数据装载

select rds_oss_fdw_load_data_execute('oss_table','lineitem',10,'mypassword');

函数 rds_oss_fdw_load_data_execute 会等待数据导入的完成才返回。

5、 查询状态
期间,我们可以通过下列 SQL 查看正在工作的异步会话状态

select application_name, state, pid,query, now() - xact_start as xact from pg_stat_activity where state != 'idle' and application_name='oss_loader' order by xact desc;

6、 管理状态

同时,我们也可以随时中断数据导入工作

select pg_terminate_backend(pid),application_name, state ,query from pg_stat_activity where state != 'idle' and pid != pg_backend_pid() and application_name='oss_loader';

7、 查看进度

我们也很容易看到整个数据装载的进度(单位 MB)

select ( select sum(size)/1024/1024 as complete from oss_fdw_load_status where status = 1 )a, ( select sum(size)/1024/1024 as full from oss_fdw_load_status )b;

8、 性能

使用 TPCC 100GB的数据进行装载测试,耗时 10 分钟,平均 170MB/S

```
select rds_oss_fdw_load_data_prepare('t_oss2','lineitem');

select rds_oss_fdw_load_data_execute('t_oss2','lineitem',10,'123456Zwj');

select sum(size)/1024/1024 from oss_fdw_load_status;
?column?


22561.919849395752
(1 row)

select pg_size_pretty(pg_relation_size(oid)) from pg_class where relname = 'lineitem';
pg_size_pretty


101 GB
(1 row)
```

性能极限扩展

为了提高本地加载速度,用户可以这么做:

1、目标表选择UNLOGGED TABLE,注意如果选择unlogged table,那么数据库崩溃后unlogged table的数据会被清除,并且请记住备库看不到unlogged table的数据。

除非你的数据是定期全量覆盖的,否则不建议用unlogged table来加速。

create unlogged table xxx (xx xx);

2、选择多个目标表

由于单个目标表,在INDEX LOCK,在EXTEND BLOCK方面都有一定的局限性,为了达到极限,可以使用多个目标表。例如每一批OSS文件对应一个本地表分区。

-- 并行 insert into tbl1 select * from oss_tbl1; insert into tbl2 select * from oss_tbl2; ..... insert into tblx select * from oss_tblx;

3、导入前关闭目标表的表级autovacuum

autovacuum会影响导入性能,因为它要消耗一定的IO。

```
alter table tbl_dict set (autovacuum_enabled =off);

alter table tbl_dict set (toast.autovacuum_enabled =off);
```

4、导入后再开启目标表的autovacuum,收集统计信息

```
alter table tbl_dict set (autovacuum_enabled =on);

alter table tbl_dict set (toast.autovacuum_enabled =on);
```

5、后创建索引(可以并行)

索引可以同时创建(单个表的多个索引可以同时创建,不会相互锁等待。多个表的多个索引也可以同时创建),如果创建索引过程中不需要执行DML,建议不要开启concurrently选项,否则建议开启。

同时创建,可以提高资源利用率,达到快速完成数据导入加索引创建的目标。

云端相关产品

阿里云 RDS PostgreSQL

阿里云 HybridDB for PostgreSQL

RDS PG OSS 外部表文档1

RDS PG OSS 外部表文档2

HDB PG OSS 外部表文档

《阿里云RDS PostgreSQL OSS 外部表实践 - (dblink异步调用封装并行) 数据并行导出到OSS》

PostgreSQL 许愿链接

您的愿望将传达给PG kernel hacker、数据库厂商等, 帮助提高数据库产品质量和功能, 说不定下一个PG版本就有您提出的功能点. 针对非常好的提议,奖励限量版PG文化衫、纪念品、贴纸、PG热门书籍等,奖品丰富,快来许愿。开不开森.

9.9元购买3个月阿里云RDS PostgreSQL实例

PostgreSQL 解决方案集合

德哥 / digoal's github - 公益是一辈子的事.

digoal's wechat

文章转载自digoal,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论