gpt4 book ai didi

python - Celery 中的顺序任务执行

转载 作者:太空宇宙 更新时间:2023-11-03 16:17:01 25 4
gpt4 key购买 nike

我有一些对数据库的“繁重”请求,我将使用 Celery 执行这些请求。考虑到它们很“重”,我想按顺序执行它们(一一执行)。一种可能的解决方案是指定 --concurrency=1在 Celery 的命令行中。它有效。但有一个问题:在执行任务时,所有以下请求都会返回 None :

from celery.task.control import inspect

# Inspect all nodes.
i = inspect()

print(i.scheduled()) # None
print(i.active()) # None
print(i.reserved()) # None
print(i.registered()) # None

另外,运行 celery inspect ping返回Error: No nodes replied within time constraint.这样我就无法收到有关 Celery 队列状态的任何信息。

这是我的测试 python 模块:

celeryconfig.py

#BROKER_URL = 'redis://localhost:6379/0'
BROKER_URL = 'amqp://'
#CELERY_RESULT_BACKEND = "redis"
CELERY_RESULT_BACKEND = "amqp://"
# for php
CELERY_TASK_RESULT_EXPIRES = None
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACKS_LATE = True

tasks.py

from celery import Celery
from time import sleep

app = Celery('hello')
app.config_from_object('celeryconfig')

@app.task
def add(x, y):
sleep(30)
return x + y

client.py

from tasks import add

result=add.delay(4, 4)
result=add.delay(4, 4)
result=add.delay(4, 4)
result=add.delay(4, 4)
result=add.delay(4, 4)
result=add.delay(4, 4)

那么,问题是,如何一项一项地运行任务并能够检查队列的状态?

最佳答案

Celery 的检查是通过向任何监听的对象广播查询然后收集响应来执行的。任何在超时(我认为默认为 1 秒)内未响应的工作人员都将被忽略。就像不存在一样。

您使用 --concurrency=1 应该不是问题。我刚刚尝试过,在这里效果很好。即使并发数为 1,Celery 工作线程通常也会有一个额外的执行线程用于通信。 (我说“通常”是因为我确信有办法配置 Celery 搬起石头砸自己的脚。我所说的与默认值保持一致。)当我尝试 --concurrency=1 时,有实际上每个工作线程有两个线程。因此,即使工作线程正忙于计算任务,也应该有一个线程能够响应广播。

话虽这么说,如果机器负载很重,那么工作人员可能需要很长时间才能做出响应。我解决此问题的方法是重试 i.scheduled() 等调用,直到得到每个人的答复。在我的项目中,我知道应该启动并运行多少个工作人员,因此我有一个列表,可以用来了解每个人是否都做出了响应。

关于python - Celery 中的顺序任务执行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38867489/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com