gpt4 book ai didi

python - celery apply_async 窒息 rabbitmq

转载 作者:行者123 更新时间:2023-11-28 19:14:58 27 4
gpt4 key购买 nike

我正在使用 celery 的 apply_async 方法对任务进行排队。我预计每天大约有 100,000 个这样的任务运行(数量只会增加)。我正在使用 RabbitMQ 作为代理。几天前我运行代码,几个小时后 RabbitMQ 崩溃了。我注意到 apply_async 为每个任务创建了一个新队列,x-expires 设置为 1 天。我的假设是,当创建如此多的队列时,RabbitMQ 会阻塞。我怎样才能阻止 celery 为每个任务创建这些额外的队列?

我还尝试将队列参数赋予 apply_async 并为该队列分配一个 x-message-ttl。消息确实进入了这个新队列,但是它们立即被消耗并且从未达到我设置的 30 秒的 ttl。这并没有阻止 celery 创建这些额外的队列。

这是我的代码:

View .py

   from celery import task, chain

chain(task1.s(a), task2.s(b),)
.apply_async(link_error=error_handler.s(a), queue="async_tasks_queue")

任务.py

from celery.result import AsyncResult

@shared_task
def error_handler(uuid, a):
#Handle error

@shared_task
def task1(a):
#Do something
return a

@shared_task
def task2(a, b):
#Do something more

celery .py

app = Celery(
'app',
broker=settings.QUEUE_URL,
backend=settings.QUEUE_URL,
)

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


app.amqp.queues.add("async_tasks_queue", queue_arguments={'durable' : True , 'x-message-ttl': 30000})

来自 celery 原木:

[2016-01-05 01:17:24,398: INFO/MainProcess] Received task: project.tasks.task1[615e094c-2ec9-4568-9fe1-82ead2cd303b]

[2016-01-05 01:17:24,834: INFO/MainProcess] Received task: project.decorators.wrapper[bf9a0a94-8e71-4ad6-9eaa-359f93446a3f]

在执行这些任务时,RabbitMQ 有 2 个名为“615e094c2ec945689fe182ead2cd303b”和“bf9a0a948e714ad69eaa359f93446a3f”的新队列我的代码在 Django 1.7.7、celery 3.1.17 和 RabbitMQ 3.5.3 上运行。

也欢迎任何其他异步执行任务的建议

最佳答案

尝试使用不同的后端 - 我推荐 Redis。当我们尝试使用 Rabbitmq 作为代理和后端时,我们发现它不适合代理角色。

关于python - celery apply_async 窒息 rabbitmq,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34599408/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com