gpt4 book ai didi

python - 如何在 celery 中创建和弦,其任务就是和弦本身

转载 作者:太空宇宙 更新时间:2023-11-03 18:36:50 24 4
gpt4 key购买 nike

我正在尝试在 celery 中创建一个和弦,其组中的任务就是和弦本身。所以想法是 - 每个单独的子和弦首先执行,然后这些和弦的结果最终由父任务的回调使用。这是我正在尝试执行的代码

--任务.py

import celery
from celery import chord

@celery.task
def add(x, y):
print "add called..."
return x + y

@celery.task
def tsum(numbers):
return sum(numbers)

@celery.task
def gr_add(x):
print "**** gr_add called"
c = chord(add.subtask((i,i)) for i in range(2*x, 2*x+2))
r = c(tsum.subtask())
return r.get(timeout=120)

-- 然后我的 test-chord-chord.py 文件包含

from tasks import add, tsum, gr_add
from celery import chord

c = chord(gr_add.subtask((i,)) for i in range(2))
result = c(tsum.subtask())
print result.get(timeout=5)

但是,这并没有按预期执行。我做得正确吗?还有其他方法可以实现上述目标吗?

最佳答案

我终于能弄清楚问题出在哪里了。虽然不太确定如何解决它。罪魁祸首是对上面 gr_add 中回调的调用,特别是行 - r = c(tsum.subtask())。这将阻塞 - 因此运行此任务的工作人员将阻塞。现在,当我只有一名 worker 时,上述问题就出现了。当我启动多个工作人员(在本例中具体是三个 - 因为原始和弦创建两个和弦)时,每个工作人员仅执行一项任务 CELERY_PREFTCH_MULTIPLIER = 1CELERY_ACKS_LATE=True。三个工作人员拿起了队列中的任务 - 第一个拿起了第一个和弦,现在被阻止了。第二个拾取了第二个和弦,现在被阻止了。第三个任务完成了第一个和弦的第一个任务 - 计算,退出 - 畅通无阻。拿起下一个任务 - 计算/退出 - 解锁。第一个现在已经解封了。拿起第二和弦的第一个任务 - 计算/退出 - 畅通无阻。 1 号或 3 号工作人员之一计算了和弦的第二个子任务,计算完毕后退出。所有三个现在都已解除阻止,并且结果按预期计算。一般来说 - 这是一个糟糕的设计 - 我应该做的是“创建另一个子任务 - 获取和弦的结果,如果它们尚未成功,则在一段时间后重试。这些重试间隔之间的时间将允许工作人员完成各个子任务,然后最终可以调用主调用弦的回调。

编辑1:终于能够弄清楚如何处理这个问题 - 阻止外弦调用的任务是一个坏主意。相反,我所做的是 - 从被调用的任务返回 AsyncResult然后创建一个轮询器任务来轮询结果是成功还是失败。然后在这个 poller 任务中计算结果。此任务会在特定持续时间后重试。 (这几乎就像unlock_chord 任务 - 只是我自己在由代码控制的单独任务中完成了此操作 - 而不是让 celery 处理它)

关于python - 如何在 celery 中创建和弦,其任务就是和弦本身,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21402153/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com