
一、背景

在 Python 的众多优秀库中,Celery 是一个强大的分布式任务队列框架,它能帮助我们轻松地在后台异步执行任务,特别适用于那些需要在特定时间或间隔内运行的任务,比如定时发送邮件、定期数据处理等。
在目前流行的自动化测试平台中,Celery也是必不可少的一环,通常用于开发实现定时运行自动化测试任务。本文将向你展示新手如何一步步创建一个简单的 Celery Demo,帮助理解如何在测试平台中使用 Celery 来完成定时任务的调度。

二、环境准备

2.1 新建Pycharm工程
新建一个项目名称为:celery_test
2.2 安装依赖
Pycharm的Terminal终端下,安装 Celery 和 Redis
pip install celery redis

2.3 启动Redis
Pycharm的Terminal终端下,输入下面的命令启动Redis:
redis-server


三、创建 Celery 项目

3.1 创建 tasks.py
在你的项目文件夹中创建一个名为 tasks.py 的文件,并写入以下代码:
from celery import Celeryimport time# 创建 Celery 实例app = Celery('tasks',broker='redis://localhost:6379/0', # 消息代理backend='redis://localhost:6379/0') # 结果后端# 定义一个简单的任务@app.taskdef add(x, y):time.sleep(5) # 模拟长时间运行的任务return x + y
上面我们定义了一个名称叫tasks的Celery实例,broker参数指定了 Redis 的连接地址,这里Redis 运行在本地,端口为 6379,使用默认的数据库 0。
然后定义了一个简单的加法任务,它接受两个参数x和y,并返回它们的和。通过@app.task装饰器,将这个函数标记为一个 Celery 任务。
3.2 创建 main.py
在同一目录下创建一个名为 main.py 的文件,内容如下:
from tasks import addif __name__ == '__main__':result = add.delay(4, 6) # 异步调用任务print('Task submitted!')# 等待任务完成并获取结果print('Result:', result.get(timeout=10)) # 增加超时时间
这里的delay方法用于将任务发送到 Celery 队列中进行异步执行。result.get()方法用于获取任务的执行结果,如果任务还没有执行完成,它会阻塞直到任务完成并返回结果。

四、运行 Celery Worker

pycharm中新开一个Terminal窗口,启动一个 Celery worker 进程,以便处理任务:
celery -A tasks worker --loglevel=info


五、运行主程序

pycharm中再新开一个Terminal窗口,运行 main.py 文件:
python main.py
但是这里你会发现,运行后报错:
原因是win10上运行celery就会出现这个问题,开启任务队列一切正常(显示ready提示),但一旦接受任务,就报ValueError: not enough values to unpack (expected 3, got 0)错误。

六、报错解决方案

6.1 安装一个扩展 eventlet:
pip install eventlet
6.2 启动worker的时候加一个参数-P eventlet
pip install eventlet

6.3 再运行main.py
python main.py
运行成功,先出现“Task submitted!”,5秒后出现计算结果“Result: 10”:


七、定时任务调度

如果你想要实现定时任务,可以使用 Celery Beat 来定义周期性任务。
7.1 修改 tasks.py
添加一个定时任务方法scheduled_task:
from celery import Celeryimport time# 创建 Celery 实例app = Celery('tasks',broker='redis://localhost:6379/0', # 消息代理backend='redis://localhost:6379/0') # 结果后端# 定义一个简单的任务@app.taskdef add(x, y):time.sleep(5) # 模拟长时间运行的任务return x + y# 定义一个定时打印一句话的任务@app.taskdef scheduled_task():print("Scheduled task executed!")
7.2 创建 celerybeat_schedule.py
为了设置定时任务,创建一个名为 celerybeat_schedule.py 的文件,内容如下:
from tasks import app # 从 tasks.py 导入现有的 Celery 实例from celery.schedules import crontab# 创建Celery Beat 的配置字典,定义定时任务的调度计划app.conf.beat_schedule = {'execute-every-10-seconds': {'task': 'tasks.scheduled_task', # 调用 tasks 模块中的 scheduled_task 函数'schedule': 10.0, # 每 10 秒执行一次},}
7.3 重新启动Celery worker并启动Celery Beat
由于新增了方法scheduled_task,因此需要在原来运行Celery worker的Terminal窗口,按键Ctrl+C结束原来运行的Celery worker,然后重新启动worker:
celery -A tasks worker -l info -P eventlet
然后新开一个Terminal窗口,启动Celery Beat,开始调度你的定时任务:
celery -A celerybeat_schedule beat --loglevel=info
在Celery Beat窗口,你会看到每10秒都会执行一次任务调度:
然后在Celery Worker窗口,你会看到对应的每隔10秒都会正确调用我们上面写的"scheduled_task"方法,打印文案"Scheduled task executed!":

八、每天指定时间任务

