暂无图片
暂无图片
1
暂无图片
暂无图片
暂无图片

PostgreSQL 与openGauss/MogDB 定时任务

原创 Maleah 2022-11-01
2292

PostgreSQL

可以通过元命令\watchpg_cron插件以及依赖于pgAdmin图形化工具执行定时任务

下面介绍的是通过pg_cron插件来实现

1、安装

# 下载安装包 wget https://github.com/citusdata/pg_cron/archive/refs/tags/v1.4.2.tar.gz # 解压 tar -xf pg_cron-1.4.2.tar.gz -C /home/postgres/ # 编译安装 cd /home/postgres/pg_cron-1.4.2 make make install # 修改 postgresql.auto.conf 文件 shared_preload_libraries='pg_cron' cron.database_name = 'postgres' -- 存储pg_cron元数据的数据库,默认是 poTgres # 数据库添加扩展:只能在 cron.database_name 指定的数据库下创建 create extension pg_cron;

添加完扩展之后,会在指定的数据库下创建cron.job和cron.job_run_details这两张表

2、使用

--# 测试表
postgres=# \d cron_test
                        Table "public.cron_test"
   Column    |           Type           | Collation | Nullable | Default
-------------+--------------------------+-----------+----------+---------
 id          | integer                  |           |          |
 insert_time | timestamp with time zone |           |          | now()

--#1. 创建定时任务:每分钟插入一条数据
postgres=# select cron.schedule('postgres cron','* * * * *','insert into cron_test(id) values((random()*100)::int)') ;
 schedule
----------
        4
(1 row)

--#2. 查看定时任务
postgres=# select jobid,jobname,schedule,command from cron.job ;
 jobid |    jobname    | schedule  |                     command
-------+---------------+-----------+--------------------------------------------------
     4 | postgres cron | * * * * * | insert into test(id) values((random()*100)::int)
(1 row)

--#3. 查看定时任务运行情况
postgres=# select * from cron_test ;
 id |          insert_time
----+-------------------------------
 33 | 2022-11-01 15:18:00.007206+08
 12 | 2022-11-01 15:19:00.010899+08
(2 rows)

--#4. 修改定时任务:每两分钟执行一次
--##1) cron.schedule 修改
postgres=# select cron.schedule('postgres cron','0 */2 * * *','insert into cron_test(id) values((random()*100)::int)') ;
 schedule
----------
        4
(1 row)
### 注意:使用cron.schedule修改定时任务需要在jobname已经指定的情况下,否则会作为一条新的定时任务

--##2) 修改 cron.job 表 
update cron.job set schedule = '*/2 * * * *' ;

--##3) cron.alter_job 修改
select cron.alter_jo(4,'0 */2 * * *','insert into cron_test(id) values((random()*100)::int)') ;

postgres=# select jobid,jobname,schedule,command from cron.job ;
 jobid |    jobname    |  schedule   |                        command
-------+---------------+-------------+-------------------------------------------------------
     4 | postgres cron | */2 * * * * | insert into cron_test(id) values((random()*100)::int)
(1 row)

--#5. 取消定时任务
postgres=# select cron.unschedule(4) ;
 unschedule
------------
 t
(1 row)

postgres=# select * from cron.job where jobid = 4 ;
 jobid | schedule | command | nodename | nodeport | database | username | active | jobname
-------+----------+---------+----------+----------+----------+----------+--------+---------
(0 rows)

可以在cron.job_run_details表中查看每个job触发时的详细情况:包括执行的命令,命令执行的状态,开始时间和结束时间

image-20221101152059097

在数据库日志里也有相应的体现。

image-20221101152059097

3、疑问

1)如何在其他数据库创建定时任务?

使用cron.schedule()添加定时任务时,我们看到默认是在当前数据库中,现在想要在其他数据库创建怎么办呢?

extension是针对单个数据库来说的,那我们考虑在其他数据库也添加extension试试?

postgres=# \c maleah_db 
You are now connected to database "maleah_db" as user "postgres".
maleah_db=# create extension pg_cron ;
ERROR:  can only create extension in database postgres
DETAIL:  Jobs must be scheduled from the database configured in cron.database_name, since the pg_cron background worker reads job descriptions from this database.
HINT:  Add cron.database_name = 'maleah_db' in postgresql.conf to use the current database.
CONTEXT:  PL/pgSQL function inline_code_block line 4 at RAISE

