暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

磐维数据库存过并发调用子存过

原创 feilunshuai 2025-08-26
56

1、创建被调用的子存过

CREATE OR REPLACE PROCEDURE sub_proc(p_id int)
AS 
BEGIN
    RAISE NOTICE 'Start sub_proc(%) at %', p_id, clock_timestamp();
    PERFORM pg_sleep(5);  -- 模拟耗时任务
    RAISE NOTICE 'End sub_proc(%) at %', p_id, clock_timestamp();
END;
/


2、创建主存过

CREATE OR REPLACE PROCEDURE run_task_pool()
AS 
DECLARE
    task_ids int[] := ARRAY[1,2,3,4,5,6,7,8,9,10,11]; -- 模拟任务列表
    max_concurrent int := 5;
    running_conns text[] := '{}'; -- 当前运行中的连接名
    task_id int;
    conn_name text;
    finished boolean;
BEGIN
    FOREACH task_id IN ARRAY task_ids LOOP
        -- 如果池子已满,就等待其中一个完成
        WHILE array_length(running_conns,1) >= max_concurrent LOOP
            finished := false;
            FOR conn_name IN SELECT unnest(running_conns) LOOP
                IF dblink_is_busy(conn_name) = false THEN
                    -- 取结果并释放连接
                    RAISE NOTICE 'Task finished on %', conn_name ;
                    PERFORM dblink_disconnect(conn_name);
                    running_conns := array_remove(running_conns, conn_name);
                    finished := true;
                    EXIT; -- 只释放一个就够了
                END IF;
            END LOOP;
            IF NOT finished THEN
                PERFORM pg_sleep(0.5); -- 等待再轮询
            END IF;
        END LOOP;

        -- 启动新的任务
        conn_name := 'job_' || task_id;
        PERFORM dblink_connect(conn_name, 'dbname=' || current_database() || ' user=' || current_user);
        PERFORM dblink_send_query(conn_name, 'CALL sub_proc(' || task_id || ')');
        running_conns := array_append(running_conns, conn_name);
        RAISE NOTICE 'Task % started on %', task_id, conn_name;
    END LOOP;

    -- 等待所有剩余任务完成
    FOR conn_name IN SELECT unnest(running_conns) LOOP
        LOOP
            EXIT WHEN dblink_is_busy(conn_name) = false;
            PERFORM pg_sleep(0.5);
        END LOOP;
        RAISE NOTICE 'Task finished on %', conn_name;
        PERFORM dblink_disconnect(conn_name);
    END LOOP;
END;
/



3、调用主存过执行

3.1 先创建dblink插件

testdb=# create extension  dblink ; 
CREATE EXTENSION

3.2 执行主存过

testdb=# call run_task_pool() ; 
NOTICE:  Task 1 started on job_1
NOTICE:  Task 2 started on job_2
NOTICE:  Task 3 started on job_3
NOTICE:  Task 4 started on job_4
NOTICE:  Task 5 started on job_5
NOTICE:  Task finished on job_1
NOTICE:  Task 6 started on job_6
NOTICE:  Task finished on job_2
NOTICE:  Task 7 started on job_7
NOTICE:  Task finished on job_3
NOTICE:  Task 8 started on job_8
NOTICE:  Task finished on job_4
NOTICE:  Task 9 started on job_9
NOTICE:  Task finished on job_5
NOTICE:  Task 10 started on job_10
NOTICE:  Task finished on job_6
NOTICE:  Task 11 started on job_11
NOTICE:  Task finished on job_7
NOTICE:  Task finished on job_8
NOTICE:  Task finished on job_9
NOTICE:  Task finished on job_10
NOTICE:  Task finished on job_11
 run_task_pool 
---------------
 
(1 row)


4、其他参考

--查询当前在用的conn_text
SELECT dblink_get_connections();

--查询指定的conn_text 是否在使用
SELECT dblink_is_busy('job_1');
t = 还在执行 SQL
f = 已经空闲,可以 disconnect

--关闭连接
SELECT dblink_disconnect('job_1');



「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论