暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

在PostgreSQL中跑后台长任务的方法 - 使用dblink异步接口

digoal 2018-06-21
735

作者

digoal

日期

2018-06-21

标签

PostgreSQL , dblink , 长任务


背景

如果业务上需要在数据库中跑LONG SQL,并且不希望跑的过程中因为窗口断开,导致数据库任务用户主动cancel query。有什么方法?

使用DBLINK异步调用是不错的方法,相当于数据库内部建立了连接在后台跑。

方法

1、创建任务表,方便观察任务状态

create table tbl_task ( id serial8 primary key, -- 任务ID client_info jsonb, -- 客户端描述(usename, datname, search_path, client_addr, client_port) sql text, -- SQL信息 start_time timestamp, -- 开始时间 end_time timestamp default clock_timestamp(), -- 结束时间 info text -- 描述信息 );

2、创建dblink 插件

create extension dblink;

3、创建生成dblink连接的函数,重复创建不报错。

create or replace function conn( name, -- dblink名字 text -- 连接串,URL ) returns void as $$ declare begin perform dblink_connect($1, $2); return; exception when others then return; end; $$ language plpgsql strict;

4、创建异步调用封装函数

```
create or replace function run_task(
sql text, -- 要执行的SQL
info text, -- 任务描述
conn_name name default 'link_for_task', -- dblink 名字
conn text default format('hostaddr=%s port=%s user=%s dbname=%s application_name=run_task', '127.0.0.1', current_setting('port'), current_user, current_database()) , -- 连接串
client_info jsonb default format('{"client_addr":"%s", "client_pot":"%s", "search_path":"%s", "usename":"%s", "datname":"%s"}', inet_client_addr(), inet_client_port(), replace(current_setting('search_path'),'"','\"'), current_user, current_database())::jsonb -- 客户端信息
) returns void as $$
declare
begin
perform conn(conn_name, conn); -- 连接。
-- perform dblink_get_result(conn_name); -- 消耗掉上一次异步连接的结果,否则会报错。

-- 发送异步DBLINK调用
perform dblink_send_query(conn_name, sql||format('; insert into tbl_task(client_info,sql,start_time,info) values (%L, %L, %L, %L);', client_info, sql, clock_timestamp(),info ));
end;
$$ language plpgsql strict;
```

5、调用异步调用封装函数

select run_task( 'select count(*) from test', -- 要RUN的SQL 'test dblink async call' -- 任务描述 );

6、查看当前正在跑的后台任务

postgres=# select * from pg_stat_activity where application_name='run_task'; -[ RECORD 1 ]----+--------------------------------------------------------------------------------------------- datid | 13285 datname | postgres pid | 1510 usesysid | 10 usename | postgres application_name | run_task client_addr | 127.0.0.1 client_hostname | client_port | 55088 backend_start | 2018-06-21 18:04:20.964586+08 xact_start | query_start | 2018-06-21 18:04:20.967177+08 state_change | 2018-06-21 18:04:20.969363+08 wait_event_type | Client wait_event | ClientRead state | idle backend_xid | backend_xmin | query | select count(*) from test; insert into tbl_task(client_info,sql,start_time,info) values (E'{"datname": "postgres", "usename": "postgres", "client_pot": "", "client_addr": "", "search_path": "\\"$user\\", public"}', 'select count(*) from test', '2018-06-21 18:04:20.967118+08', 'test dblink async call') backend_type | client backend

7、查看任务状态

postgres=# select * from tbl_task; -[ RECORD 1 ]------------------------------------------------------------------------------------------------------------------------ id | 1 client_info | {"datname": "postgres", "usename": "postgres", "client_pot": "", "client_addr": "", "search_path": "\"$user\", public"} sql | select count(*) from test start_time | 2018-06-21 18:07:39.60439 end_time | 2018-06-21 18:07:47.331041 info | test dblink async call

如果是数据库普通用户调用,不支持trust认证

如果是普通用户,请使用密码认证,同时请务必保障pg_hba.conf使用的是密码认证。

比如阿里云RDS PPAS的用户:

select run_task( 'select count(*) from test', -- 要RUN的SQL 'test dblink async call' , -- 任务描述 'link_task', format('hostaddr=%s port=%s user=%s dbname=%s application_name=run_task, password=%s', '127.0.0.1', current_setting('port'), current_user, current_database(), '当前用户密码') );

如果是阿里云RDS PG 9.4的用户,内核做过修改,请使用如下方法

select run_task( 'select count(*) from test', -- 要RUN的SQL 'test dblink async call' , -- 任务描述 'link_task', format('user=%s dbname=%s application_name=run_task, password=%s', current_user, current_database(), '密码') );

注意

1、DBLINK异步连接会占用连接,算连接数的。

2、单个DBLINK连接,如果异步调用的SQL没有执行完,不能发起第二次请求。

NOTICE: could not send query: another command is already in progress

那么你需要看看这个DBLINK的后台任务是否还在执行(通过前面查询pg_stat_activity的方法, 执行完state状态为idle),如果执行完了,那么执行以下SQL取一下结果,就可以继续使用这个DBLINK NAME发送异步请求了。

select * from dblink_get_result('link_for_task') as t(id text);

或者,你可以不用等这个DBLINK的异步任务执行完,马上想发起另一个异步任务,那么你需要使用一个新的DBLINK NAME。

select run_task( $$select count(*) from test where c1='abc'$$, -- 要RUN的SQL 'test dblink async call', -- 任务描述 'new_dblink_name' -- 有别于前面已使用的DBLINK NAME );

3、单个DBLINK连接,如果异步调用的SQL执行完了,调用dblink_get_result后,才能发起第二次请求。

NOTICE: could not send query: another command is already in progress

4、单个DBLINK连接,如果异步调用的SQL没有执行完,调用dblink_get_result时,会等待异步调用执行完,才会有返回。等待过程中堵塞当前调用dblink_get_result的会话。

5、调用DBLINK异步接口的会话如果断开了,那么它发起的dblink异步调用后台任务执行完成后,连接会自动释放。

6、如果SQL中包含单引号,可以使用转义的写法,也可以使用没有符号的写法,不需要转义。

select run_task( $$select count(*) from test where c1='abc'$$, -- 要RUN的SQL 'test dblink async call' -- 任务描述 );

select run_task( 'select count(*) from test where c1=''abc''', -- 要RUN的SQL 'test dblink async call' -- 任务描述 );

select run_task( E'select count(*) from test where c1=\'abc\'', -- 要RUN的SQL 'test dblink async call' -- 任务描述 );

美元符号内可以输入任意个字符,成对出现即可。

select run_task( $_qqq_$select count(*) from test where c1='abc'$_qqq_$, -- 要RUN的SQL 'test dblink async call' -- 任务描述 );

参考

https://www.postgresql.org/docs/10/static/dblink.html

《PostgreSQL 批量导入性能 (采用dblink 异步调用)》

PostgreSQL 许愿链接

您的愿望将传达给PG kernel hacker、数据库厂商等, 帮助提高数据库产品质量和功能, 说不定下一个PG版本就有您提出的功能点. 针对非常好的提议,奖励限量版PG文化衫、纪念品、贴纸、PG热门书籍等,奖品丰富,快来许愿。开不开森.

9.9元购买3个月阿里云RDS PostgreSQL实例

PostgreSQL 解决方案集合

德哥 / digoal's github - 公益是一辈子的事.

digoal's wechat

文章转载自digoal,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论