gpt4 book ai didi

Python celery - 如何等待和弦中的所有子任务

转载 作者:太空宇宙 更新时间:2023-11-04 03:10:27 24 4
gpt4 key购买 nike

我正在对 celery 任务进行单元测试。我有也有组的链式任务,因此产生了和弦。

测试应该是这样的:

  • 运行 celery 任务(延迟)
  • 等待任务和所有子任务
  • 断言

我尝试了以下方法:

def wait_for_result(result):
result.get()
for child in result.children or list():
if isinstance(child, GroupResult):
# tried looping over task result in group
# until tasks are ready, but without success
pass
wait_for_result(child)

这会造成死锁,chord_unlock 将永远重试。我对任务结果不感兴趣。如何等待所有子任务完成?

最佳答案

虽然这是一个老问题,但我只是想分享一下我是如何摆脱死锁问题的,以防它能帮助到别人。

正如 celery 日志所说,永远不要在任务中使用 get()。这确实会造成僵局。

我有一组类似的 celery 任务,其中包括一组组任务,因此使它成为一个和弦。我通过发出 HTTP 请求使用 Tornado 调用这些任务。所以我所做的是这样的:

@task
def someFunction():
....


@task
def someTask():
....


@task
def celeryTask():
groupTask = group([someFunction.s(i) for i in range(10)])

job = (groupTask| someTask.s())

return job

celeryTask() 被 tornado 调用时,链将开始执行,并且 someTask() 的 UUID 将保存在 job。它看起来像

AsyncResult: 765b29a8-7873-4b28-b05c-7e19c33e950c

这个 UUID 被返回并且 celeryTask() 在链开始执行之前退出(理想情况下),因此为另一个进程运行留出空间。

然后我使用 Tornado 层来检查任务的状态。有关 Tornado 层的详细信息,请参阅此 stackoverflow question

关于Python celery - 如何等待和弦中的所有子任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38206416/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com