8.1 tasks.py文件中,新增一个任务daily_task:
from celery import Celeryimport time# 创建 Celery 实例app = Celery('tasks',broker='redis://localhost:6379/0', # 消息代理backend='redis://localhost:6379/0') # 结果后端# 定义一个简单的任务@app.taskdef add(x, y):time.sleep(5) # 模拟长时间运行的任务return x + y# 定义一个定时打印一句话的任务@app.taskdef scheduled_task():print("Scheduled task executed!")# 定义一个每天指定时间触发打印的任务@app.taskdef daily_task():print("Daily task executed at 14:30!")
这个任务就是单纯的打印一句话 “Daily task executed at 14:30!”,检验是否定时触发打印。
8.2 celerybeat_schedule.py文件中新增crontab定时任务:
from tasks import app # 从 tasks.py 导入现有的 Celery 实例from celery.schedules import crontab# 创建Celery Beat 的配置字典,定义定时任务的调度计划app.conf.beat_schedule = {# 'execute-every-10-seconds': {# 'task': 'tasks.scheduled_task', # 调用 tasks 模块中的 scheduled_task 函数# 'schedule': 10.0, # 每 10 秒执行一次# },'execute-daily-at-14-30pm': {'task': 'tasks.daily_task', # 指定每天执行的任务'schedule': crontab(hour=14, minute=30), # 每天 14:30 执行},}
8.3 重启Celery worker和Celery Beat:
celery -A tasks worker -l info -P eventlet
celery -A celerybeat_schedule beat --loglevel=info
然后等时间到了14:30的时候,你会发现,并没有按照预期的效果调用"daily_task",输出 “Daily task executed at 14:30!”。
8.4 解决问题:
原因是因为celery默认使用的是UTC时区,我们需要将时区设置我们中国使用的东八区的时间:
# 设置时区app.conf.timezone = 'Asia/Shanghai'app.conf.enable_utc = False
在tasks.py文件中,增加时区设置代码:
from celery import Celeryimport time# 创建 Celery 实例app = Celery('tasks',broker='redis://localhost:6379/0', # 消息代理backend='redis://localhost:6379/0') # 结果后端# 设置时区为 Asia/Shanghaiapp.conf.timezone = 'Asia/Shanghai'app.conf.enable_utc = False # 如果不使用 UTC,可以将此设置为 False# 定义一个简单的任务@app.taskdef add(x, y):time.sleep(5) # 模拟长时间运行的任务return x + y# 定义一个定时打印一句话的任务@app.taskdef scheduled_task():print("Scheduled task executed!")# 定义一个每天指定时间触发打印的任务@app.taskdef daily_task():print("Daily task executed at 14:30!")
celerybeat_schedule.py文件中,增加时区设置代码,并将时间指定为未来的一个时间点:
from tasks import app # 从 tasks.py 导入现有的 Celery 实例from celery.schedules import crontab# 设置时区app.conf.timezone = 'Asia/Shanghai'app.conf.enable_utc = False# 创建Celery Beat 的配置字典,定义定时任务的调度计划app.conf.beat_schedule = {# 'execute-every-10-seconds': {# 'task': 'tasks.scheduled_task', # 调用 tasks 模块中的 scheduled_task 函数# 'schedule': 10.0, # 每 10 秒执行一次# },'execute-daily-at-14-30pm': {'task': 'tasks.daily_task', # 指定每天执行的任务'schedule': crontab(hour=14, minute=30), # 每天 14:30 执行},}
再次重启Celery worker和Celery Beat,会发现在指定的时间点定时触发了任务:


8.5 crontab表达式说明:
顺带说一下crontab表达式接受的入参格式,方便大家可以自由进行各类设置:
minute 表示分钟,接收整数或者整数列表,范围在0-59,或者字符串表示配置的时间模式hour 表示小时,接收整数或者整数列表,范围在0-23,或者接收字符串表示配置的时间模式day_of_week 表示周几,接收整数或者整数列表,范围在0-6,其中周日是0,周六是6,或者接收字符串表示配置的时间模式day_of_month 表示一个月的第几天,接收整数或者整数列表,范围在1-31,或者接收字符串表示配置的时间模式month_of_year 表示一年的第几个月,接收整数或者整数列表,范围在1-12,或者接收字符串表示配置的时间模式
常用示例:指定某一些分钟,比如分别在 14点15分,14点30分,14点45分钟分别执行一次,可以如下操作:
crontab(minute="15,30,45", hour=14)
14点之内,每隔5分钟执行一次函数,可以如下操作:
crontab(minute="*/5", hour=14)
指定0点,8点,12点,18点的零分执行一次:
crontab(minute=0, hour="0,8,12,18")
每个小时执行一次:
crontab(minute=0, hour="*/1")
每2个小时执行一次:
crontab(minute=0, hour="*/2")
周一,周三,周五三天的零点执行一次:
crontab(minute=0, hour=0, day_of_week="1,3,5")
每个月的1号,5号,10号的零点执行一次:
crontab(minute=0, hour=0, day_of_month="1,5,10")

九、总结

总结一下,本篇文章主要讲解了如何使用从零到一,使用Python Celery库,实现按指定时间间隔(每隔10秒钟)调用某一个方法,以及按指定时间(每天的14:30)调用某一个方法。主要内容包括:
环境准备
创建项目
运行 Celery Worker
运行主程序
主程序报错解决
创建按时间间隔调度的定时任务
创建按每天指定时间调度的定时任务
按指定时间调度的定时任务不生效问题解决
crontab表达式常用说明
END
以上就是本次的全部内容,如果对你有帮助,麻烦点赞+分享,你的支持就是作者更新最大的动力!
欢迎加入杨叔的测试交流群,沟通交流日常测试工作相关内容,2024一起升职加薪,学习进步!进群方式:扫下方二维码添加杨叔的微信号,备注:进群

往期精彩文章推荐