啊哦!报错。再次查看,发现还有一个函数cron.schedule_in_database()。使用这个函数尝试一下:

cron.schedule_in_database(job_name text, schedule text, command text, database text, username text DEFAULT NULL::text, active b
oolean DEFAULT true)

postgres=# select cron.schedule_in_database('maleah_db cron','* * * * *','insert into cron_test01(id) values((random()*100)::int)','maleah_db','maleah') ;
 schedule_in_database
----------------------
                    7
(1 row)

postgres=# select * from cron.job ;
 jobid | schedule  |                         command                         | nodename  | nodeport | database  | username | active |    jobname
-------+-----------+---------------------------------------------------------+-----------+----------+-----------+----------+--------+----------------
     7 | * * * * * | insert into cron_test01(id) values((random()*100)::int) | localhost |     6000 | maleah_db | maleah   | t      | maleah_db cron
(1 row)

maleah_db=# select * from cron_test01 ;
 id  |          insert_time
-----+-------------------------------
  40 | 2022-11-01 15:35:00.010296+08
  13 | 2022-11-01 15:36:00.015123+08

成功!extension只是打包在一个数据库里,那为什么可以连接到其他数据库也适用呢?

研究原理时发现,在触发job时,pg_cron会通过读取cron.job的元数据,以libpq的方式和数据库server建立连接,执行job中的command命令。这也就可以解释可以连接其他数据库执行对应的命令的原因。

2)定时任务不定时?

修改schedule使其在2022-11-01 16:00:00 执行定时任务

postgres=# update cron.job set schedule = '00 16 1 11 *' ;
UPDATE 1
postgres=# select jobid,jobname,schedule,command,database,username,active from cron.job ;
 jobid |    jobname     |   schedule   |                         command                         | database  | username | active
-------+----------------+--------------+---------------------------------------------------------+-----------+----------+--------
     7 | maleah_db cron | 00 16 1 11 * | insert into cron_test01(id) values((random()*100)::int) | maleah_db | maleah   | t
(1 row)

发现对应的表中未插入数据

原来是pg_cron插件采用GMT时间,如果数据库使用的是CST时区,定时任务的时间需要减去8小时

修改schedule为2022-11-01 08:01:00执行

postgres=# update cron.job set schedule = '01 08 1 11 *' ;
UPDATE 1

再次查看

maleah_db=# select * from cron_test01 ;
 id  |          insert_time
-----+-------------------------------
...
  61 | 2022-11-01 16:01:00.013264+08
...

MogDB/OpenGauss

PKG_SERVICE接口来实现定时任务管理

使用

下面的示例测试每30s从dbe_per.statement取数据插入到另一张表

--# 测试表
openGauss=# \d+ stat_tpcc
                                      Table "public.stat_tpcc"
      Column       |           Type           |  Modifiers  | Storage  | Stats target | Description
-------------------+--------------------------+-------------+----------+--------------+-------------
 version           | text                     | default 302 | extended |              |
 user_name         | name                     |             | plain    |              |
 query             | text                     |             | extended |              |
 n_calls           | bigint                   |             | plain    |              |
 total_elapse_time | bigint                   |             | plain    |              |
 insert_time       | timestamp with time zone |             | plain    |              |
Has OIDs: no
Options: orientation=row, compression=no

--# 创建存储过程
create or replace procedure pro_test
  as 
begin 
  insert into stat_tpcc(node_name,query,n_calls,total_elapse_time,insert_time)
  select node_name,query,n_calls,total_elapse_time,now() as insert_time from dbe_perf.statement;
end;
/

--#1. 创建定时任务:每30s从dbe_perf.statement中收集数据
select pkg_service.job_submit(null, 'call pro_test() ;',sysdate,'sysdate + 1/2880');
### 当创建一个定时任务(JOB)时,系统默认将当前数据库和用户名与当前创建的定时任务绑定起来。

--#2. 查看定时任务运行情况
openGauss=# select insert_time,count(*) from stat_tpcc group by insert_time order by insert_time ;
          insert_time          | count
