1、创建被调用的子存过
CREATE OR REPLACE PROCEDURE sub_proc(p_id int)
AS
BEGIN
RAISE NOTICE 'Start sub_proc(%) at %', p_id, clock_timestamp();
PERFORM pg_sleep(5); -- 模拟耗时任务
RAISE NOTICE 'End sub_proc(%) at %', p_id, clock_timestamp();
END;
/
2、创建主存过
CREATE OR REPLACE PROCEDURE run_task_pool()
AS
DECLARE
task_ids int[] := ARRAY[1,2,3,4,5,6,7,8,9,10,11]; -- 模拟任务列表
max_concurrent int := 5;
running_conns text[] := '{}'; -- 当前运行中的连接名
task_id int;
conn_name text;
finished boolean;
BEGIN
FOREACH task_id IN ARRAY task_ids LOOP
-- 如果池子已满,就等待其中一个完成
WHILE array_length(running_conns,1) >= max_concurrent LOOP
finished := false;
FOR conn_name IN SELECT unnest(running_conns) LOOP
IF dblink_is_busy(conn_name) = false THEN
-- 取结果并释放连接
RAISE NOTICE 'Task finished on %', conn_name ;
PERFORM dblink_disconnect(conn_name);
running_conns := array_remove(running_conns, conn_name);
finished := true;
EXIT; -- 只释放一个就够了
END IF;
END LOOP;
IF NOT finished THEN
PERFORM pg_sleep(0.5); -- 等待再轮询
END IF;
END LOOP;
-- 启动新的任务
conn_name := 'job_' || task_id;
PERFORM dblink_connect(conn_name, 'dbname=' || current_database() || ' user=' || current_user);
PERFORM dblink_send_query(conn_name, 'CALL sub_proc(' || task_id || ')');
running_conns := array_append(running_conns, conn_name);
RAISE NOTICE 'Task % started on %', task_id, conn_name;
END LOOP;
-- 等待所有剩余任务完成
FOR conn_name IN SELECT unnest(running_conns) LOOP
LOOP
EXIT WHEN dblink_is_busy(conn_name) = false;
PERFORM pg_sleep(0.5);
END LOOP;
RAISE NOTICE 'Task finished on %', conn_name;
PERFORM dblink_disconnect(conn_name);
END LOOP;
END;
/
3、调用主存过执行
3.1 先创建dblink插件
testdb=# create extension dblink ;
CREATE EXTENSION
3.2 执行主存过
testdb=# call run_task_pool() ;
NOTICE: Task 1 started on job_1
NOTICE: Task 2 started on job_2
NOTICE: Task 3 started on job_3
NOTICE: Task 4 started on job_4
NOTICE: Task 5 started on job_5
NOTICE: Task finished on job_1
NOTICE: Task 6 started on job_6
NOTICE: Task finished on job_2
NOTICE: Task 7 started on job_7
NOTICE: Task finished on job_3
NOTICE: Task 8 started on job_8
NOTICE: Task finished on job_4
NOTICE: Task 9 started on job_9
NOTICE: Task finished on job_5
NOTICE: Task 10 started on job_10
NOTICE: Task finished on job_6
NOTICE: Task 11 started on job_11
NOTICE: Task finished on job_7
NOTICE: Task finished on job_8
NOTICE: Task finished on job_9
NOTICE: Task finished on job_10
NOTICE: Task finished on job_11
run_task_pool
---------------
(1 row)
4、其他参考
--查询当前在用的conn_text
SELECT dblink_get_connections();
--查询指定的conn_text 是否在使用
SELECT dblink_is_busy('job_1');
t = 还在执行 SQL
f = 已经空闲,可以 disconnect
--关闭连接
SELECT dblink_disconnect('job_1');「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




