gpt4 book ai didi

python - 启动 celery worker 并为广播队列启用它

转载 作者:行者123 更新时间:2023-11-28 22:27:16 34 4
gpt4 key购买 nike

我正在尝试启动 celery worker,因此它只监听单个队列。这不是问题,我可以这样做:

python -m celery worker -A my_module -Q my_queue -c 1

但现在我还希望这个 my_queue 队列成为一个广播队列,所以我在我的 celeryconfig 中这样做:

from kombu.common import Broadcast
CELERY_QUEUES = (Broadcast('my_queue'),)

但是一旦我这样做,我就无法再启动我的 worker,我从 rabbitmq 收到错误消息:

amqp.exceptions.PreconditionFailed: Exchange.declare: (406) PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'my_queue' in vhost 'myvhost': received 'fanout' but current is 'direct'

如果我在没有 -Q 的情况下启动 worker(但如上所述将 Broadcast 留在 celeryconfig.py 中)并且我列出了 rabbitmq 队列,我可以看到广播队列是这样创建和命名的:

bcast.43fecba7-786a-461c-a322-620039b29b8b

同样,如果我在 worker 中定义这个队列(如上所述使用 -Q)或在 celeryconfig.py 中定义为简单的 Queue,例如这个:

from kombu import Queue
CELERY_QUEUES = (Queue('my_queue'),)

我可以像这样在 rabbitmq 中看到这个队列:

my_queue

看来在定义队列时我在 Broadcast 调用中放入什么并不重要 - 这似乎是内部 celery 名称,没有传递给 rabbitmq。

所以我猜当 worker 开始时 my_queue 被创建,一旦完成就不能进行 Broadcast

我可以让一个工作人员监听任何队列(不仅是 my_queue),我将从删除 -Q 参数开始。但是,如果能够有一个进程只监听那个特定的队列,那就太好了,因为我投入其中的任务速度很快,而且我想尽可能地降低延迟。

--- 编辑 1 ---花了一些时间解决这个问题,上面提到的 bcast 队列似乎并没有始终如一地出现。重置 rabbitmq 并在没有 -Q 选项的情况下运行 celery 后 bcast 队列没有出现...

最佳答案

当使用代理发送消息时,客户端和工作线程必须就相同的配置值达成一致。如果您必须更改配置,则需要清除现有消息并重新启动所有内容,以便它们同步。

启动广播队列时,您可以设置交换类型并配置队列。

from kombu.common import Broadcast
from kombu import Exchange


exchange = Exchange('custom_exchange', type='fanout')

CELERY_QUEUES = (
Broadcast(name='bcast', exchange=exchange),
)

现在你可以开始工作了

celery worker -l info -A tasks -Q bcast 

关于python - 启动 celery worker 并为广播队列启用它,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44141958/

34 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com