gpt4 book ai didi

python - Celery 动态队列创建和路由

转载 作者:太空狗 更新时间:2023-10-29 20:48:04 29 4
gpt4 key购买 nike

我正在尝试调用一个任务并为该任务创建一个队列(如果它不存在),然后立即将调用的任务插入到该队列中。我有以下代码:

@task
def greet(name):
return "Hello %s!" % name


def run():
result = greet.delay(args=['marc'], queue='greet.1',
routing_key='greet.1')
print result.ready()

然后我有一个自定义路由器:

class MyRouter(object):

def route_for_task(self, task, args=None, kwargs=None):
if task == 'tasks.greet':
return {'queue': kwargs['queue'],
'exchange': 'greet',
'exchange_type': 'direct',
'routing_key': kwargs['routing_key']}
return None

这会创建一个名为 greet.1 的交易所和一个名为 greet.1 的队列但队列是空的。交易所应该叫greet它知道如何路由像 greet.1 这样的路由键到名为 greet.1 的队列.

有什么想法吗?

最佳答案

当您执行以下操作时:

task.apply_async(queue='foo', routing_key='foobar')

然后 Celery 将从 CELERY_QUEUES 中的 'foo' 队列中获取默认值,或者如果它不存在则使用 (queue=foo, exchange=foo, routing_key=foo) 自动创建它

因此,如果 CELERY_QUEUES 中不存在“foo”,您将得到:

queues['foo'] = Queue('foo', exchange=Exchange('foo'), routing_key='foo')

然后生产者将声明该队列,但是由于您覆盖了 routing_key,实际上使用 routing_key = 'foobar'

发送消息

这可能看起来很奇怪,但这种行为实际上对话题交流很有用,您发布到不同主题的地方。

虽然做你想做的事比较难,你可以自己创建队列并声明它,但这不适用于自动消息发布重试。如果 apply_async 的队列参数可以支持就更好了一个自定义的 kombu.Queue 而不是将被声明并用作目的地。也许你可以在 http://github.com/celery/celery/issues 上为此打开一个问题

关于python - Celery 动态队列创建和路由,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11882826/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com