-------------------------------+-------
 2022-10-28 11:33:50.958302+08 |     8
 2022-10-28 11:34:21.996411+08 |     8
 2022-10-28 11:34:52.03286+08  |     8
 2022-10-28 11:35:22.065866+08 |     8
 2022-10-28 11:35:52.153638+08 |    64

--#3. 查看定时任务运行状态
openGauss=# select job_id,next_run_date,job_status from pg_job ;
 job_id |    next_run_date    | job_status
--------+---------------------+------------
  10743 | 2022-10-28 11:33:50 | s
(1 row)

openGauss=# select * from pg_catalog.pg_job_proc ;
 job_id |       what        | job_name
--------+-------------------+----------
  10743 | call pro_tpcc() ; |
(1 row)

--#4. 修改定时任务:修改启动时间
select pkg_service.job_update(10743,'2022-10-26 15:30:00','sysdate + 1/2880',null);

--#5. 禁用定时任务。job_status - d
select pkg_service.job_finish(10743,true,null);

--#6. 启用定时任务。job_status - s
select pkg_service.job_finish(10743,false,null);
openGauss=# select job_id,next_run_date,job_status from pg_job ;
 job_id |    next_run_date    | job_status
--------+---------------------+------------
  10743 | 4000-01-01 00:00:00 | s
(1 row)
### 注意:如果重新启用任务的时候,没有指定下次运行时间,那么下次运行时间会始终保持在4000年,意味着仍然不会启动,所以如果禁用任务之后再重新启动,需要手动显式指定下次运行时间。

--#7. 删除定时任务
select pkg_service.job_cancel(10743);

定时任务对比

PostgreSQL openGauss/MogDB
方法 pg_cron插件 PKG_SERVICE接口
schedule * * * * * 执行时间next_time+执行间隔interval_time
相关表 cron.job | cron.job_run_details pg_catalog.pg_job | pg_catalog.pg_job_proc
增加定时任务 cron.schedule pkg_service.job_submit
修改定时任务 cron.alter_job pkg_service.job_update
禁用/启用 cron.alter_job pkg_service.job_finish
删除定时任务 cron.unschedule pkg_service.job_cancel

附录

相关函数的使用

pg_cron

cron.schedule

创建一个定时任务

cron.schedule([job_name text,] schedule text, command tex)
alter_job

通过指定job_id来修改定时任务的属性,包括schedule,command,数据库,用户名,active。默认是NULL的表示不修改

cron.alter_job(job_id bigint, schedule text DEFAULT NULL::text, command text DEFAULT NULL::text, database text DEFAU
LT NULL::text, username text DEFAULT NULL::text, active boolean DEFAULT NULL::boolean)
unschedule

删除指定的定时任务。可以指定job_id或者job_name

cron.alter_job([job_id bigint] | [job_name name])
schedule_in_database

其他数据库创建定时任务

cron.schedule_in_database(job_name text, schedule text, command text, database text, username text DEFAULT NULL::text, active boolean DEFAULT true)

PKG_SERVICE

PKG_SERVICE.JOB_SUBMIT

提交一个系统提供的定时任务

PKG_SERVICE.JOB_SUBMIT(
	id IN BIGINT DEFAULT, 
	content IN TEXT, 
	next_date IN TIMESTAMP DEFAULT sysdate, 
	interval_time IN TEXT DEFAULT 'null', 
	job OUT INTEGER);
PKG_SERVICE.JOB_UPDATE

修改定时任务的属性,包括任务内容、下次执行时间、执行间隔

PKG_SERVICE.JOB_UPDATE(
	id IN BIGINT, 
	next_time IN TIMESTAMP, 
	interval_time IN TEXT, 
	content IN TEXT);
PKG_SERVICE.JOB_FINISH

禁用/启用定时任务

PKG_SERVICE.JOB_FINISH(
	id IN BIGINT,
	broken IN BOOLEAN, 
	next_time IN TIMESTAMP DEFAULT sysdate);
PKG_SERVICE.JOB_CANCEL

删除指定的定时任务

PKG_SERVICE.JOB_CANCEL( job IN INTEGER);
最后修改时间:2022-11-21 13:56:27
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论