gpt4 book ai didi

python - 如果它的子任务给出异常,如何使链失败

转载 作者:太空狗 更新时间:2023-10-29 22:11:39 28 4
gpt4 key购买 nike

我在使用 celery 时遇到了一个非常奇怪的问题:

有一个任务链,其中一个给出异常并重试几次

chain = (err.si(1) | err.si(2))
result = chain.apply_async()
result.state
result.get()

这是任务的代码:

@celery.task(base=MyTask)
def err(x):
try:
if x < 3:
raise Exception
else:
return x+1

except Exception as exp:
print "retrying"
raise err.retry(args=[x],exc=exp,countdown=5,max_retries=3)

事情是,虽然链中的任务给出了一个异常,但 result.state 一直是“PENDING”并且 .get() 只是卡住。

如果任务达到最大重试值,我已尝试使任务失败:

class MyTask(celery.Task):
abstract = True
def after_return(self, status, retval, task_id, args, kwargs, einfo):
if self.max_retries == self.request.retries:
self.state = states.FAILURE

但是尽管单独执行的任务被标记为 FAILED,但在链中执行会给出相同的结果 - PENDING 和 Freezed get。

我预计链会在它的任何任务失败时失败,并且 .get 的结果应该会产生从任务中抛出的异常。

_更新_apply_async 使用 ALWAYS_EAGER=True 给出的堆栈跟踪

result = chain.apply_async()

Exception
Traceback (most recent call last)
<ipython-input-4-81202b369b5f> in <module>()
----> 1 result = chain.apply_async()

lib/python2.7/site-packages/celery/canvas.pyc in apply_async(self, args, kwargs, **options)
147 # For callbacks: extra args are prepended to the stored args.
148 args, kwargs, options = self._merge(args, kwargs, options)
--> 149 return self.type.apply_async(args, kwargs, **options)
150
151 def append_to_list_option(self, key, value):

/lib/python2.7/site-packages/celery/app/builtins.pyc in apply_async(self, args, kwargs, group_id, chord, task_id, **options)
232 task_id=None, **options):
233 if self.app.conf.CELERY_ALWAYS_EAGER:
--> 234 return self.apply(args, kwargs, **options)
235 options.pop('publisher', None)
236 tasks, results = self.prepare_steps(args, kwargs['tasks'])

lib/python2.7/site-packages/celery/app/builtins.pyc in apply(self, args, kwargs, subtask, **options)
249 last, fargs = None, args # fargs passed to first task only
250 for task in kwargs['tasks']:
--> 251 res = subtask(task).clone(fargs).apply(last and (last.get(), ))
252 res.parent, last, fargs = last, res, None
253 return last

lib/python2.7/site-packages/celery/result.pyc in get(self, timeout, propagate, **kwargs)
677 elif self.state in states.PROPAGATE_STATES:
678 if propagate:
--> 679 raise self.result
680 return self.result
681 wait = get

Exception:

最佳答案

当你有一条链时:

>>> c = a.s() | b.s() | c.s()
>>> res = c()
>>> res.get()

调用链将为链中的所有任务生成唯一 ID,发送消息并返回链中最后的结果

因此,当您执行 res.get() 时,您只是简单地尝试检索链中最后一个任务的结果。

它还会用 parent 属性装饰结果,你可以遍历它来获取链的进度:

>>> res                # result of c.s()
>>> res.parent # result of b.s()
>>> res.parent.parent # result of a.s()

如果你想沿途检查错误,你可以这样做:

def nodes(node):
while node.parent:
yield node
node = node.parent
yield node


values = [node.get(timeout=1) for node in reversed(list(nodes(res)))]
value = values[-1]

关于python - 如果它的子任务给出异常,如何使链失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/12660994/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com