暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

celery之broadcast queue

3222

如果不特殊指定,celery将会创建名为celery的默认队列,用于消息传递。我们实际使用celery时,可能会有多种不同的任务,往往需要将他们交给不同的worker处理。只使用默认的celery队列无法满足我们的需求。所以我们需要将不同的task路由到不同的队列

路由和队列

图片来源:https://blog.csdn.net/xsj_blog/article/details/70181984

celery中消息分发与任务调度的实现机制如上图所示:
producer发出请求, celery服务启动时,会产生一个或多个交换机,对应的交换机接收请求,根据message的内容,将message分发到一个或多个符合条件的队列,每个队列上都有一个或多个worker在监听,在监听到符合条件的message到达后,worker负责进行任务处理,任务处理完被确认后,队列中的message将被删除。

我们可以在celery中配置任务的路由规则。

from kombu import Queue

app.conf.task_queues = (
    Queue('default',    routing_key='task.#'),
    Queue('feed_tasks', routing_key='feed.#'),
)

app.conf.task_routes = {
        'feeds.tasks.import_feed': {
            'queue''feed_tasks',
            'routing_key''feed.import',
        },
}


task_queue: 在celery中存在的队列可以通过task_queues来设置。
task_routes:指定路由规则,要将任务路由到feed_tasks队列,则如上面代码所示进行配置。

要使worker只处理来自feed队列的消息,我们可以使用 celery worker -Q feed_tasks 来启动服务:

celery -A proj worker -Q feed_tasks -l info


广播路由

上述的路由配置,生产者生产的消息默认只会被一个worker消费。如果我们想要将所有监听queue的worker都执行任务,就需要配置广播路由。

Celery 也支持广播路由。下面是一个 broadcast_tasks 交换机的示例, 它将任务分发给所有连接到它的worker:

from kombu.common import Broadcast

app.conf.task_queues = (Broadcast('broadcast_tasks'),)
app.conf.task_routes = {
    'tasks.reload_cache': {
        'queue''broadcast_tasks',
        'exchange''broadcast_tasks'
    }
}

上面是官网提供的例子,当broker 是redis时,发现并没有起作用,需要做些改动。

from kombu.common import Broadcast
q = Queue('broadcast_tasks', Exchange('broadcast_tasks'type='fanout'), )
app.conf.task_queues = (q, )
app.conf.task_routes = {
        'tasks.reload_cache': {
        'queue''broadcast_tasks'
    }
}

其他

常用远程控制命令:

# 观察当前处于活跃状态的worker和task
celery -A proj inspect active 

# 向worker中增加对某队列的消费
celery control -d xxx add_consumer queue_name

# 观察当前worker状态

celery -A proj status




写在最后

今天是20200101,新年伊始,万象更新。在此遥祝大家:诸事顺遂,得偿所愿。




参考: 

https://github.com/celery/celery/issues/3740 

https://blog.csdn.net/u014686399/article/details/108387672 

https://github.com/celery/celery/issues/3740

https://linpingta.github.io/blog/2016/03/19/celery-2/

https://blog.csdn.net/xsj_blog/article/details/70181984


文章转载自PostgreSQL运维技术,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论