gpt4 book ai didi

python - Celery+RabbitMQ 上任务进度未更新最新状态

转载 作者:行者123 更新时间:2023-11-28 22:45:06 30 4
gpt4 key购买 nike

我用 custom states 实现了长任务的进度反馈在 Celery + RabbitMQ 结果后端上。

但是调用者无法像我预期的那样检索最新的进度状态。在下面的代码中,result.info['step'] 总是返回0,然后任务将以“result=42”结束.

# tasks.py -- celery worker
from celery import Celery
app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost//')

@app.task
def long_task():
for i in range(0, 10):
timer.sleep(10) # some work
self.update_state(state='PROGRESS', meta={'step': i})
return 42


# caller.py
from tasks import long_task
result = long_task.delay()

while not (result.successful() or result.failed()):
try:
result.get(timeout=1)
except celery.exceptions.TimeoutError:
if result.state == 'PROGRESS':
print("progress={}".format(result.info['step']))
print("result={}".format(result.get()))

python 3.4.1/ celery 3.1.17/RabbitMQ 3.4.4

最佳答案

我认为这是一个微妙的时间问题,结合 RabbitMQ result backend 的事实将任务结果作为消息发送,并且只能检索一次。

预先简短回答:避免调用 result.get(),直到您真正需要最终结果:

while not result.ready():
if result.state == "PROGRESS":
print("progress={}".format(result.info['step']))
time.sleep(1)
print("result={}".format(result.get()))
# +additional cleanup: see comments below

更长的答案是,这里实际上有两种方法(和一个属性)在起作用,它们与 AMQP 后端对话:

  • AsyncResult.get()

    电话 AMQPBackend.wait_for() ,它会消耗任务队列中的所有结果,直到状态为 celery.states.READY_STATES 的结果出现。

  • AsyncResult.successful(), AsyncResult.failed(), AsyncResult.info

    电话 AMQPBackend.get_task_meta() ,它消耗任务队列中的所有结果,然后缓存并返回最新的结果。如果未检索到任何消息,后端将返回缓存结果或 PENDING 结果。注:最新消息为requeued在后端,如果它是 the final result ,它将被 AsyncResult 实例1 缓存。

调用 result.get() 将消耗所有状态更新,result.info 没有机会提供最新的进度报告;相反,它很可能是一个陈旧的缓存,其中一个对 AsyncResult.get_task_meta() 的调用在某个时候设法获取了。

因此,根据时间的不同,step 可能会在下一个最坏的情况下停留在 0,其中最坏的情况是 PROGRESS 状态永远不会联系来电者。

1因为最终结果在通过调用 get_task_meta() 获取时会被重新排队和缓存,所以您需要手动清空队列,如在下面评论。

关于python - Celery+RabbitMQ 上任务进度未更新最新状态,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28870679/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com