gpt4 book ai didi

python - Celery、RabbitMQ、Redis : Celery message enters exchange, 但不排队?

转载 作者:可可西里 更新时间:2023-11-01 11:33:20 28 4
gpt4 key购买 nike

我正在使用 Python 2.7(叹息)、celery==3.1.19、librabbitmq==1.6.1、rabbitmq-server-3.5.6-1.noarch 和 redis 2.8.24(来自 redis-cli 信息).

我正在尝试从 celery 生产者向 celery 消费者发送消息,并在生产者中获取结果。有 1 个生产者和 1 个消费者,但中间有 2 个 rabbitmq(作为代理)和 1 个 redis(作为结果)。

我面临的问题是:

  1. 在消费者中,我通过 async_result = 返回一个 AsyncResultZipUp.delay(unique_directory),但 async_result.ready() 从不返回 True(至少 9 秒不返回)——即使是消费者任务,除了返回一个字符串外什么都不做。
  2. 我可以在 rabbitmq 管理 web 界面中看到我的消息被 rabbitmq 交换接收,但它没有出现在对应的rabbitmq队列。此外,由ZipUp 任务的最开始似乎没有得到记录。

如果我不尝试从 AsyncResult 返回结果,一切正常!但我有点希望得到调用的结果 - 它很有用 :)。

以下是配置细节。

我们按如下方式设置 Celery 以获取返回:

CELERY_RESULT_BACKEND = 'redis://%s' % _SHARED_WRITE_CACHE_HOST_INTERNAL
CELERY_RESULT = Celery('TEST', broker=CELERY_BROKER)
CELERY_RESULT.conf.update(
BROKER_HEARTBEAT=60,
CELERY_RESULT_BACKEND=CELERY_RESULT_BACKEND,
CELERY_TASK_RESULT_EXPIRES=100,
CELERY_IGNORE_RESULT=False,
CELERY_RESULT_PERSISTENT=False,
CELERY_ACCEPT_CONTENT=['json'],
CELERY_TASK_SERIALIZER='json',
CELERY_RESULT_SERIALIZER='json',
)

我们有另一个 Celery 配置,它不期望返回值,并且在同一个程序中有效。看起来像:

CELERY = Celery('TEST', broker=CELERY_BROKER)
CELERY.conf.update(
BROKER_HEARTBEAT=60,
CELERY_RESULT_BACKEND=CELERY_BROKER,
CELERY_TASK_RESULT_EXPIRES=100,
CELERY_STORE_ERRORS_EVEN_IF_IGNORED=False,
CELERY_IGNORE_RESULT=True,
CELERY_ACCEPT_CONTENT=['json'],
CELERY_TASK_SERIALIZER='json',
CELERY_RESULT_SERIALIZER='json',
)

celery 生产者的 stub 看起来像:

@CELERY_RESULT.task(name='ZipUp', exchange='cognition.workflow.ZipUp_%s' % INTERNAL_VERSION)
def ZipUp(directory): # pylint: disable=invalid-name
""" Task stub """
_unused_directory = directory
raise NotImplementedError

有人提到在这个 stub 中使用 queue= 而不是 exchange= 会更简单。任何人都可以证实这一点(我用谷歌搜索但没有找到关于该主题的任何内容)?显然你可以只使用 queue= 除非你想使用扇出或类似的东西,因为并不是所有的 celery 后端都有交换的概念。

无论如何, celery 消费者从:

@task(queue='cognition.workflow.ZipUp_%s' % INTERNAL_VERSION, name='ZipUp')
@StatsInstrument('workflow.ZipUp')
def ZipUp(directory): # pylint: disable=invalid-name
'''
Zip all files in directory, password protected, and return the pathname of the new zip archive.
:param directory Directory to zip
'''
try:
LOGGER.info('zipping up {}'.format(directory))

但是“压缩”不会在任何地方记录。我在 celery 服务器上的每个(磁盘支持的)文件中搜索了该字符串,并得到了两次匹配:/usr/bin/zip 和我的 celery 任务代码——没有日志消息。

有什么建议吗?

感谢阅读!

最佳答案

似乎在生产者中使用以下任务 stub 解决了问题:

@CELERY_RESULT.task(name='ZipUp', queue='cognition.workflow.ZipUp_%s' % INTERNAL_VERSION)
def ZipUp(directory): # pylint: disable=invalid-name
""" Task stub """
_unused_directory = directory
raise NotImplementedError

简而言之,它使用 queue= 而不是 exchange=。

关于python - Celery、RabbitMQ、Redis : Celery message enters exchange, 但不排队?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38647974/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com