暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Python分布式任务框架Celery的定时任务实现

稻壳编程 2020-05-21
511

作者简介

211工程院校贵州大学管理学院硕士研究生、互联网金融行业资深DevOps研发工程师. 曾在国内多家知名互联网公司 平安科技、微众银行、顺丰科技、魅族任职. 具有多年国内一线互联网公司自动化运维平台设计与开发经验。

 1 

Celey beat简介

Celery beat是一个可以用来在规律的时间间隔在集群中的worker节点上执行任务的调度器。所有的定时任务都要配置在变量参数CELERYBEAT_SCHEDULE中,也可以把定时任务配置在数据库中,在本文中我们使用变量的方式来存储定时任务。默认情况下,默认周期性的任务使用的时区是UTC,可以通过修改Celery的配置文件,通过CELERY_TIMEZONE变量来更改时区。

例如: CELERY_TIMEZONE=’Asia/Shanghai’

Celery定时任务使用的计时器有两种分别为timedelta和crontab。

 2 

第一个Celery beat例子

2.1 代码结构:

2.2 __init__.py

import time
from celery import Celery, platforms
broker = 'redis://127.0.0.1:16379'
backend = 'redis://127.0.0.1:16379'
platforms.C_FORCE_ROOT = True
#允许在root下运行
app = Celery(__file__, broker=broker, backend=backend)
app.config_from_object('celery_app.config’)

2.3 task1.py

from celery_app import app
import time
@app.task(name='celery_app.task1.add')
def add(x, y):
time.sleep(5)
return x + y

2.4 task2.py

from celery_app import app
@app.task(name='celery_app.task2.say')
def say():
return 'hello world'


2.5 timedelta计时器

定义每隔多少秒、分、小时、天执行一次任务,如果设置timedelta(seconds=30),意味着该任务每隔30秒执行一次,当Celery beats启动后第一个任务将在30秒后执行。

2.5.1 示例代码

config.py

from datetime import timedelta
CELERY_TIMEZONE='Asia/Shanghai'
CELERY_IMPORTS = (
'celery_app.task1',
'celery_app.task2'
)
CELERYBEAT_SCHEDULE = {
"sched-every-1-seconds": {
"task": "celery_app.task1.add",
"schedule": timedelta(seconds=1),
"args":(1, 2),
},
"sched-every-2-seconds": {
"task": "celery_app.task2.say",
"schedule": timedelta(seconds=2),
"args":(),
},
}

注: CELERYBEAT_SCHEDULE参数配置:

sched-every-1-seconds: 为欲被调度的定时任务设置起个唯一的名字。

task:指定一个要调度的任务

schedule: 指定计时器类型, timedelta或crontab

args: 调用函数时需传入的位置参数,参数类型为list或tuple

kwargs: 调用函数时需传入的关键字参数,参数类型为dict

2.5.2 启动worker实例

celery -B -A celery_app worker --loglevel=info

注:celery beat将会存储定时任务数据数据在本地数据库文件中,默认情况下会当前路径下创建文件名为celerybeat-schedule的数据库文件,如果想要将这个数据库文件保存到指定位置,可以在启动celery beat自动时加入-s参数。

worker实例启动后,celery会打印出如下输出:

2.6 crontab定时器

如果想要更加精确的控制定时任务调度,比如定义任务在指定的周、天、小时、分钟时,可以使用Crontab调度器。

2.6.1 示例代码:

from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
# Executes every Monday morning at 7:30 A.M
'add-every-monday-morning': {
'task': 'tasks.add',
'schedule': crontab(hour=7, minute=30, day_of_week=1),
'args': (16, 16),
},
}

2.6.2 crontab的参数如下表所示:

crontab()

每分钟执行

crontab(minute=0, hour=0)

每天凌晨12点执行

crontab(minute=0, hour='*/3')

从凌晨12点开始,每隔3小时执行一次
crontab(minute='*/15')每隔15分钟执行一次 


 3 

结束语

如果本文可以对您的工作学习带来帮助,请扫描左侧赞赏码以资鼓励作者;文章勘误请扫描右侧二维码联系作者。

文章转载自稻壳编程,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论