如果不特殊指定,celery将会创建名为celery的默认队列,用于消息传递。我们实际使用celery时,可能会有多种不同的任务,往往需要将他们交给不同的worker处理。只使用默认的celery队列无法满足我们的需求。所以我们需要将不同的task路由到不同的队列。
路由和队列
图片来源:https://blog.csdn.net/xsj_blog/article/details/70181984
celery中消息分发与任务调度的实现机制如上图所示:
producer发出请求, celery服务启动时,会产生一个或多个交换机,对应的交换机接收请求,根据message的内容,将message分发到一个或多个符合条件的队列,每个队列上都有一个或多个worker在监听,在监听到符合条件的message到达后,worker负责进行任务处理,任务处理完被确认后,队列中的message将被删除。
我们可以在celery中配置任务的路由规则。
from kombu import Queue
app.conf.task_queues = (
Queue('default', routing_key='task.#'),
Queue('feed_tasks', routing_key='feed.#'),
)
app.conf.task_routes = {
'feeds.tasks.import_feed': {
'queue': 'feed_tasks',
'routing_key': 'feed.import',
},
}
task_queue: 在celery中存在的队列可以通过task_queues来设置。
task_routes:指定路由规则,要将任务路由到feed_tasks队列,则如上面代码所示进行配置。
要使worker只处理来自feed队列的消息,我们可以使用 celery worker -Q feed_tasks 来启动服务:
celery -A proj worker -Q feed_tasks -l info
广播路由
上述的路由配置,生产者生产的消息默认只会被一个worker消费。如果我们想要将所有监听queue的worker都执行任务,就需要配置广播路由。
Celery 也支持广播路由。下面是一个 broadcast_tasks 交换机的示例, 它将任务分发给所有连接到它的worker:
from kombu.common import Broadcast
app.conf.task_queues = (Broadcast('broadcast_tasks'),)
app.conf.task_routes = {
'tasks.reload_cache': {
'queue': 'broadcast_tasks',
'exchange': 'broadcast_tasks'
}
}
上面是官网提供的例子,当broker 是redis时,发现并没有起作用,需要做些改动。
from kombu.common import Broadcast
q = Queue('broadcast_tasks', Exchange('broadcast_tasks', type='fanout'), )
app.conf.task_queues = (q, )
app.conf.task_routes = {
'tasks.reload_cache': {
'queue': 'broadcast_tasks'
}
}
其他
常用远程控制命令:
# 观察当前处于活跃状态的worker和task
celery -A proj inspect active
# 向worker中增加对某队列的消费
celery control -d xxx add_consumer queue_name
# 观察当前worker状态
celery -A proj status
写在最后
今天是20200101,新年伊始,万象更新。在此遥祝大家:诸事顺遂,得偿所愿。

参考:
https://github.com/celery/celery/issues/3740
https://blog.csdn.net/u014686399/article/details/108387672
https://github.com/celery/celery/issues/3740
https://linpingta.github.io/blog/2016/03/19/celery-2/
https://blog.csdn.net/xsj_blog/article/details/70181984




