Celery中其beat组件是一个scheduler(调度器), 它定期启动任务,然后由集群中的可用worker执行任务。
默认调度器-PersistentScheduler
默认情况下,celery-beat的scheduler是celery.beat:PersistentScheduler, 这种调度器将配置存储在本地shelve文件中。
shelve是什么?
shelve是一个简单的数据存储方案,类似key-value数据库,可以很方便的保存python对象,其内部是通过pickle协议来实现数据序列化。shelve有一个open函数,这个函数用于打开指定的文件,返回一个shelf对象,一种持久的、类似字段的对象。
celery-beat启动后,会在当前目录下生成一个celerybeat-schedule文件,可以使用python的shelve模块,进行读写。
import shelve
s = shelve.open('celerybeat-schedule', flag='r')
try:
for key, value in s.items():
print(key, value)
finally:
s.close()
然后执行上面的代码,会发现抛出了错误。
_gdbm.error: [Errno 11] Resource temporarily unavailable
原因是shelve不支持并发读写,可以将celerybeat-schedule文件复制一份到其他路径。
cp celerybeat-schedule /tmp/
修改代码
import shelve
s = shelve.open('/tmp/celerybeat-schedule', flag='r')
try:
for key, value in s.items():
print(key, value)
finally:
s.close()
结果如下图,其中名为entries的key,存储了celery配置的定时任务。

这种默认设置的问题是,很难动态地管理任务,比如添加或修改任务相关的执行配置。修改配置,往往需要修改配置文件后重启celery-beat,或者需要修改shelve文件(如上所示,也不方便)。
有没有办法,使用自定义调度器,自定义存储,将定时任务存储在数据库中,并动态地添加或修改这些调度相关的信息?我们想更新配置,直接操作数据库中的记录即可?
自定义调度器
celery有提供一个django-celery-beat扩展,它将schedule存储在Django数据库中,并提供了一个方便的管理界面在运行时管理定时任务。对于Django应用来说,可以拿来就用。如果对于非Django,可能需要自己实现了。git上找了下,有诸多大神已实现的多种存储方案,可供参考:
redis: https://github.com/sibson/redbeat mongodb: https://github.com/zmap/celerybeat-mongo sqlalchemy: https://github.com/AngelLiang/celery-sqlalchemy-scheduler/blob/master/README-zh.md https://github.com/tuomur/celery_sqlalchemy_scheduler
参考:
https://blog.csdn.net/qq_38684504/article/details/86629507
https://stackoverflow.com/questions/10194975/how-to-dynamically-add-remove-periodic-tasks-to-celery-celerybeat




