gpt4 book ai didi

python - 通过链 id 从 async python celery 链获取进度

转载 作者:太空狗 更新时间:2023-10-29 20:21:43 25 4
gpt4 key购买 nike

我试图通过查询每个任务状态来获取任务链的进度。但是当通过它的 id 检索链时,我得到了一些行为不同的对象。

在 tasks.py 中

from celery import Celery

celery = Celery('tasks')
celery.config_from_object('celeryconfig')

def unpack_chain(nodes):
while nodes.parent:
yield nodes.parent
nodes = nodes.parent
yield nodes

@celery.task
def add(num, num2):
return num + num2

从 ipython 查询时...

In [43]: from celery import chain
In [44]: from tasks import celery, add, unpack_chain
In [45]: c = chain(add.s(3,3), add.s(10).set(countdown=100))
In [46]: m = c.apply_async()
In [47]: a = celery.AsyncResult(m.id)
In [48]: a == m
Out[48]: True
In [49]: a.id == m.id
Out[49]: True
In [50]: [t.status for t in list(unpack_chain(a))]
Out[50]: ['PENDING']
In [51]: [t.status for t in list(unpack_chain(m))]
Out[51]: ['PENDING', 'SUCCESS']

在 Redis 下使用 Python 2.7.3 和 Celery 3.0.19。

正如您在 50 & 51 中看到的,celery.AsyncResult 返回的值与原始链不同。

如何通过链id获取原链任务列表?

最佳答案

就像@Hernantz 所说的那样,您无法仅从任务 ID 恢复父链,您必须遍历您的队列,这可能会也可能不会,具体取决于您使用的代理。

但是如果你有最后一个任务 ID 来进行查找,那么你就有了链,你只需要存储所有任务 ID 并在需要检查它们的状态时重建链。您可以使用以下功能:

def store(node):
id_chain = []
while node.parent:
id_chain.append(node.id)
node = node.parent
id_chain.append(node.id)
return id_chain

def restore(id_chain):
id_chain.reverse()
last_result = None
for tid in id_chain:
result = celery.AsyncResult(tid)
result.parent = last_result
last_result = result
return last_result

当您第一次从链中获取 AsyncResult 时调用存储。调用 restore 会给你一个 AsyncResult 的链接列表,就像 chain 给你的一样。

关于python - 通过链 id 从 async python celery 链获取进度,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16306175/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com