celery任务重复执行的问题,出现在选择redis作为broker的场景。
见官网文档:https://docs.celeryproject.org/en/v5.1.2/getting-started/backends-and-brokers/redis.html?highlight=visibility_timeout#visibility-timeout
备注:以下文章信息基于celery 5.1.2版本,不同版本情况可能不同。
1.什么是Visibility Timeout?
Visibility Timeout(可见性超时),官网解释如下
The visibility timeout defines the number of seconds to wait for the worker to acknowledge the task before the message is redelivered to another worker. Be sure to see Caveats below.
也就是说celery会等待消息被确认(acknowlege),如果等待时间超过Visibility Timeout设定的值,那么celery会重新给worker发送消息(redelivered)。
2.怎么理解acknowlege?
或者说消息被acknowlege的时机是什么时候?消息被received? 开始执行?还是执行结束?
默认情况下acknowlege是在消息开始执行时被worker返回给broker的。
当我们设置worker为acks_late=True之后,那么worker则会在任务执行结束才会给broker发送ack信息。celery还提供了Task级别的acks_late,可以单独控制某个任务是否是采用ack late。
没有被acknowlege的消息会被存储在名为unpacked的键中。
3.什么情况下会导致任务重复发送?
1)ETA and Countdown任务,延时时间超过visibility_timeout。
max_timeout_in_seconds = 30
app.conf.broker_transport_options = {"visibility_timeout": max_timeout_in_seconds}
@app.task()
def waiter(sleep_time):
# sleep(sleep_time)
return 'task finished'
if __name__ == "__main__":
# async_result = waiter.delay(3610)
result = waiter.apply_async((3610, ), countdown=300)
print(result)
如下图所示,id为f7626bbb-f3b2-4a66-bda3-ff152c00f055的task被重复执行。
2)设置了acks_late=True,但是任务执行耗时超过了visibility_timeout。
max_timeout_in_seconds = 30
app.conf.broker_transport_options = {"visibility_timeout": max_timeout_in_seconds}
@app.task(acks_late=True)
def waiter(sleep_time):
sleep(sleep_time)
return 'task finished'
if __name__ == "__main__":
async_result = waiter.delay(3610)
print(result)
4.怎么配置Visibility Timeout?
app.conf.broker_transport_options = {'visibility_timeout': 3600}
5.怎么解决任务重复执行问题?
1)比较粗的做法是将visibility timeout 这个配置的值调到足够的大。比如大到超过任务使用的最长的eta延时时间。
2)加分布式锁:使用唯一标识keyid,配合redis的原子操作setnx执行前判断是否在cache中存在,如果存在着直接返回,不执行业务逻辑。
参考:
https://github.com/celery/celery/issues/5935 https://github.com/celery/celery/issues/3270




