celery中可以使用revoke来撤销任务。
这里的revoke包含三种方式: asyncresult.revoke、app.control.revoke和revoke命令。
撤销任务的三种方式
asyncresult.revoke
>>>t = tasks.celeryTest.delay(30)
>>>t.revoke()
app.control.revoke
>>>t = tasks.celeryTest.delay(30)
>>>t.task_id
'8c1cfcfc-2fee-450f-9dc8-59747cb3796a'
>>>from celery import app
>>>app.control.revoke("8c1cfcfc-2fee-450f-9dc8-59747cb3796a")
revoke命令
celery -A proj control revoke <task_id>
celery的任务状态
celery有六种内置的状态。
PENDING (waiting for execution or unknown task id) STARTED (task has been started) SUCCESS (task executed successfully) FAILURE (task execution resulted in exception) RETRY (task is being retried) REVOKED (task has been revoked)
只有处理PENDING、STARTED、RETRY状态的任务可以被撤销。
对于仍在排队还未执行(pending)的任务,当worker收到撤销请求后,会将task的flag置为revoked。当worker执行到该任务时,会去check任务的状态,如果任务状态是revoked,则会直接丢弃该任务。
Task common.tasks.celeryTest[64e80585-dc4a-4905-9746-42abb9d6ef5d] received
Discarding revoked task: common.tasks.celeryTest[64e80585-dc4a-4905-9746-42abb9d6ef5d]
注:查看任务状态可以通过Asyncresult.status命令。
但是对于已经在执行(STARTED\RETRY)的任务,如果想要撤销它,还需要另外设置terminate=True。
t.revoke(terminate=True)
app.control.revoke("65af7dd3-0915-43e7-a553-9afa119fd14b", terminate=True)
celery的日志:
Terminating 65af7dd3-0915-43e7-a553-9afa119fd14b (Signals.SIGTERM)
终止一个已经存在的任务,其实是通过给worker发送signal来的,这里的signal默认是Signals.SIGTERM。signal实际上可以是signal模块的任意值,如SIGQUIT、SIGUSR1、SIGUSR2等,不同的值,worker也会有不同的表现。
需要注意的是celery支持多种并发方式prefork、gevent和eventlet。目前只有prefork和eventlet支持撤销任务。 eventlet也是最新版本刚加的,貌似是5.0才开始支持。
我们看下eventlet的终止任务实现,可以看到是调用了greenlet.kill。
def terminate_job(self, pid, signal=None):
if pid in self._pool_map.keys():
greenlet = self._pool_map[pid]
greenlet.kill()
greenlet.wait()
结论
celery可以使用revoker撤销任务,对于还没有开始运行的任务,只需要revoke即可,对于已经在运行的任务,则需要另外设置terminate=True。
参考:
https://note.qidong.name/2021/09/celery-revoke/ https://docs.celeryq.dev/en/stable/userguide/workers.html

点个“赞 or 在看” 你最好看!

👇👇👇谢谢各位老板!!!




