摘要
总所周知,阿里云的 PostgreSQL 和 HybridDB for PostgreSQL 和 oss 是全面互通的。HybridDB for PostgreSQL 由于是 MPP 架构天生包括多个计算节点,能够以为并发的方式读写 oss 上的数据。PostgreSQL 在这方面要差一点,默认情况下只能单进程读写 OSS,不过通过 dblink 的加持,我们也能让 OSS 中的数据快速装载到 PostgreSQL。本文就给大家讲讲这其中的黑科技。
一.准备工作
create extension dblink;create extension oss_fdw;
二.创建异步化存储过程
-- 异步数据装载的准备工作CREATE OR REPLACE FUNCTION rds_oss_fdw_load_data_prepare(t_from text, t_to text)RETURNS bool AS$BODY$DECLAREt_exist int;curs1 refcursor;r record;filepath text;fileindex int8;s1 text;s2 text;s3 text;c int = 0;s4 text;s5 text;ss4 text;ss5 text;sql text;BEGINcreate table if not exists oss_fdw_load_status(id BIGSERIAL primary key, filename text, size int8, rows int8 default 0, status int default 0);select count(*) into t_exist from oss_fdw_load_status;if t_exist != 0 thenRAISE NOTICE 'oss_fdw_load_status not empty';return false;end if;-- 通过 oss_fdw_list_file 函数,把外部表 t_from 匹配的 OSS 中的文件列到表中insert into oss_fdw_load_status (filename, size) select name,size from oss_fdw_list_file(t_from) order by size desc;select count(*) into t_exist from oss_fdw_load_status;if t_exist = 0 thenRAISE NOTICE 'oss_fdw_load_status empty,not task found';return false;end if;return true;END;$BODY$LANGUAGE plpgsql;-- 数据装载的工作函数CREATE OR REPLACE FUNCTION rds_oss_fdw_load_data_execute(t_from text, t_to text, num_work int, pass text)RETURNS bool AS$BODY$DECLAREt_exist int;curs1 refcursor;r record;filepath text;fileindex int8;s1 text;s2 text;s3 text;c int = 0;s4 text;s5 text;ss4 text;ss5 text;sql text;db text;user text;BEGINselect count(*) into t_exist from oss_fdw_load_status;if t_exist = 0 thenRAISE NOTICE 'oss_fdw_load_status empty';return false;end if;s4 = 'oss_loader';s5 = 'idle';ss4 = '''' || s4 ||'''';ss5 = '''' || s5 ||'''';sql = 'select count(*) from pg_stat_activity where application_name = ' || ss4 || ' and state != ' || ss5;select current_database() into db;select current_user into user;-- 通过游标,不断获取单个任务OPEN curs1 FOR SELECT id, filename FROM oss_fdw_load_status order by id;loopfetch curs1 into r;if not found thenexit;end if;fileindex = r.id;filepath = r.filename;s1 = '''' || t_from ||'''';s2 = '''' || t_to ||'''';s3 = '''' || filepath ||'''';LOOP-- 查看当前正在工作的任务数,过达到并发数就在这里等待select a into c from dblink('dbname='||db ||' user='||user || ' password='||pass ,sql)as t(a int);IF c < num_work THENEXIT;END IF;RAISE NOTICE 'current runing % loader', c;perform pg_sleep(1);END LOOP;-- 通过 DBLINK 创建异步任务perform dis_conn('oss_loader_'||fileindex);perform dblink_connect('oss_loader_'||fileindex, 'dbname='||db ||' user='||user || ' application_name=oss_loader' || ' password='||pass);perform dblink_send_query('oss_loader_'||fileindex, format('begin;select rds_oss_fdw_load_single_file(%s,%s,%s,%s);end;', fileindex, s1, s2, s3));RAISE NOTICE 'runing loader task % filename %',fileindex, filepath;end loop;close curs1;-- 任务分配完成,等待所有任务完成LOOPselect a into c from dblink('dbname='||db ||' user='||user || ' password='||pass ,sql)as t(a int);IF c = 0 THENEXIT;END IF;RAISE NOTICE 'current runing % loader', c;perform pg_sleep(1);END LOOP;return true;END;$BODY$LANGUAGE plpgsql;-- 单个文件的数据装在函数CREATE OR REPLACE FUNCTION rds_oss_fdw_load_single_file(taskid int8, t_from text, t_to text, filepath text)RETURNS void AS$BODY$DECLARErowscount int8 = 0;current text;sql text;BEGIN-- 配置 GUC 参数,指定要导入的 OSS 上的文件perform set_config('oss_fdw.rds_read_one_file',filepath,true);select current_setting('oss_fdw.rds_read_one_file') into current;RAISE NOTICE 'begin load %', current;-- 通过动态 SQL 导入数据EXECUTE 'insert into '|| t_to || ' select * from ' || t_from;GET DIAGNOSTICS rowscount = ROW_COUNT;-- 导入完成后,把结果保存到状态表中RAISE NOTICE 'end load id % % to % % rows', taskid, filepath, t_to, rowscount;update oss_fdw_load_status set rows = rowscount,status = 1 where id = taskid;return;EXCEPTIONwhen others thenRAISE 'run rds_oss_fdw_load_single_file with error';END;$BODY$LANGUAGE plpgsql;-- 关闭连接不报错create or replace function dis_conn(name) returns void as $$declarebeginperform dblink_disconnect($1);return;exception when others thenreturn;end;$$ language plpgsql strict;
三.使用函数装载数据
准备数据
select rds_oss_fdw_load_data_prepare('oss_table','lineitem');
2. 数据装载
select rds_oss_fdw_load_data_execute('oss_table','lineitem',10,'mypassword');
3. 查询状态
select application_name, state, pid,query, now() - xact_start as xact from pg_stat_activity where state != 'idle' and application_name='oss_loader' order by xact desc;
4.管理状态
select pg_terminate_backend(pid),application_name, state ,query from pg_stat_activity where state != 'idle' and pid != pg_backend_pid() and application_name='oss_loader';
5. 查看进度
select(select sum(size)/1024/1024 as complete from oss_fdw_load_status where status = 1)a,(select sum(size)/1024/1024 as full from oss_fdw_load_status)b;
6. 性能
select rds_oss_fdw_load_data_prepare('t_oss2','lineitem');select rds_oss_fdw_load_data_execute('t_oss2','lineitem',10,'123456Zwj');select sum(size)/1024/1024 from oss_fdw_load_status;?column?--------------------22561.919849395752(1 row)select pg_size_pretty(pg_relation_size(oid)) from pg_class where relname = 'lineitem';pg_size_pretty----------------101 GB(1 row)
总结
本文使用 plsql + dblink 的方式加速了 OSS 的数据导入。另外,大家也可以关注到以下三点
1. PostgreSQL 默认的过程语言 pl/pgsql 相当好用,和 SQL 引擎紧密结合且学习成本低。我们推荐用户把业务逻辑用它实现。使用过程语言相对于在客户端执行 SQL,消除了服务器到和客户端的网络开销,有天然的性能优势。 2. dblink 的异步接口非常适合做性能加速,且和过程语言紧密结合。推荐在 SQL 和 过程语言中使用。 3. 阿里云开发的 oss_fdw 能在 PostgreSQL 和 OSS 之间做快速的数据交换。oss_fdw 支持 CSV 和压缩方式 CSV 数据的读和写,且很容易用并行加速。oss_fdw 的性能相对于 jdbc insert 和 copy 有压倒的性能优势。
PostgreSQL中文社区欢迎广大技术人员投稿
投稿邮箱:press@postgres.cn
文章转载自PostgreSQL中文社区,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




