
Django: Asynchronous and Scheduled Tasks with Celery

Nephilim 2024-06-11

Tips: some records, some notes



2024/06/10

MONDAY

To live is to function. That is all there is in living.





01

Installing the dependencies with pip

Official Celery documentation:

https://docs.celeryq.dev/en/stable/



Install Celery together with its Redis, auth, and msgpack extras:

    (venv) (base) adamhuan@Leviathan django_daily_media % pip install "celery[redis,auth,msgpack]"
    Looking in indexes: https://mirrors.aliyun.com/pypi/simple/
    Collecting celery[auth,msgpack,redis]
    Downloading https://mirrors.aliyun.com/pypi/packages/90/c4/6a4d3772e5407622feb93dd25c86ce3c0fee746fa822a777a627d56b4f2a/celery-5.4.0-py3-none-any.whl (425 kB)
    ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 426.0/426.0 kB 776.0 kB/s eta 0:00:00
    Collecting billiard<5.0,>=4.2.0 (from celery[auth,msgpack,redis])
    Downloading https://mirrors.aliyun.com/pypi/packages/50/8d/6e9fdeeab04d803abc5a715175f87e88893934d5590595eacff23ca12b07/billiard-4.2.0-py3-none-any.whl (86 kB)
    ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 86.7/86.7 kB 840.3 kB/s eta 0:00:00
    Collecting kombu<6.0,>=5.3.4 (from celery[auth,msgpack,redis])
    Downloading https://mirrors.aliyun.com/pypi/packages/b4/9a/1951f2261271d6994f0df5a55b3e9cdad42ed1fc3020a0dc7f6de80a4566/kombu-5.3.7-py3-none-any.whl (200 kB)
    ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 200.2/200.2 kB 771.2 kB/s eta 0:00:00
    Collecting vine<6.0,>=5.1.0 (from celery[auth,msgpack,redis])
    Downloading https://mirrors.aliyun.com/pypi/packages/03/ff/7c0c86c43b3cbb927e0ccc0255cb4057ceba4799cd44ae95174ce8e8b5b2/vine-5.1.0-py3-none-any.whl (9.6 kB)
    Collecting click<9.0,>=8.1.2 (from celery[auth,msgpack,redis])
    Downloading https://mirrors.aliyun.com/pypi/packages/00/2e/d53fa4befbf2cfa713304affc7ca780ce4fc1fd8710527771b58311a3229/click-8.1.7-py3-none-any.whl (97 kB)
    ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 97.9/97.9 kB 777.2 kB/s eta 0:00:00
    Collecting click-didyoumean>=0.3.0 (from celery[auth,msgpack,redis])
    Downloading https://mirrors.aliyun.com/pypi/packages/1b/5b/974430b5ffdb7a4f1941d13d83c64a0395114503cc357c6b9ae4ce5047ed/click_didyoumean-0.3.1-py3-none-any.whl (3.6 kB)
    Collecting click-repl>=0.2.0 (from celery[auth,msgpack,redis])
    Downloading https://mirrors.aliyun.com/pypi/packages/52/40/9d857001228658f0d59e97ebd4c346fe73e138c6de1bce61dc568a57c7f8/click_repl-0.3.0-py3-none-any.whl (10 kB)
    Collecting click-plugins>=1.1.1 (from celery[auth,msgpack,redis])
    Downloading https://mirrors.aliyun.com/pypi/packages/e9/da/824b92d9942f4e472702488857914bdd50f73021efea15b4cad9aca8ecef/click_plugins-1.1.1-py2.py3-none-any.whl (7.5 kB)
    Collecting tzdata>=2022.7 (from celery[auth,msgpack,redis])
    Downloading https://mirrors.aliyun.com/pypi/packages/65/58/f9c9e6be752e9fcb8b6a0ee9fb87e6e7a1f6bcab2cdc73f02bb7ba91ada0/tzdata-2024.1-py2.py3-none-any.whl (345 kB)
    ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 345.4/345.4 kB 848.9 kB/s eta 0:00:00
    Collecting python-dateutil>=2.8.2 (from celery[auth,msgpack,redis])
    Downloading https://mirrors.aliyun.com/pypi/packages/ec/57/56b9bcc3c9c6a792fcbaf139543cee77261f3651ca9da0c93f5c1221264b/python_dateutil-2.9.0.post0-py2.py3-none-any.whl (229 kB)
    ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 229.9/229.9 kB 758.9 kB/s eta 0:00:00
    Collecting redis!=4.5.5,<6.0.0,>=4.5.2 (from celery[auth,msgpack,redis])
    Downloading https://mirrors.aliyun.com/pypi/packages/d8/c4/a6e54d8139d2806a105cfe444c9cb1666591cd313875b76495eea20fa92b/redis-5.0.5-py3-none-any.whl (251 kB)
    ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 252.0/252.0 kB 773.0 kB/s eta 0:00:00
    Collecting msgpack==1.0.8 (from celery[auth,msgpack,redis])
    Downloading https://mirrors.aliyun.com/pypi/packages/ba/13/d000e53b067aee19d57a4f26d5bffed7890e6896538ac5f97605b0f64985/msgpack-1.0.8-cp310-cp310-macosx_11_0_arm64.whl (84 kB)
    ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 84.9/84.9 kB 828.2 kB/s eta 0:00:00
    Collecting cryptography==42.0.5 (from celery[auth,msgpack,redis])
    Downloading https://mirrors.aliyun.com/pypi/packages/6d/4d/f7c14c7a49e35df829e04d451a57b843208be7442c8e087250c195775be1/cryptography-42.0.5-cp39-abi3-macosx_10_12_universal2.whl (5.9 MB)
    ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 5.9/5.9 MB 794.9 kB/s eta 0:00:00
    Requirement already satisfied: cffi>=1.12 in ./venv/lib/python3.10/site-packages (from cryptography==42.0.5->celery[auth,msgpack,redis]) (1.16.0)
    Collecting prompt-toolkit>=3.0.36 (from click-repl>=0.2.0->celery[auth,msgpack,redis])
    Downloading https://mirrors.aliyun.com/pypi/packages/e8/23/22750c4b768f09386d1c3cc4337953e8936f48a888fa6dddfb669b2c9088/prompt_toolkit-3.0.47-py3-none-any.whl (386 kB)
    ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 386.4/386.4 kB 844.0 kB/s eta 0:00:00
    Collecting amqp<6.0.0,>=5.1.1 (from kombu<6.0,>=5.3.4->celery[auth,msgpack,redis])
    Downloading https://mirrors.aliyun.com/pypi/packages/b3/f0/8e5be5d5e0653d9e1d02b1144efa33ff7d2963dfad07049e02c0fa9b2e8d/amqp-5.2.0-py3-none-any.whl (50 kB)
    ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 50.9/50.9 kB 823.6 kB/s eta 0:00:00
    Requirement already satisfied: six>=1.5 in ./venv/lib/python3.10/site-packages (from python-dateutil>=2.8.2->celery[auth,msgpack,redis]) (1.16.0)
    Collecting async-timeout>=4.0.3 (from redis!=4.5.5,<6.0.0,>=4.5.2->celery[auth,msgpack,redis])
    Downloading https://mirrors.aliyun.com/pypi/packages/a7/fa/e01228c2938de91d47b307831c62ab9e4001e747789d0b05baf779a6488c/async_timeout-4.0.3-py3-none-any.whl (5.7 kB)
    Requirement already satisfied: pycparser in ./venv/lib/python3.10/site-packages (from cffi>=1.12->cryptography==42.0.5->celery[auth,msgpack,redis]) (2.22)
    Collecting wcwidth (from prompt-toolkit>=3.0.36->click-repl>=0.2.0->celery[auth,msgpack,redis])
    Downloading https://mirrors.aliyun.com/pypi/packages/fd/84/fd2ba7aafacbad3c4201d395674fc6348826569da3c0937e75505ead3528/wcwidth-0.2.13-py2.py3-none-any.whl (34 kB)
    Installing collected packages: wcwidth, vine, tzdata, python-dateutil, prompt-toolkit, msgpack, click, billiard, async-timeout, redis, cryptography, click-repl, click-plugins, click-didyoumean, amqp, kombu, celery
    Attempting uninstall: cryptography
    Found existing installation: cryptography 42.0.7
    Uninstalling cryptography-42.0.7:
    Successfully uninstalled cryptography-42.0.7
    ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
    social-auth-core 4.5.4 requires PyJWT>=2.7.0, but you have pyjwt 1.7.1 which is incompatible.
    Successfully installed amqp-5.2.0 async-timeout-4.0.3 billiard-4.2.0 celery-5.4.0 click-8.1.7 click-didyoumean-0.3.1 click-plugins-1.1.1 click-repl-0.3.0 cryptography-42.0.5 kombu-5.3.7 msgpack-1.0.8 prompt-toolkit-3.0.47 python-dateutil-2.9.0.post0 redis-5.0.5 tzdata-2024.1 vine-5.1.0 wcwidth-0.2.13
    (venv) (base) adamhuan@Leviathan django_daily_media %
    (venv) (base) adamhuan@Leviathan django_daily_media % pip list | grep celery
    celery 5.4.0
    (venv) (base) adamhuan@Leviathan django_daily_media %
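
Note that the pip log above ends with a resolver warning: social-auth-core 4.5.4 requires PyJWT>=2.7.0 while pyjwt 1.7.1 is installed, and pip replaced cryptography 42.0.7 with 42.0.5 because the install pins that version. A minimal sketch for checking which versions actually ended up in the virtual environment, using only the standard library (the helper name `installed_version` is made up for this example):

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(package_name):
    """Return the installed version string, or None if the package is absent."""
    try:
        return version(package_name)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    # Package names taken from the pip output above.
    for name in ("celery", "redis", "cryptography", "PyJWT"):
        print(f"{name}: {installed_version(name)}")
```

Run inside the same virtual environment, this prints one line per package, with `None` for anything that is not installed.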


02

Code

The code is as follows:


File "tasks.py"

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-

    # ------------------
    # Script description:
    # xxxxxxxxx
    # ------------------

    # ========================================
    # Start
    # ))))))))) Imports
    from celery import Celery

    # ))))))))) Celery application
    # Redis serves as both the message broker and the result backend.
    app = Celery(
        'tasks',
        backend='redis://127.0.0.1:6379',
        broker='redis://127.0.0.1:6379',
    )


    @app.task
    def add(x, y):
        return x + y

    # ))))))))) Execution

    # ))))))))) End

    # ========================================
    # End
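
Both URLs in "tasks.py" omit a database number, so broker messages and task results share the same Redis database (index 0 is the default when none is given). The Django settings later in this article separate them with `/0` and `/1`. As a rough illustration of how the database index is encoded in such a URL, here is a standard-library sketch; `redis_db_index` is a hypothetical helper, not Celery's own URL parser:

```python
from urllib.parse import urlsplit


def redis_db_index(url):
    """Return the database index encoded in a redis:// URL path (0 if absent)."""
    path = urlsplit(url).path.strip("/")
    return int(path) if path else 0


print(redis_db_index("redis://127.0.0.1:6379"))    # no path, falls back to 0
print(redis_db_index("redis://localhost:6379/1"))  # path "/1" selects database 1
```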



File "run_tasks.py"

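    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-

    # ------------------
    # Script description:
    # xxxxxxxxx
    # ------------------

    # ========================================
    # Start
    # ))))))))) Imports
    from tasks import add

    # ))))))))) Submit the task
    # delay() sends the task to the broker and returns an AsyncResult at once.
    result = add.delay(4, 4)
    # Prints "Task ready: <True/False>"
    print(f"任务是否就绪:{result.ready()}")

    # Wait up to 1 second for the worker to store the result.
    run_result = result.get(timeout=1)
    # Prints "Task result: <value>"
    print(f"任务结果:{run_result}")
    # ))))))))) Execution

    # ))))))))) End

    # ========================================
    # End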



03

Running


Start the Celery worker:

          (venv) (base) adamhuan@Leviathan django_daily_media % pwd
          /Users/adamhuan/PycharmProjects/django_daily_media
          (venv) (base) adamhuan@Leviathan django_daily_media %
          (venv) (base) adamhuan@Leviathan django_daily_media % cd celery
          (venv) (base) adamhuan@Leviathan celery %
          (venv) (base) adamhuan@Leviathan celery % celery -A tasks worker -l INFO

          -------------- celery@Leviathan.local v5.4.0 (opalescent)
          --- ***** -----
          -- ******* ---- macOS-14.4-arm64-arm-64bit 2024-06-10 22:35:51
          - *** --- * ---
          - ** ---------- [config]
          - ** ---------- .> app: tasks:0x101fbb340
          - ** ---------- .> transport: redis://127.0.0.1:6379//
          - ** ---------- .> results: redis://127.0.0.1:6379/
          - *** --- * --- .> concurrency: 10 (prefork)
          -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
          --- ***** -----
          -------------- [queues]
          .> celery exchange=celery(direct) key=celery



          [tasks]
          . tasks.add


          [2024-06-10 22:35:51,264: WARNING/MainProcess] /Users/adamhuan/PycharmProjects/django_daily_media/venv/lib/python3.10/site-packages/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
          whether broker connection retries are made during startup in Celery 6.0 and above.
          If you wish to retain the existing behavior for retrying connections on startup,
          you should set broker_connection_retry_on_startup to True.
          warnings.warn(


          [2024-06-10 22:35:51,277: INFO/MainProcess] Connected to redis://127.0.0.1:6379//
          [2024-06-10 22:35:51,277: WARNING/MainProcess] /Users/adamhuan/PycharmProjects/django_daily_media/venv/lib/python3.10/site-packages/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
          whether broker connection retries are made during startup in Celery 6.0 and above.
          If you wish to retain the existing behavior for retrying connections on startup,
          you should set broker_connection_retry_on_startup to True.
          warnings.warn(


          [2024-06-10 22:35:51,279: INFO/MainProcess] mingle: searching for neighbors
          [2024-06-10 22:35:52,285: INFO/MainProcess] mingle: all alone
          [2024-06-10 22:35:52,296: INFO/MainProcess] celery@Leviathan.local ready.





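In a second terminal, run the "run_tasks.py" script from above (the two output lines read "Task ready: False" and "Task result: 8"):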

            (venv) (base) adamhuan@Leviathan django_daily_media % cd celery 
            (venv) (base) adamhuan@Leviathan celery %
            (venv) (base) adamhuan@Leviathan celery % python run_tasks.py
            任务是否就绪:False
            任务结果:8
            (venv) (base) adamhuan@Leviathan celery %



The Celery worker's log output:

              [2024-06-10 22:38:11,938: INFO/MainProcess] Task tasks.add[02d50b42-67d1-4cce-947e-17ae9ec8c5fd] received
              [2024-06-10 22:38:11,945: INFO/ForkPoolWorker-8] Task tasks.add[02d50b42-67d1-4cce-947e-17ae9ec8c5fd] succeeded in 0.00490104197524488s: 8



That covers the most basic use of Celery.
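
In the run above, `ready()` most likely printed False because the check happens immediately after `delay()` returns, before the worker has finished. `get(timeout=1)` then blocks until the result arrives, or raises a timeout error if nothing arrives within one second (for example when no worker is running). The same submit, poll, then wait-with-timeout sequence can be tried without Redis or a worker using the standard library's `concurrent.futures`. This is only an analogy for the call sequence, not Celery code, and the 0.2 second sleep is an artificial delay standing in for queue latency:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def add(x, y):
    time.sleep(0.2)  # artificial delay standing in for broker/worker latency
    return x + y


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(add, 4, 4)    # roughly: result = add.delay(4, 4)
    ready_at_once = future.done()      # roughly: result.ready()
    value = future.result(timeout=5)   # roughly: result.get(timeout=...)

print(f"ready immediately: {ready_at_once}")
print(f"result: {value}")
```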


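04

Using Celery in Django


The official documentation describes how to use Celery with Django:

https://docs.celeryq.dev/en/stable/django/index.html


Below I follow the official documentation and walk through the steps in my own project.


First, add a "celery.py" file to the Django project's configuration package (the directory that contains "settings.py"). The official documentation calls this file "proj/proj/celery.py"; in this project it is "django_daily_media/django_daily_media/celery.py".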


File "celery.py"

    import os

    from celery import Celery

    # Set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_daily_media.settings')

    app = Celery('django_daily_media')

    # Using a string here means the worker doesn't have to serialize
    # the configuration object to child processes.
    # - namespace='CELERY' means all celery-related configuration keys
    #   should have a `CELERY_` prefix.
    app.config_from_object('django.conf:settings', namespace='CELERY')

    # Load task modules from all registered Django apps.
    app.autodiscover_tasks()


    @app.task(bind=True, ignore_result=True)
    def debug_task(self):
        print(f'Request: {self.request!r}')
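
The `namespace='CELERY'` argument means that Celery reads only the Django settings whose names start with `CELERY_` and maps each one to the corresponding lowercase Celery setting, so `CELERY_BROKER_URL` becomes `broker_url`. The naming rule can be sketched in a few lines; `to_celery_key` is a hypothetical helper written for illustration and is not how Celery implements the lookup internally:

```python
def to_celery_key(django_setting, namespace="CELERY"):
    """Map a namespaced Django setting name to a Celery setting name.

    Returns None for names that are outside the namespace.
    """
    prefix = namespace + "_"
    if not django_setting.startswith(prefix):
        return None
    return django_setting[len(prefix):].lower()


for name in ("CELERY_BROKER_URL", "CELERY_TASK_TIME_LIMIT", "TIME_ZONE"):
    print(name, "->", to_celery_key(name))
```

A setting without the prefix, such as Django's own `TIME_ZONE`, is simply not picked up as a Celery option under this scheme.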


Then import the app in the "__init__.py" of the same configuration package, so that it is loaded whenever Django starts:

    # MySQL database driver support
    import pymysql
    pymysql.install_as_MySQLdb()

    # Celery task support
    from .celery import app as celery_app
    __all__ = ('celery_app',)



Next, edit the project settings file "settings.py" and add the Celery options:

    # Celery Configuration Options
    # - Time zone
    # CELERY_TIMEZONE = "Australia/Tasmania"
    CELERY_TIMEZONE = "Asia/Shanghai"
    # - Report the STARTED state when a worker begins executing a task
    CELERY_TASK_TRACK_STARTED = True
    # - Hard time limit per task: 30 * 60 seconds (30 minutes)
    CELERY_TASK_TIME_LIMIT = 30 * 60
    # - Celery message broker
    CELERY_BROKER_URL = "redis://localhost:6379/0"
    # - Where task results are stored
    CELERY_RESULT_BACKEND = "redis://localhost:6379/1"


At this point the Celery configuration in Django is complete.
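
One optional addition: the worker log earlier in this article shows a `CPendingDeprecationWarning` about `broker_connection_retry`, and the warning text itself suggests setting `broker_connection_retry_on_startup` to True. Assuming the `CELERY_` namespace configured in "celery.py", that would presumably be written in "settings.py" as follows (a configuration fragment, not tested against this project):

```python
# Keep retrying the broker connection while the worker starts up,
# as suggested by the deprecation warning in the worker log.
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
```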


Next, we can write the Celery tasks.


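05

Writing Celery tasks in Django


In Django, Celery tasks should be written inside the directory of the app they belong to.

Note that `app.autodiscover_tasks()` looks for a module named "tasks" in each installed app by default. The file below is named "celery_tasks.py", so the worker only registers it if something imports it (in this project "trade/views.py" does). Alternatively, passing `related_name='celery_tasks'` to `autodiscover_tasks()` should make the discovery explicit.


File "trade/celery_tasks.py"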

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-

    # ------------------
    # Script description:
    # xxxxxxxxx
    # ------------------

    # ========================================
    # Start
    # ))))))))) Imports
    from celery import shared_task

    from datetime import timedelta
    from django.utils import timezone

    from trade.models import *


    # ))))))))) Tasks
    # shared_task binds each task to whichever Celery app is current,
    # so the app module does not need to import the project's celery.py.
    @shared_task
    def add(x, y):
        return x + y


    @shared_task
    def mul(x, y):
        return x * y


    @shared_task
    def xsum(numbers):
        return sum(numbers)

    # ))))))))) Execution

    # ))))))))) End

    # ========================================
    # End



06

Creating the route


Add an entry to Django's root URL configuration:

                        """
                        URL configuration for django_daily_media project.


                        The `urlpatterns` list routes URLs to views. For more information please see:
                        https://docs.djangoproject.com/en/5.0/topics/http/urls/
                        Examples:
                        Function views
                        1. Add an import: from my_app import views
                        2. Add a URL to urlpatterns: path('', views.home, name='home')
                        Class-based views
                        1. Add an import: from other_app.views import Home
                        2. Add a URL to urlpatterns: path('', Home.as_view(), name='home')
                        Including another URLconf
                        1. Import the include() function: from django.urls import include, path
                        2. Add a URL to urlpatterns: path('blog/', include('blog.urls'))
                        """
                        from django.contrib import admin
                        from django.urls import path, include


                        from rest_framework.routers import DefaultRouter


                        # from movie.views import *
                        # from account.views import *


                        from movie import views as movie_view
                        from account import views as account_view
                        from trade import views as trade_view


                        # Everything registered on this router is served under the "/api/" prefix
                        router = DefaultRouter()
                        router.register(
                            'movie', movie_view.MovieViewSet, 'movie'
                        )
                        router.register(
                            'category', movie_view.CategoryViewSet, 'category'
                        )
                        router.register(
                            'collects', account_view.CollectViewSet, 'collects'
                        )
                        router.register(
                            'card', trade_view.CardViewSet, 'card'
                        )
                        router.register(
                            'order', trade_view.OrderViewset, 'order'
                        )


                        urlpatterns = [

                            # Django Admin
                            path("admin/", admin.site.urls),

                            # Django Rest Framework
                            path("api/", include(router.urls)),

                            # Django Djoser
                            path('auth/', include('djoser.urls')),

                            # Django Token Based Authentication
                            # path('auth/', include('djoser.urls.authtoken')),

                            # Django Djoser JWT
                            path("jwt/", include("djoser.urls.jwt")),

                            # Account: Activation
                            path("activate/<str:uid>/<str:token>/", account_view.account_activate),

                            # Account: Password Reset
                            # path("password_reset/<str:uid>/<str:token>/", account_password_reset),

                            # Payment module: Alipay
                            path("api/alipay/", trade_view.AlipayAPIView.as_view()),
                            path("api/callback/", trade_view.AlipayCallbackAPIView.as_view()),

                            # Celery
                            path('api/tasks/', trade_view.TaskAPIView.as_view()),

                        ]



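07

Implementing the view (APIView) for the route

The code is as follows (the Celery-related part is the `TaskAPIView` class at the end; the rest is the existing "trade/views.py"):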

    from django.shortcuts import render
    from django.utils import timezone
    from django_filters import rest_framework as filters
    from django.conf import settings
    from django.db import transaction

    from datetime import datetime, timedelta

    from rest_framework import viewsets, status
    from rest_framework.views import APIView
    from rest_framework.response import Response
    from rest_framework import pagination
    from rest_framework.permissions import IsAdminUser, IsAuthenticated, SAFE_METHODS

    from trade.models import *
    from trade.serializers import *
    from trade.permissions import *
    from trade.celery_tasks import *

    from account.models import *

    from utils.error import *
    from utils.common import *

    from utils.payment_alipay import *
    # from utils.payment_alipay_new import *


    # Create your views here.


    # Card

    class CardFilter(filters.FilterSet):
        card_name = filters.CharFilter(lookup_expr='icontains')
        duration = filters.NumberFilter()

        class Meta:
            model = Card
            fields = ['card_name', 'duration']


    class CardPagination(pagination.PageNumberPagination):
        page_size = 12
        max_page_size = 20
        page_size_query_param = 'page_size'


    class CardViewSet(viewsets.ModelViewSet):
        queryset = Card.objects.all()
        serializer_class = CardSerializer
        filter_backends = (filters.DjangoFilterBackend,)
        filterset_class = CardFilter
        pagination_class = CardPagination

        # permission_classes = [
        #     IsAdminUserOrReadonly,
        # ]


    # Order

    class OrderFilter(filters.FilterSet):
        pay_status = filters.CharFilter(field_name='pay_status', lookup_expr='icontains')
        order_sn = filters.CharFilter(field_name="order_sn", lookup_expr='icontains')

        class Meta:
            model = Order
            fields = ['pay_status', 'order_sn']


    class OrderPagination(pagination.PageNumberPagination):
        page_size = 12
        max_page_size = 20
        page_size_query_param = 'page_size'


    class OrderViewset(viewsets.ModelViewSet):
        queryset = Order.objects.all()
        serializer_class = OrderSerializer
        filter_backends = (filters.DjangoFilterBackend,)
        filterset_class = OrderFilter
        pagination_class = OrderPagination

        # permission_classes = [
        #     IsAdminUserOrReadonly,
        # ]

        # Choose the permission policy by request type:
        # read-only methods need a logged-in user, everything else needs an admin
        def get_permissions(self):
            if self.request.method in SAFE_METHODS:
                return [IsAuthenticated()]
            else:
                return [IsAdminUser()]


    # Alipay

    class AlipayAPIView(APIView):

        # Scenarios for starting an Alipay payment:
        # 1. Create a new payment
        # 2. Pay an existing, unfinished order
        def get(self, request):

            # Check that the card id "card_id" is valid
            card_id = request.GET.get("card_id", None)

            print("================")
            print(card_id)

            try:
                card = Card.objects.get(id=card_id)
            except Exception:
                return Response(
                    response_data(*TradeError.CardParamError)
                )

            order_sn = request.GET.get('order_sn', None)
            if not order_sn:
                # No order_sn: a new order has to be created

                # Order number
                out_trade_no = "pay" + datetime.now().strftime("%Y%m%d%H%M%S") + get_random_code(4)

                # Create the order
                try:
                    Order.objects.create(
                        user=Profile.objects.get(user=request.user),
                        card=card,
                        order_sn=out_trade_no,
                        order_mount=card.card_price,
                        pay_time=timezone.now(),
                    )
                except Exception:
                    return Response(
                        response_data(*TradeError.OrderCreateError)
                    )
            else:
                # order_sn given: no new order is needed,
                # pay the order that was left unpaid earlier
                try:
                    order = Order.objects.get(order_sn=order_sn)
                    if order.pay_status != 'PAYING':
                        return Response(
                            response_data(*TradeError.OrderStatusError)
                        )
                    out_trade_no = order_sn
                except Exception:
                    return Response(
                        response_data(*TradeError.OrderStatusError)
                    )

            # Send the request to Alipay
            try:
                # pip install alipay-sdk-python --upgrade
                alipay = Alipay()
                pay_url = alipay.trade_page(
                    out_trade_no=out_trade_no,
                    total_amount=str(card.card_price),
                    subject=card.card_name,
                    body="Alipay sandbox payment",
                    product_code="FAST_INSTANT_TRADE_PAY"
                )

                # pip install python-alipay-sdk
                # Desktop website payment: alipay.trade.page.pay
                # pay_url = alipay.api_alipay_trade_page_pay(
                #     out_trade_no=out_trade_no,
                #     total_amount=str(card.card_price),
                #     subject=card.card_name,
                #     return_url=settings.ALIPAY_RETURN_URL,
                #     notify_url=settings.ALIPAY_NOTIFY_URL,  # optional; the default notify url is used if omitted
                # )

                # Return the payment URL
                return Response(pay_url)
            except Exception:
                return Response(
                    response_data(*TradeError.AlipayRequestError)
                )


    class AlipayCallbackAPIView(APIView):

        def get(self, request):
            return Response("Alipay - callback endpoint")

        def post(self, request):

            # =======================
            # Read the asynchronous notification
            params = request.POST.dict()
            print(params)

            # =======================
            # Verify the signature of the notification

            # ----------------
            # Step 1 - Every parameter in the notification except sign and sign_type
            #          takes part in the verification.
            # --- SIGN
            # del params['sign']
            sign = params.pop('sign')

            # --- SIGN_TYPE
            del params['sign_type']

            # ----------------
            # Step 2 - URL-decode the remaining parameters, sort them by key and
            #          join them into the string to be verified.

            # Sort the dictionary items (list comprehension)
            sorted_list = sorted([(k, v) for k, v in params.items()])

            # Join into a single string
            unsigned_string = '&'.join(f"{k}={v}" for k, v in sorted_list)

            # ----------------
            # Step 3 - Base64-decode the sign parameter into bytes.
            # Step 4 - Verify with RSA, using the unsigned string, the decoded
            #          signature and the Alipay public key.

            # The official Alipay pip package provides a function covering steps 3 and 4:
            # from alipay.aop.api.util.SignatureUtils import verify_with_rsa
            # It is wrapped by the separately implemented Alipay utility class.

            alipay = Alipay()
            if not alipay.verify_sign(
                unsigned_string=unsigned_string,
                sign=sign
            ):
                print("==================")
                print("Verification failed")

                return Response('Verification failed')

            # ----------------
            # Step 5 - The notification data must be checked strictly as follows:
            # 1. out_trade_no must be an order number created in the merchant's system.
            # 2. total_amount must equal the actual amount of that order
            #    (the amount recorded when the order was created).
            # 3. seller_id (or seller_email) must be the party that out_trade_no belongs to
            #    (a merchant may have several seller_id/seller_email values).
            # 4. app_id must be the merchant's own.
            # ----------------
            # If any of checks 1 to 4 fails, the notification is abnormal and must be ignored.
            # Once they pass, the merchant must handle each kind of Alipay business
            # notification correctly and filter out duplicate notifications. Alipay only
            # treats the payment as successful when the trade status is
            # TRADE_SUCCESS or TRADE_FINISHED.
            # ----------------

            try:
                order = Order.objects.get(order_sn=params.get('out_trade_no'))
            except Exception:
                return Response('Order does not exist')

            if params.get('total_amount') != str(order.order_mount):
                print("==================")
                print("Verification failed")

                return Response('Verification failed')

            if params.get('seller_id') != settings.ALIPAY_SELLER_PID:
                print("==================")
                print("Verification failed")

                return Response('Verification failed')

            if params.get('app_id') != settings.ALIPAY_APP_ID:
                print("==================")
                print("Verification failed")

                return Response('Verification failed')

            if params.get('trade_status') not in ['TRADE_SUCCESS', 'TRADE_FINISHED']:
                print("==================")
                print("Verification failed")

                return Response('Verification failed')

            # At this point every check has passed

            # Run the remaining business logic

            # Keep both updates in one database transaction
            with transaction.atomic():

                # ======================
                # --- Update the ORDER table
                # ======================

                order.trade_no = params.get('trade_no')
                order.pay_status = params.get('trade_status')

                # order.pay_time = datetime.now()
                order.pay_time = timezone.now()

                # Save the changes
                order.save()

                # ======================
                # --- Update the PROFILE table
                # ======================

                profile = Profile.objects.get(uid=order.user.uid)
                profile.is_upgrade = 1

                # profile.upgrade_time = datetime.now()
                profile.upgrade_time = timezone.now()

                profile.upgrade_count += 1

                # First purchase or expired membership: count the duration from now.
                # Membership still valid: extend the existing expiry time.
                if not profile.expire_time or profile.expire_time < timezone.now():
                    # profile.expire_time = datetime.now() + timedelta(days=order.card.duration)
                    profile.expire_time = timezone.now() + timedelta(days=order.card.duration)
                else:
                    profile.expire_time = profile.expire_time + timedelta(days=order.card.duration)

                # Save the changes
                profile.save()

            print("==================")
            print("Verification succeeded")

            return Response("success")


                          class TaskAPIView(APIView):
                              def get(self, request):
                                  # .delay() queues each task and returns an AsyncResult immediately
                                  result_1 = add.delay(3, 4)
                                  result_2 = mul.delay(4, 5)
                                  result_3 = xsum.delay([1, 2, 3, 4, 5])

                                  print(f'add: {result_1}')
                                  print(f'mul: {result_2}')
                                  print(f'xsum: {result_3}')

                                  # Return the response
                                  return Response("Celery - execute tasks (Tasks)")
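The membership-renewal rule in the payment callback above (count from now when there is no expiry or it has lapsed, otherwise extend the current expiry) is easier to reason about as a pure function. The following is only a sketch with the standard library: `next_expire_time` is a hypothetical helper name, not part of the project, and the dates are made up for illustration.

```python
from datetime import datetime, timedelta, timezone


def next_expire_time(expire_time, now, duration_days):
    """Return the new membership expiry after buying `duration_days` more days."""
    if expire_time is None or expire_time < now:
        # First purchase, or the membership already lapsed: count from now
        return now + timedelta(days=duration_days)
    # Membership still active: stack the new days onto the current expiry
    return expire_time + timedelta(days=duration_days)


now = datetime(2024, 6, 10, 12, 0, tzinfo=timezone.utc)

# Lapsed five days ago: the 30 days start today
lapsed = next_expire_time(now - timedelta(days=5), now, 30)
# Ten days still left: those days are kept and 30 more are added
active = next_expire_time(now + timedelta(days=10), now, 30)

print(lapsed.date(), active.date())
```

Keeping the rule separate from the ORM calls means it can be checked without a database or a real Alipay notification.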


                          08

                          Start the Celery worker and Flower for the Django project

                          With the Celery configuration for Django shown above, start the Celery worker from the command line:

                            (venv) (base) adamhuan@Leviathan django_daily_media % pwd
                            /Users/adamhuan/PycharmProjects/django_daily_media
                            (venv) (base) adamhuan@Leviathan django_daily_media % celery -A django_daily_media worker -l INFO

                            -------------- celery@Leviathan.local v5.4.0 (opalescent)
                            --- ***** -----
                            -- ******* ---- macOS-14.4-arm64-arm-64bit 2024-06-10 23:56:11
                            - *** --- * ---
                            - ** ---------- [config]
                            - ** ---------- .> app: django_daily_media:0x107fcd840
                            - ** ---------- .> transport: redis://localhost:6379/0
                            - ** ---------- .> results: redis://localhost:6379/1
                            - *** --- * --- .> concurrency: 10 (prefork)
                            -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
                            --- ***** -----
                            -------------- [queues]
                            .> celery exchange=celery(direct) key=celery



                            [tasks]
                            . django_daily_media.celery.debug_task
                            . trade.celery_tasks.add
                            . trade.celery_tasks.mul
                            . trade.celery_tasks.xsum


                            [2024-06-10 23:56:12,106: WARNING/MainProcess] /Users/adamhuan/PycharmProjects/django_daily_media/venv/lib/python3.10/site-packages/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
                            whether broker connection retries are made during startup in Celery 6.0 and above.
                            If you wish to retain the existing behavior for retrying connections on startup,
                            you should set broker_connection_retry_on_startup to True.
                            warnings.warn(


                            [2024-06-10 23:56:12,117: INFO/MainProcess] Connected to redis://localhost:6379/0
                            [2024-06-10 23:56:12,117: WARNING/MainProcess] /Users/adamhuan/PycharmProjects/django_daily_media/venv/lib/python3.10/site-packages/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
                            whether broker connection retries are made during startup in Celery 6.0 and above.
                            If you wish to retain the existing behavior for retrying connections on startup,
                            you should set broker_connection_retry_on_startup to True.
                            warnings.warn(


                            [2024-06-10 23:56:12,119: INFO/MainProcess] mingle: searching for neighbors
                            [2024-06-10 23:56:13,130: INFO/MainProcess] mingle: all alone
                            [2024-06-10 23:56:13,156: INFO/MainProcess] celery@Leviathan.local ready.
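The deprecation warning in the log above names its own fix: set `broker_connection_retry_on_startup` explicitly. A minimal sketch for `settings.py` follows. It assumes the project loads its Celery options from Django settings with `namespace='CELERY'` (the usual Django integration, but the project's `celery.py` is not shown in this section, so treat that as an assumption).

```python
# settings.py (fragment)
# Assumption: Celery reads Django settings with namespace='CELERY',
# so the lowercase option name gets a CELERY_ prefix and is upper-cased.
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
```

If the options are set directly on the app object instead, the equivalent would be `app.conf.broker_connection_retry_on_startup = True`.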





                            Start Flower:

                              (venv) (base) adamhuan@Leviathan django_daily_media % pwd
                              /Users/adamhuan/PycharmProjects/django_daily_media
                              (venv) (base) adamhuan@Leviathan django_daily_media %
                              (venv) (base) adamhuan@Leviathan django_daily_media % celery -A django_daily_media flower
                              [I 240610 23:58:47 command:168] Visit me at http://0.0.0.0:5555
                              [I 240610 23:58:47 command:176] Broker: redis://localhost:6379/0
                              [I 240610 23:58:47 command:177] Registered tasks:
                              ['celery.accumulate',
                              'celery.backend_cleanup',
                              'celery.chain',
                              'celery.chord',
                              'celery.chord_unlock',
                              'celery.chunks',
                              'celery.group',
                              'celery.map',
                              'celery.starmap',
                              'django_daily_media.celery.debug_task',
                              'trade.celery_tasks.add',
                              'trade.celery_tasks.mul',
                              'trade.celery_tasks.xsum']
                              [I 240610 23:58:47 mixins:228] Connected to redis://localhost:6379/0





                              09

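                              Visit the route configured earlier to trigger the tasks

                              Before triggering the Celery tasks, check the current state in Flower:

                              Request the route:

                              http://localhost:8000/api/tasks/

                              The request succeeds.


                              Now check Flower again:

                              The tasks we wrote executed successfully.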


                              10

                              Django Celery Beat

                              Install the dependency with pip:

                                (venv) (base) adamhuan@Leviathan django_daily_media % pip install django-celery-beat
                                Looking in indexes: https://mirrors.aliyun.com/pypi/simple/
                                Collecting django-celery-beat
                                Downloading https://mirrors.aliyun.com/pypi/packages/1b/ce/308fdad8c073051c0a1e494939d5c304b4efbbeb4bee1115495a60c139e8/django-celery-beat-2.6.0.tar.gz (160 kB)
                                ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 160.5/160.5 kB 760.5 kB/s eta 0:00:00
                                Preparing metadata (setup.py) ... done
                                Requirement already satisfied: celery<6.0,>=5.2.3 in ./venv/lib/python3.10/site-packages (from django-celery-beat) (5.4.0)
                                Collecting django-timezone-field>=5.0 (from django-celery-beat)
                                Downloading https://mirrors.aliyun.com/pypi/packages/0a/a0/010458d5314492b06e29d55ac1ae0409a9e7f69ef2d690a60b2aa4787577/django_timezone_field-6.1.0-py3-none-any.whl (12 kB)
                                Requirement already satisfied: tzdata in ./venv/lib/python3.10/site-packages (from django-celery-beat) (2024.1)
                                Collecting python-crontab>=2.3.4 (from django-celery-beat)
                                Downloading https://mirrors.aliyun.com/pypi/packages/2d/55/55c92a1b84aed86b9c9e9db130eefe473775b9d6bb9fbd031036e581fb9d/python-crontab-3.1.0.tar.gz (56 kB)
                                ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 56.8/56.8 kB 699.8 kB/s eta 0:00:00
                                Preparing metadata (setup.py) ... done
                                Collecting cron-descriptor>=1.2.32 (from django-celery-beat)
                                Downloading https://mirrors.aliyun.com/pypi/packages/e0/18/136b0073305038d1317f3442f614a698ce686830a2810bbe7e875311e09f/cron_descriptor-1.4.3-py3-none-any.whl (49 kB)
                                ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 49.8/49.8 kB 663.6 kB/s eta 0:00:00
                                Requirement already satisfied: Django<5.1,>=2.2 in ./venv/lib/python3.10/site-packages (from django-celery-beat) (5.0.6)
                                Requirement already satisfied: billiard<5.0,>=4.2.0 in ./venv/lib/python3.10/site-packages (from celery<6.0,>=5.2.3->django-celery-beat) (4.2.0)
                                Requirement already satisfied: kombu<6.0,>=5.3.4 in ./venv/lib/python3.10/site-packages (from celery<6.0,>=5.2.3->django-celery-beat) (5.3.7)
                                Requirement already satisfied: vine<6.0,>=5.1.0 in ./venv/lib/python3.10/site-packages (from celery<6.0,>=5.2.3->django-celery-beat) (5.1.0)
                                Requirement already satisfied: click<9.0,>=8.1.2 in ./venv/lib/python3.10/site-packages (from celery<6.0,>=5.2.3->django-celery-beat) (8.1.7)
                                Requirement already satisfied: click-didyoumean>=0.3.0 in ./venv/lib/python3.10/site-packages (from celery<6.0,>=5.2.3->django-celery-beat) (0.3.1)
                                Requirement already satisfied: click-repl>=0.2.0 in ./venv/lib/python3.10/site-packages (from celery<6.0,>=5.2.3->django-celery-beat) (0.3.0)
                                Requirement already satisfied: click-plugins>=1.1.1 in ./venv/lib/python3.10/site-packages (from celery<6.0,>=5.2.3->django-celery-beat) (1.1.1)
                                Requirement already satisfied: python-dateutil>=2.8.2 in ./venv/lib/python3.10/site-packages (from celery<6.0,>=5.2.3->django-celery-beat) (2.9.0.post0)
                                Requirement already satisfied: asgiref<4,>=3.7.0 in ./venv/lib/python3.10/site-packages (from Django<5.1,>=2.2->django-celery-beat) (3.8.1)
                                Requirement already satisfied: sqlparse>=0.3.1 in ./venv/lib/python3.10/site-packages (from Django<5.1,>=2.2->django-celery-beat) (0.5.0)
                                Requirement already satisfied: typing-extensions>=4 in ./venv/lib/python3.10/site-packages (from asgiref<4,>=3.7.0->Django<5.1,>=2.2->django-celery-beat) (4.11.0)
                                Requirement already satisfied: prompt-toolkit>=3.0.36 in ./venv/lib/python3.10/site-packages (from click-repl>=0.2.0->celery<6.0,>=5.2.3->django-celery-beat) (3.0.47)
                                Requirement already satisfied: amqp<6.0.0,>=5.1.1 in ./venv/lib/python3.10/site-packages (from kombu<6.0,>=5.3.4->celery<6.0,>=5.2.3->django-celery-beat) (5.2.0)
                                Requirement already satisfied: six>=1.5 in ./venv/lib/python3.10/site-packages (from python-dateutil>=2.8.2->celery<6.0,>=5.2.3->django-celery-beat) (1.16.0)
                                Requirement already satisfied: wcwidth in ./venv/lib/python3.10/site-packages (from prompt-toolkit>=3.0.36->click-repl>=0.2.0->celery<6.0,>=5.2.3->django-celery-beat) (0.2.13)
                                Building wheels for collected packages: django-celery-beat, python-crontab
                                Building wheel for django-celery-beat (setup.py) ... done
                                Created wheel for django-celery-beat: filename=django_celery_beat-2.6.0-py3-none-any.whl size=92824 sha256=ccda69d984a469da3bce69565d8c257c5f81e86ab716fe25f4113bb392fbad1c
                                Stored in directory: /Users/adamhuan/Library/Caches/pip/wheels/5e/d5/32/be975842907b25708139cf512e75ddc09e360479c05661af2e
                                Building wheel for python-crontab (setup.py) ... done
                                Created wheel for python-crontab: filename=python_crontab-3.1.0-py3-none-any.whl size=27287 sha256=61034d28a84ad806ebf02a615ca22880c4fd7936587d7fbe63720db23656616a
                                Stored in directory: /Users/adamhuan/Library/Caches/pip/wheels/cf/07/86/d82187eb1398b63c155d9f61616ad7144927f9c082cff8ad65
                                Successfully built django-celery-beat python-crontab
                                Installing collected packages: cron-descriptor, python-crontab, django-timezone-field, django-celery-beat
                                Successfully installed cron-descriptor-1.4.3 django-celery-beat-2.6.0 django-timezone-field-6.1.0 python-crontab-3.1.0
                                (venv) (base) adamhuan@Leviathan django_daily_media %


                                Add the app to the global settings file:

                                  # Application definition
                                  INSTALLED_APPS = [
                                      "django.contrib.admin",
                                      "django.contrib.auth",
                                      "django.contrib.contenttypes",
                                      "django.contrib.sessions",
                                      "django.contrib.messages",
                                      "django.contrib.staticfiles",

                                      # Django Filter
                                      "django_filters",

                                      # Django Rest Framework
                                      "rest_framework",

                                      # Django Djoser
                                      "djoser",

                                      # pip install django-cors-headers
                                      "corsheaders",

                                      # pip install django-celery-beat
                                      'django_celery_beat',

                                      # Movies
                                      "movie",

                                      # Users
                                      "account",

                                      # Trades
                                      "trade",
                                  ]


                                  Run the database migrations:

                                    (venv) (base) adamhuan@Leviathan django_daily_media % pwd
                                    /Users/adamhuan/PycharmProjects/django_daily_media
                                    (venv) (base) adamhuan@Leviathan django_daily_media %
                                    (venv) (base) adamhuan@Leviathan django_daily_media % python manage.py makemigrations
                                    No changes detected
                                    (venv) (base) adamhuan@Leviathan django_daily_media %
                                    (venv) (base) adamhuan@Leviathan django_daily_media % python manage.py migrate
                                    System check identified some issues:


                                    WARNINGS:
                                    trade.Card.card_name: (mysql.W003) MySQL may not allow unique CharFields to have a max_length > 255.
                                    HINT: See: https://docs.djangoproject.com/en/5.0/ref/databases/#mysql-character-fields
                                    Operations to perform:
                                    Apply all migrations: account, admin, auth, contenttypes, django_celery_beat, movie, sessions, trade
                                    Running migrations:
                                    Applying django_celery_beat.0001_initial... OK
                                    Applying django_celery_beat.0002_auto_20161118_0346... OK
                                    Applying django_celery_beat.0003_auto_20161209_0049... OK
                                    Applying django_celery_beat.0004_auto_20170221_0000... OK
                                    Applying django_celery_beat.0005_add_solarschedule_events_choices... OK
                                    Applying django_celery_beat.0006_auto_20180322_0932... OK
                                    Applying django_celery_beat.0007_auto_20180521_0826... OK
                                    Applying django_celery_beat.0008_auto_20180914_1922... OK
                                    Applying django_celery_beat.0006_auto_20180210_1226... OK
                                    Applying django_celery_beat.0006_periodictask_priority... OK
                                    Applying django_celery_beat.0009_periodictask_headers... OK
                                    Applying django_celery_beat.0010_auto_20190429_0326... OK
                                    Applying django_celery_beat.0011_auto_20190508_0153... OK
                                    Applying django_celery_beat.0012_periodictask_expire_seconds... OK
                                    Applying django_celery_beat.0013_auto_20200609_0727... OK
                                    Applying django_celery_beat.0014_remove_clockedschedule_enabled... OK
                                    Applying django_celery_beat.0015_edit_solarschedule_events_choices... OK
                                    Applying django_celery_beat.0016_alter_crontabschedule_timezone... OK
                                    Applying django_celery_beat.0017_alter_crontabschedule_month_of_year... OK
                                    Applying django_celery_beat.0018_improve_crontab_helptext... OK
                                    (venv) (base) adamhuan@Leviathan django_daily_media %


                                    Start the Celery worker:

                                      (venv) (base) adamhuan@Leviathan django_daily_media % pwd
                                      /Users/adamhuan/PycharmProjects/django_daily_media
                                      (venv) (base) adamhuan@Leviathan django_daily_media % celery -A django_daily_media worker --loglevel=info

                                      -------------- celery@Leviathan.local v5.4.0 (opalescent)
                                      --- ***** -----
                                      -- ******* ---- macOS-14.4-arm64-arm-64bit 2024-06-11 00:15:23
                                      - *** --- * ---
                                      - ** ---------- [config]
                                      - ** ---------- .> app: django_daily_media:0x1070ed840
                                      - ** ---------- .> transport: redis://localhost:6379/0
                                      - ** ---------- .> results: redis://localhost:6379/1
                                      - *** --- * --- .> concurrency: 10 (prefork)
                                      -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
                                      --- ***** -----
                                      -------------- [queues]
                                      .> celery exchange=celery(direct) key=celery



                                      [tasks]
                                      . django_daily_media.celery.debug_task
                                      . trade.celery_tasks.add
                                      . trade.celery_tasks.mul
                                      . trade.celery_tasks.xsum


                                      [2024-06-11 00:15:23,613: WARNING/MainProcess] /Users/adamhuan/PycharmProjects/django_daily_media/venv/lib/python3.10/site-packages/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
                                      whether broker connection retries are made during startup in Celery 6.0 and above.
                                      If you wish to retain the existing behavior for retrying connections on startup,
                                      you should set broker_connection_retry_on_startup to True.
                                      warnings.warn(


                                      [2024-06-11 00:15:23,618: INFO/MainProcess] Connected to redis://localhost:6379/0
                                      [2024-06-11 00:15:23,618: WARNING/MainProcess] /Users/adamhuan/PycharmProjects/django_daily_media/venv/lib/python3.10/site-packages/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
                                      whether broker connection retries are made during startup in Celery 6.0 and above.
                                      If you wish to retain the existing behavior for retrying connections on startup,
                                      you should set broker_connection_retry_on_startup to True.
                                      warnings.warn(


                                      [2024-06-11 00:15:23,622: INFO/MainProcess] mingle: searching for neighbors
                                      [2024-06-11 00:15:24,633: INFO/MainProcess] mingle: all alone
                                      [2024-06-11 00:15:24,650: INFO/MainProcess] celery@Leviathan.local ready.





                                      Start Celery Beat:

                                        (venv) (base) adamhuan@Leviathan django_daily_media % celery -A django_daily_media beat --loglevel=info --scheduler django_celery_beat.schedulers:DatabaseScheduler
                                        celery beat v5.4.0 (opalescent) is starting.
                                        __ - ... __ - _
                                        LocalTime -> 2024-06-11 00:17:18
                                        Configuration ->
                                        . broker -> redis://localhost:6379/0
                                        . loader -> celery.loaders.app.AppLoader
                                        . scheduler -> django_celery_beat.schedulers.DatabaseScheduler


                                        . logfile -> [stderr]@%INFO
                                        . maxinterval -> 5.00 seconds (5s)
                                        [2024-06-11 00:17:18,166: INFO/MainProcess] beat: Starting...
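The `--scheduler` flag in the command above can also be expressed as a setting, so the beat command line stays short. This is a sketch for `settings.py`, again assuming the project loads Celery options from Django settings with `namespace='CELERY'`:

```python
# settings.py (fragment)
# Assumption: Celery reads Django settings with namespace='CELERY'.
# Same scheduler class that the --scheduler flag selects on the command line.
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"
```

With this in place, `celery -A django_daily_media beat --loglevel=info` should pick up the database scheduler without the extra flag.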





                                        Start Flower:

                                          (venv) (base) adamhuan@Leviathan django_daily_media % celery -A django_daily_media flower
                                          [I 240611 00:27:08 command:168] Visit me at http://0.0.0.0:5555
                                          [I 240611 00:27:08 command:176] Broker: redis://localhost:6379/0
                                          [I 240611 00:27:08 command:177] Registered tasks:
                                          ['celery.accumulate',
                                          'celery.backend_cleanup',
                                          'celery.chain',
                                          'celery.chord',
                                          'celery.chord_unlock',
                                          'celery.chunks',
                                          'celery.group',
                                          'celery.map',
                                          'celery.starmap',
                                          'django_daily_media.celery.debug_task',
                                          'trade.celery_tasks.add',
                                          'trade.celery_tasks.mul',
                                          'trade.celery_tasks.xsum']
                                          [I 240611 00:27:08 mixins:228] Connected to redis://localhost:6379/0





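                                          Tasks can now be configured from the Django Admin site:

                                          The admin pages for the newly added "Periodic Tasks" tables are available.


                                          Add a test task:

                                          Define the interval:

                                          Define the task arguments:

                                          After saving, the new periodic task appears in the list:


                                          The Tasks view in Flower confirms that the task really runs, once every 5 seconds: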
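The same periodic task can also be created in code instead of through the Admin forms. The sketch below uses the `IntervalSchedule` and `PeriodicTask` models from django-celery-beat. The task label `add-every-5-seconds` and the arguments `(3, 4)` are illustrative assumptions (the values entered in the Admin screenshots are not shown here), and `periodic_task_fields` / `register_interval_task` are hypothetical helper names.

```python
import json


def periodic_task_fields(name, task, args=(), kwargs=None):
    """Build the field values that a PeriodicTask row stores for one task."""
    return {
        "name": name,                       # unique, human-readable label
        "task": task,                       # dotted task name as listed by the worker
        "args": json.dumps(list(args)),     # positional arguments as JSON text
        "kwargs": json.dumps(kwargs or {}), # keyword arguments as JSON text
    }


def register_interval_task(every_seconds, fields):
    """Create or update the schedule rows; meant to be run in `python manage.py shell`."""
    # Imported lazily because the models need a configured Django project
    from django_celery_beat.models import IntervalSchedule, PeriodicTask

    schedule, _ = IntervalSchedule.objects.get_or_create(
        every=every_seconds, period=IntervalSchedule.SECONDS
    )
    defaults = {key: value for key, value in fields.items() if key != "name"}
    defaults["interval"] = schedule
    return PeriodicTask.objects.update_or_create(name=fields["name"], defaults=defaults)


# Hypothetical label and arguments; the task path and 5-second interval follow the article
fields = periodic_task_fields("add-every-5-seconds", "trade.celery_tasks.add", args=(3, 4))
print(fields)
```

Inside the Django shell, `register_interval_task(5, fields)` would then write the rows that the running beat process reads from the database.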


                                          11

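                                          Updating the status of expired orders


                                          The demonstration above showed how Celery handles asynchronous and scheduled tasks.


                                          So what is this good for in practice?


                                          Taking the movie-site practice project from earlier as an example, we set up a periodic task that regularly checks order status and marks timed-out orders as expired.


                                          File: trade/celery_tasks.py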

                                            #!/usr/bin/env python
                                            # -*- coding: UTF-8 -*-

                                            # ------------------
                                            # Script description:
                                            # xxxxxxxxx
                                            # ------------------

                                            # ========================================
                                            # Start
                                            # ))))))))) Imports
                                            from celery import shared_task

                                            from datetime import timedelta
                                            from django.utils import timezone

                                            from trade.models import Order

                                            # ))))))))) Task definitions
                                            @shared_task
                                            def add(x, y):
                                                return x + y


                                            @shared_task
                                            def mul(x, y):
                                                return x * y


                                            @shared_task
                                            def xsum(numbers):
                                                return sum(numbers)


                                            # ))))))))) Execution

                                            @shared_task
                                            def check_order_expierd(order_id):
                                                order = Order.objects.get(id=order_id)

                                                # NOTE: this relies on pay_time already being set on unpaid orders;
                                                # a None value would raise TypeError on the addition below
                                                expired_time = order.pay_time + timedelta(minutes=30)
                                                current_time = timezone.now()

                                                if current_time > expired_time:
                                                    order.pay_status = "TRADE_CLOSE"
                                                    order.save()

                                                    return f'order {order.id} expired (TRADE_CLOSE)'

                                                else:
                                                    return f'order {order.id} not expired (PAYING)'


                                            @shared_task
                                            def check_expired_orders():
                                                orders = Order.objects.filter(pay_status="PAYING")

                                                for order in orders:
                                                    # Without ".delay" the call runs synchronously, one order after another
                                                    # With ".delay" each check is queued and runs asynchronously
                                                    check_order_expierd.delay(order.id)


                                            # ))))))))) End

                                            # ========================================
                                            # End
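The time comparison inside `check_order_expierd` can be tried out without a database or a running worker. The sketch below isolates it with the standard library only. `is_order_expired` is a hypothetical helper name, and returning `False` for a missing timestamp is a design assumption of this sketch, not something the task above does.

```python
from datetime import datetime, timedelta, timezone

PAYMENT_WINDOW = timedelta(minutes=30)  # same 30-minute window as the task above


def is_order_expired(started_at, now, window=PAYMENT_WINDOW):
    """Return True when `now` is strictly later than `started_at + window`."""
    if started_at is None:
        # No reference timestamp recorded: report "not expired" instead of raising
        return False
    return now > started_at + window


now = datetime(2024, 6, 11, 0, 30, tzinfo=timezone.utc)

print(is_order_expired(now - timedelta(minutes=31), now))  # past the window
print(is_order_expired(now - timedelta(minutes=30), now))  # exactly on the boundary
print(is_order_expired(None, now))                         # no timestamp at all
```

Because the comparison is strict, an order sitting exactly on the 30-minute boundary is left alone and is picked up by the next scheduled run.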



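                                            Next, configure the periodic task in the Django Admin site:

                                            This task takes no arguments, so it can be saved directly:


                                            The execution status of the periodic task then shows up in Flower:

                                            The same information also appears in the Celery worker's console output: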


                                            12

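                                            Adding the TRADE_CLOSE status to the frontend


                                            This is how the frontend page looks at this point:


                                            Add a new a-tag to the table:

                                            The page now looks like this:



                                            The complete frontend code at this point is as follows: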

                                              <template>
                                              <div id="container" class="text-white text-sm bg-primary-300 min-h-screen pb-4">
                                              <Header/>


                                              <div class="flex items-center justify-center">


                                              <div class="w-full px-2" style="max-width:1440px;">


                                              <div id="main" class="bg-primary-300 p-6 text-black">


                                              <!-- Orders -->
                                              <div class="rounded bg-white mx-4 my-4 py-6 ">
                                              <div class="px-6">
                                              <h1 class="text-lg font-semibold">My Orders</h1>
                                              </div>
                                              </div>


                                              <div id="orders_all" class="rounded bg-white mx-4 my-4 py-6">


                                              <!-- Header tag group and input box -->
                                              <a-row class="flex items-center justify-center">
                                              <a-space>


                                              <!-- Tag group -->
                                              <div>
                                              <a-tag color="purple" @click="TagClick('all')">
                                              <a href="/orders">
                                              All
                                              </a>
                                              <!-- All -->
                                              </a-tag>
                                              <a-tag color="cyan" @click="TagClick('paying')">
                                              <a href="/orders/?pay_status=paying">
                                              Paying
                                              </a>
                                              <!-- Paying -->
                                              </a-tag>
                                              <a-tag color="green" @click="TagClick('finish')">
                                              <a href="/orders/?pay_status=trade_success">
                                              FINISH
                                              </a>
                                              <!-- FINISH -->
                                              </a-tag>
                                              </div>


                                              <!-- Input box -->
                                              <div class="relative shrink">
                                              <form>
                                              <input v-model.trim="order_sn" type="text" name="order_sn" class="
                                              outline-0
                                              h-9
                                              rounded
                                              border
                                              border-gray-600
                                              placeholder-gray-400
                                              w-64
                                              px-2
                                              py-1
                                              max-w-[180px]
                                              " placeholder="Enter order number" >
                                              <div class="
                                              absolute
                                              top-0
                                              right-0
                                              flex
                                              items-center
                                              h-full
                                              ">
                                              <div class=" rounded text-xs text-gray-400 px-2 mr-2">
                                              <button v-on:click.prevent="searchOrders">
                                              <svg xmlns="http://www.w3.org/2000/svg" class="h-4 w-4" fill="none" viewBox="0 0 24 24" stroke="currentColor" stroke-width="2">
                                              <path stroke-linecap="round" stroke-linejoin="round" d="M21 21l-6-6m2-5a7 7 0 11-14 0 7 7 0 0114 0z" />
                                              </svg>
                                              </button>
                                              </div>
                                              </div>
                                              </form>
                                              </div>


                                              </a-space>
                                              </a-row>


                                              <!-- Blank line -->
                                              <br/>


                                              <!-- Table component -->
                                              <a-table
                                              :columns="columns"
                                              :data-source="data"
                                              >
                                              <template #bodyCell="{ column, record }">


                                              <!-- Payment status -->
                                              <template v-if="column.key === 'pay_status'">
                                              <span>
                                              <a-tag v-if="record.pay_status === 'PAYING'" color="orange">
                                              Paying
                                              </a-tag>
                                              <a-tag v-if="record.pay_status === 'TRADE_SUCCESS'" color="green">
                                              Payment successful
                                              </a-tag>
                                              <a-tag v-if="record.pay_status === 'TRADE_CLOSE'" color="red">
                                              Trade closed
                                              </a-tag>
                                              </span>
                                              </template>


                                              <!-- Actions -->
                                              <template v-if="column.key === 'action'">
                                              <!-- Unfinished orders: continue to payment -->
                                              <a-button v-if="record.pay_status === 'PAYING'" @click="pay(record)">Pay</a-button>
                                              </template>


                                              </template>
                                              </a-table>


                                              </div>


                                              </div>


                                              </div>


                                              </div>


                                              <Footer/>
                                              </div>
                                              </template>


                                              <script setup lang="ts">
                                              // ================================= Imports


                                              // Project: header and footer
                                              import Header from "@/components/layout/Header.vue"
                                              import Footer from "@/components/layout/Footer.vue"


                                              // Message notifications
                                              import showMessage from "@/utils/message.js";


                                              // Table data handling and rendering
                                              import axios from 'axios'
                                              import {useRequest} from 'vue-request'
                                              import {watch} from 'vue';


                                              // Routing / URL related
                                              import {useRoute} from "vue-router";
                                              const route = useRoute()


                                              import router from "@/router/index.ts";
                                              import {ref} from "vue";


                                              // ================================= Fetch data
                                              const getOrders = () => {
                                              // Value that is ultimately returned
                                              let data_return = ''


                                              // API URL
                                              let url = '/api/order/'


                                              // Set the filters dynamically from the current URL query
                                              const page:any = Number(route.query.page)
                                              const pay_status:any = route.query.pay_status
                                              const order_sn:any = route.query.order_sn


                                              const params = new URLSearchParams()


                                              if (page) {
                                              params.append('page', page)
                                              }
                                              if (pay_status) {
                                              params.append('pay_status', pay_status)
                                              }
                                              if (order_sn) {
                                              params.append('order_sn', order_sn)
                                              }


                                              url = url + '?' + params.toString()


                                              // Request the API endpoint with axios
                                              const orders = axios.get(url).then(
                                              response => {
                                              // Data that is ultimately returned
                                              let return_data = []


                                              // Original list of orders
                                              let origin_orders = response.data.results


                                              for (var key in origin_orders) {


                                              let origin_orders_item = origin_orders[key]
                                              let origin_orders_item_card = origin_orders_item['card']


                                              // Remove the nested card from the original object before merging
                                              delete origin_orders_item.card


                                              return_data.push(Object.assign(
                                              {},
                                              origin_orders_item,
                                              origin_orders_item_card,
                                              ))


                                              // Debug output
                                              // console.log("@@@@@@@@@@ item")
                                              // console.log(origin_orders_item)
                                              // console.log("--- item [card]")
                                              // console.log(origin_orders_item_card)
                                              }


                                              return return_data
                                              }
                                              )


                                              // data_return = handleNestedDict(orders)
                                              data_return = orders


                                              // Return
                                              return data_return
                                              }


                                              // ================================= Request via vue-request
                                              // vue-request official documentation: https://www.attojs.com/api/#return-values
                                              const {
                                              data,
                                              run,
                                              } = useRequest(getOrders)


                                              // ================================= Table component
                                              const columns = [
                                              {
                                              title: 'Order No.',
                                              dataIndex: 'order_sn',
                                              key: 'order_sn',
                                              sorter: (a, b) => {
                                              // Sort alphabetically by order number
                                              return a.order_sn.localeCompare(b.order_sn)
                                              },
                                              },
                                              {
                                              title: 'Payment status',
                                              dataIndex: 'pay_status',
                                              key: 'pay_status',
                                              filters: [
                                              {text:"Paid", value:"TRADE_SUCCESS"},
                                              {text:"Paying", value:"PAYING"},
                                              ],
                                              onFilter: (value, record) => record.pay_status.indexOf(value) === 0,
                                              },
                                              {
                                              title: 'Amount paid',
                                              dataIndex: 'order_mount',
                                              key: 'order_mount',
                                              // Numeric sort
                                              sorter: (a, b) => a.order_mount - b.order_mount,
                                              sortDirections: ['ascend','descend',],
                                              },
                                              {
                                              title: 'Created at',
                                              dataIndex: 'created_at',
                                              sorter: (a, b) => {


                                              // Sort by time, option 1
                                              // const dateA = new Date(a.created_at).getTime()
                                              // const dateB = new Date(b.created_at).getTime()
                                              //
                                              // return dateA - dateB


                                              // Sort by time, option 2
                                              return Date.parse(a.created_at) - Date.parse(b.created_at)
                                              },
                                              },
                                              {
                                              title: 'Updated at',
                                              dataIndex: 'updated_at',
                                              sorter: (a, b) => {
                                              // Sort by time
                                              return Date.parse(a.updated_at) - Date.parse(b.updated_at)
                                              },
                                              },
                                              {
                                              title: 'Membership card',
                                              dataIndex: "card_name",
                                              key: "card_name",
                                              // Card names are kept as-is so they match the values returned by the backend
                                              filters: [
                                              {text:"黄金VIP", value:"黄金VIP"},
                                              {text:"白金VIP", value:"白金VIP"},
                                              {text:"星钻VIP", value:"星钻VIP"},
                                              ],
                                              onFilter: (value, record) => record.card_name.indexOf(value) === 0,
                                              },
                                              {
                                              title: 'Membership validity (days)',
                                              dataIndex: 'duration',
                                              key: 'duration'
                                              },
                                              {
                                              title: 'Actions',
                                              key: 'action',
                                              // Pin the column position
                                              // fixed: 'right',
                                              },
                                              ]


                                              // ================================= Triggered when the table data changes


                                              // const handleTableChange: TableProps['onChange'] = (
                                              // filters: any,
                                              // sorter: any,
                                              // ) => {
                                              // run({
                                              // sortField: sorter.field,
                                              // sortOrder: sorter.order,
                                              // ...filters,
                                              // })
                                              // }


                                              // ================================= Tag group


                                              function TagClick(message) {


                                              if (message === 'all') {
                                              showMessage(message, "info")
                                              run({
                                              pay_status: ''
                                              })
                                              }
                                              if (message === 'paying') {
                                              showMessage(message, "info")
                                              run({
                                              pay_status: 'paying'
                                              })
                                              }
                                              if (message === 'finish') {
                                              showMessage(message, "info")
                                              run({
                                              pay_status: 'trade_success'
                                              })
                                              }
                                              }


                                              // ================================= Search box


                                              // Search-box handler
                                              const order_sn = ref('')


                                              function searchOrders() {
                                              router.push({
                                              name: "Orders",
                                              query: {
                                              order_sn: order_sn.value
                                              }
                                              })
                                              }


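                                              // Re-render the table data whenever the route's query string changes.
                                              // searchOrders() only changes the query (?order_sn=...), not route.params,
                                              // so the watcher has to observe route.query in order to fire.
                                              watch(() => route.query, () => {
                                              console.log("Route query changed")
                                              run({
                                              order_sn: order_sn.value
                                              })
                                              })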


                                              // ================================= Payment for unfinished orders


                                              function pay(record) {


                                              console.log(record)


                                              axios
                                              // .get('/api/alipay', {card_id: this.card.id})
                                              .get('/api/alipay/?card_id=' + record.id + '&order_sn=' + record.order_sn)
                                              .then(
                                              response => {
                                              console.log(response.data)


                                              // Redirect to the payment page
                                              window.location.href = response.data
                                              }
                                              )
                                              }


                                              // ================================= Output / display
                                              console.log("@@@@@@@@@@@@@@@@@@@@@@ FINISH")
                                              console.log(data)
                                              </script>


                                              <style scoped>
                                              </style>
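
The `getOrders` function in the component above does two things that are easier to follow in isolation: it builds the query string from whichever filters are present, and it merges each order's nested `card` object into the order itself. The sketch below restates both as pure functions. The names `buildOrdersUrl` and `flattenOrder` and the `Card`/`OrderItem` shapes are assumptions made for illustration, not part of the original project, and unlike the component the sketch omits the trailing `?` when no filter is set and leaves the input object unmodified.

```typescript
// Hypothetical shapes, inferred from the fields the component reads.
interface Card {
  card_name: string;
  duration: number;
}

interface OrderItem {
  order_sn: string;
  pay_status: string;
  card?: Card;
  [key: string]: unknown;
}

// Build "/api/order/?..." from optional filters, skipping empty values.
function buildOrdersUrl(
  base: string,
  query: { page?: number; pay_status?: string; order_sn?: string },
): string {
  const params = new URLSearchParams();
  if (query.page) params.append("page", String(query.page));
  if (query.pay_status) params.append("pay_status", query.pay_status);
  if (query.order_sn) params.append("order_sn", query.order_sn);
  const qs = params.toString();
  return qs ? `${base}?${qs}` : base;
}

// Merge the nested `card` object into its order without mutating the input.
function flattenOrder(item: OrderItem): Record<string, unknown> {
  const { card, ...rest } = item;
  return { ...rest, ...(card ?? {}) };
}

const exampleUrl = buildOrdersUrl("/api/order/", { page: 2, pay_status: "paying" });
const exampleRow = flattenOrder({
  order_sn: "A001",
  pay_status: "PAYING",
  card: { card_name: "Gold", duration: 30 },
});
console.log(exampleUrl, exampleRow);
```

Because the card's fields are spread last, a key that exists on both the order and the card (for example `id`) ends up holding the card's value, which matches the `Object.assign` order used in the component.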

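
The table columns in the component rely on three kinds of comparator: `localeCompare` for order numbers, subtraction for amounts, and `Date.parse` differences for timestamps. The sketch below exercises them on a few made-up rows. The `OrderRow` type and the sample data are hypothetical, and it assumes `order_mount` is a number and `created_at` is an ISO 8601 string; if the API returns the amount as a string, it would need converting first.

```typescript
// Hypothetical row shape; field names follow the table's dataIndex values.
interface OrderRow {
  order_sn: string;
  order_mount: number; // assumed numeric here
  created_at: string; // assumed to be an ISO 8601 timestamp
}

// String comparison for order numbers.
const byOrderSn = (a: OrderRow, b: OrderRow): number =>
  a.order_sn.localeCompare(b.order_sn);

// Numeric comparison for amounts.
const byAmount = (a: OrderRow, b: OrderRow): number =>
  a.order_mount - b.order_mount;

// Chronological comparison via millisecond timestamps.
const byCreatedAt = (a: OrderRow, b: OrderRow): number =>
  Date.parse(a.created_at) - Date.parse(b.created_at);

const sampleRows: OrderRow[] = [
  { order_sn: "B002", order_mount: 30, created_at: "2024-06-10T09:00:00Z" },
  { order_sn: "A001", order_mount: 99, created_at: "2024-06-11T09:00:00Z" },
  { order_sn: "C003", order_mount: 10, created_at: "2024-06-09T09:00:00Z" },
];

// Copy before sorting, because Array.prototype.sort sorts in place.
const sortedBySn = [...sampleRows].sort(byOrderSn).map((r) => r.order_sn);
const sortedByAmount = [...sampleRows].sort(byAmount).map((r) => r.order_mount);
const sortedByDate = [...sampleRows].sort(byCreatedAt).map((r) => r.order_sn);
console.log(sortedBySn, sortedByAmount, sortedByDate);
```

Keeping the comparators as named functions makes them easy to test separately from the table component.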

13

Conclusion


With that, the work of implementing asynchronous and scheduled tasks in Django with Celery is complete.





END






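This article is reposted from Nephilim. If you believe it infringes any rights, please email contact@modb.pro to report it and provide supporting evidence; once verified, 墨天轮 will remove the content immediately.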
