暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Python Celery:自动化测试平台定时任务必备的三方库

程序员杨叔 2024-10-10
421

一、背景

在 Python 的众多优秀库中,Celery 是一个强大的分布式任务队列框架,它能帮助我们轻松地在后台异步执行任务,特别适用于那些需要在特定时间或间隔内运行的任务,比如定时发送邮件、定期数据处理等。

在目前流行的自动化测试平台中,Celery也是必不可少的一环,通常用于开发实现定时运行自动化测试任务。本文将向你展示新手如何一步步创建一个简单的 Celery Demo,帮助理解如何在测试平台中使用 Celery 来完成定时任务的调度。

二、环境准备

2.1 新建Pycharm工程



新建一个项目名称为:celery_test

2.2 安装依赖




Pycharm的Terminal终端下,安装 Celery 和 Redis

    pip install celery redis

    2.3 启动Redis




    Pycharm的Terminal终端下,输入下面的命令启动Redis:

      redis-server

      三、创建 Celery 项目

      3.1 创建 tasks.py




      在你的项目文件夹中创建一个名为 tasks.py 的文件,并写入以下代码:

        from celery import Celery
        import time


        # 创建 Celery 实例
        app = Celery('tasks',
        broker='redis://localhost:6379/0', # 消息代理
        backend='redis://localhost:6379/0') # 结果后端




        # 定义一个简单的任务
        @app.task
        def add(x, y):
        time.sleep(5) # 模拟长时间运行的任务
        return x + y

        上面我们定义了一个名称叫tasks的Celery实例,broker参数指定了 Redis 的连接地址,这里Redis 运行在本地,端口为 6379,使用默认的数据库 0。

        然后定义了一个简单的加法任务,它接受两个参数x和y,并返回它们的和。通过@app.task装饰器,将这个函数标记为一个 Celery 任务。

        3.2 创建 main.py




        在同一目录下创建一个名为 main.py 的文件,内容如下:

          from tasks import add


          if __name__ == '__main__':
          result = add.delay(4, 6) # 异步调用任务
          print('Task submitted!')


          # 等待任务完成并获取结果
          print('Result:', result.get(timeout=10)) # 增加超时时间

          这里的delay方法用于将任务发送到 Celery 队列中进行异步执行。result.get()方法用于获取任务的执行结果,如果任务还没有执行完成,它会阻塞直到任务完成并返回结果。

          四、运行 Celery Worker

          pycharm中新开一个Terminal窗口,启动一个 Celery worker 进程,以便处理任务:

            celery -A tasks worker --loglevel=info

            五、运行主程序

            pycharm中再新开一个Terminal窗口,运行 main.py 文件:

              python main.py

              但是这里你会发现,运行后报错:



              原因是win10上运行celery就会出现这个问题,开启任务队列一切正常(显示ready提示),但一旦接受任务,就报ValueError: not enough values to unpack (expected 3, got 0)错误。

              六、报错解决方案

              6.1 安装一个扩展 eventlet:



                pip install eventlet

                6.2 启动worker的时候加一个参数-P eventlet



                  pip install eventlet

                  6.3 再运行main.py



                    python main.py

                    运行成功,先出现“Task submitted!”,5秒后出现计算结果“Result: 10”:

                    七、定时任务调度

                    如果你想要实现定时任务,可以使用 Celery Beat 来定义周期性任务。

                    7.1 修改 tasks.py




                    添加一个定时任务方法scheduled_task:

                      from celery import Celery
                      import time


                      # 创建 Celery 实例
                      app = Celery('tasks',
                      broker='redis://localhost:6379/0', # 消息代理
                      backend='redis://localhost:6379/0') # 结果后端




                      # 定义一个简单的任务
                      @app.task
                      def add(x, y):
                      time.sleep(5) # 模拟长时间运行的任务
                      return x + y




                      # 定义一个定时打印一句话的任务
                      @app.task
                      def scheduled_task():
                      print("Scheduled task executed!")

                      7.2 创建 celerybeat_schedule.py




                      为了设置定时任务,创建一个名为 celerybeat_schedule.py 的文件,内容如下:

                        from tasks import app  # 从 tasks.py 导入现有的 Celery 实例
                        from celery.schedules import crontab


                        # 创建Celery Beat 的配置字典,定义定时任务的调度计划
                        app.conf.beat_schedule = {
                        'execute-every-10-seconds': {
                        'task': 'tasks.scheduled_task', # 调用 tasks 模块中的 scheduled_task 函数
                        'schedule': 10.0, # 每 10 秒执行一次
                        },
                        }

                        7.3 重新启动Celery worker并启动Celery Beat




                        由于新增了方法scheduled_task,因此需要在原来运行Celery worker的Terminal窗口,按键Ctrl+C结束原来运行的Celery worker,然后重新启动worker:

                          celery -A tasks worker -l info -P eventlet

                          然后新开一个Terminal窗口,启动Celery Beat,开始调度你的定时任务:

                            celery -A celerybeat_schedule beat --loglevel=info

                            在Celery Beat窗口,你会看到每10秒都会执行一次任务调度:



                            然后在Celery Worker窗口,你会看到对应的每隔10秒都会正确调用我们上面写的"scheduled_task"方法,打印文案"Scheduled task executed!":

                            八、每天指定时间任务

                            8.1 tasks.py文件中,新增一个任务daily_task:



                              from celery import Celery
                              import time


                              # 创建 Celery 实例
                              app = Celery('tasks',
                              broker='redis://localhost:6379/0', # 消息代理
                              backend='redis://localhost:6379/0') # 结果后端




                              # 定义一个简单的任务
                              @app.task
                              def add(x, y):
                              time.sleep(5) # 模拟长时间运行的任务
                              return x + y




                              # 定义一个定时打印一句话的任务
                              @app.task
                              def scheduled_task():
                              print("Scheduled task executed!")




                              # 定义一个每天指定时间触发打印的任务
                              @app.task
                              def daily_task():
                              print("Daily task executed at 14:30!")

                              这个任务就是单纯的打印一句话 “Daily task executed at 14:30!”,检验是否定时触发打印。

                              8.2 celerybeat_schedule.py文件中新增crontab定时任务:




                                from tasks import app  # 从 tasks.py 导入现有的 Celery 实例
                                from celery.schedules import crontab


                                # 创建Celery Beat 的配置字典,定义定时任务的调度计划
                                app.conf.beat_schedule = {
                                # 'execute-every-10-seconds': {
                                # 'task': 'tasks.scheduled_task', # 调用 tasks 模块中的 scheduled_task 函数
                                # 'schedule': 10.0, # 每 10 秒执行一次
                                # },
                                'execute-daily-at-14-30pm': {
                                'task': 'tasks.daily_task', # 指定每天执行的任务
                                'schedule': crontab(hour=14, minute=30), # 每天 14:30 执行
                                },
                                }

                                8.3 重启Celery worker和Celery Beat:




                                  celery -A tasks worker -l info -P eventlet
                                    celery -A celerybeat_schedule beat --loglevel=info

                                    然后等时间到了14:30的时候,你会发现,并没有按照预期的效果调用"daily_task",输出 “Daily task executed at 14:30!”。

                                    8.4 解决问题:




                                    原因是因为celery默认使用的是UTC时区,我们需要将时区设置我们中国使用的东八区的时间:

                                      # 设置时区
                                      app.conf.timezone = 'Asia/Shanghai'
                                      app.conf.enable_utc = False

                                      在tasks.py文件中,增加时区设置代码:

                                        from celery import Celery
                                        import time


                                        # 创建 Celery 实例
                                        app = Celery('tasks',
                                        broker='redis://localhost:6379/0', # 消息代理
                                        backend='redis://localhost:6379/0') # 结果后端


                                        # 设置时区为 Asia/Shanghai
                                        app.conf.timezone = 'Asia/Shanghai'
                                        app.conf.enable_utc = False # 如果不使用 UTC,可以将此设置为 False




                                        # 定义一个简单的任务
                                        @app.task
                                        def add(x, y):
                                        time.sleep(5) # 模拟长时间运行的任务
                                        return x + y




                                        # 定义一个定时打印一句话的任务
                                        @app.task
                                        def scheduled_task():
                                        print("Scheduled task executed!")




                                        # 定义一个每天指定时间触发打印的任务
                                        @app.task
                                        def daily_task():
                                        print("Daily task executed at 14:30!")

                                        celerybeat_schedule.py文件中,增加时区设置代码,并将时间指定为未来的一个时间点:

                                          from tasks import app  # 从 tasks.py 导入现有的 Celery 实例
                                          from celery.schedules import crontab


                                          # 设置时区
                                          app.conf.timezone = 'Asia/Shanghai'
                                          app.conf.enable_utc = False


                                          # 创建Celery Beat 的配置字典,定义定时任务的调度计划
                                          app.conf.beat_schedule = {
                                          # 'execute-every-10-seconds': {
                                          # 'task': 'tasks.scheduled_task', # 调用 tasks 模块中的 scheduled_task 函数
                                          # 'schedule': 10.0, # 每 10 秒执行一次
                                          # },
                                          'execute-daily-at-14-30pm': {
                                          'task': 'tasks.daily_task', # 指定每天执行的任务
                                          'schedule': crontab(hour=14, minute=30), # 每天 14:30 执行
                                          },
                                          }

                                          再次重启Celery worker和Celery Beat,会发现在指定的时间点定时触发了任务:


                                          8.5 crontab表达式说明:




                                          顺带说一下crontab表达式接受的入参格式,方便大家可以自由进行各类设置:

                                            minute 表示分钟,接收整数或者整数列表,范围在0-59,或者字符串表示配置的时间模式
                                            hour 表示小时,接收整数或者整数列表,范围在0-23,或者接收字符串表示配置的时间模式
                                            day_of_week 表示周几,接收整数或者整数列表,范围在0-6,其中周日是0,周六是6,或者接收字符串表示配置的时间模式
                                            day_of_month 表示一个月的第几天,接收整数或者整数列表,范围在1-31,或者接收字符串表示配置的时间模式
                                            month_of_year 表示一年的第几个月,接收整数或者整数列表,范围在1-12,或者接收字符串表示配置的时间模式

                                            常用示例:指定某一些分钟,比如分别在 14点15分,14点30分,14点45分钟分别执行一次,可以如下操作:

                                              crontab(minute="15,30,45", hour=14)

                                              14点之内,每隔5分钟执行一次函数,可以如下操作:

                                                crontab(minute="*/5", hour=14)

                                                指定0点,8点,12点,18点的零分执行一次:

                                                  crontab(minute=0, hour="0,8,12,18")

                                                  每个小时执行一次:

                                                    crontab(minute=0, hour="*/1")

                                                    每2个小时执行一次:

                                                      crontab(minute=0, hour="*/2")

                                                      周一,周三,周五三天的零点执行一次:

                                                        crontab(minute=0, hour=0, day_of_week="1,3,5")

                                                        每个月的1号,5号,10号的零点执行一次:

                                                          crontab(minute=0, hour=0, day_of_month="1,5,10")

                                                          九、总结

                                                          总结一下,本篇文章主要讲解了如何使用从零到一,使用Python Celery库,实现按指定时间间隔(每隔10秒钟)调用某一个方法,以及按指定时间(每天的14:30)调用某一个方法。主要内容包括:

                                                          • 环境准备

                                                          • 创建项目

                                                          • 运行 Celery Worker

                                                          • 运行主程序

                                                          • 主程序报错解决

                                                          • 创建按时间间隔调度的定时任务

                                                          • 创建按每天指定时间调度的定时任务

                                                          • 按指定时间调度的定时任务不生效问题解决

                                                          • crontab表达式常用说明



                                                          END



                                                          以上就是本次的全部内容,如果对你有帮助,麻烦点赞+分享,你的支持就是作者更新最大的动力!

                                                          欢迎加入杨叔的测试交流群,沟通交流日常测试工作相关内容,2024一起升职加薪,学习进步!进群方式:扫下方二维码添加杨叔的微信号,备注:进群


                                                          往期精彩文章推荐





                                                          文章转载自程序员杨叔,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                                          评论