gpt4 book ai didi

python - 你如何确保在失败的子任务中调用 Celery chord 回调?

转载 作者:太空宇宙 更新时间:2023-11-03 12:01:34 24 4
gpt4 key购买 nike

我在 Celery 中使用 Chord 来实现一个回调,当一组并行任务完成执行时会调用该回调。具体来说,我有一组函数可以包装对外部 API 的调用。在我处理结果并在 Chord 回调中更新我的数据库之前,我想等待所有这些返回。我希望回调在所有 API 调用完成时执行,无论它们的状态如何。

我的问题是回调函数只有在组的子任务都没有引发异常时才会被调用。但是,如果一个子任务引发异常,则会调用一个可选的错误处理程序 on_error(),并使用和弦的 task_id 的字符串表示形式进行调用。组中的其余任务会继续执行,但永远不会调用回调。

我将用下面的例子来说明这一点:

@app.task
def maybe_succeed():
divisor = randint(0, 10)
return 1 / divisor

@app.task
def master_task():
g = group([maybe_succeed.s() for i in range(100)])
c = g | chord_callback.s()
return c.delay()

@app.task
def chord_callback(results):
print 'Made it here!'

在上面的示例中,调用 master_task() 将运行组中的所有任务,但是,回调永远不会被调用,因为其中一个 maybe_succeed() 会失败(除非你 super 幸运!)。


现在,我正在处理这个问题,方法是在我的等效 maybe_succeed() 中捕获所有异常,这样和弦就永远不会失败。我想这是一个很好的解决方案,尽管它感觉不对。

所以,我的问题是:有没有办法让 Celery Chord 回调执行而不管其组的子任务的返回状态?

最佳答案

您可以尝试在 errback 中调用原始回调:

@celery.task
def plus(x, y):
print(f'Running plus {x}, {y}')
return x + y


@celery.task
def failure():
print('Running failure')
raise ValueError('BAD')


@celery.task
def callme(stuff):
print('Callback')
print(f'Callback arg: {stuff}')


@celery.task
def on_chord_error(task_id, extra_info):
print('ON ERROR CALLBACK')
print(f'Task ID: {task_id}')
print(f'Extra info: {extra_info}')
callme.delay(extra_info)


@celery.task
def chord_test():
tasks = [plus.s(1, 1), plus.s(2, 2), failure.s(), plus.s(3, 3)]
callback = callme.s().on_error(on_chord_error.s('extra info'))
chord(tasks)(callback)

结果是:

Received task: tasks.plus[b0d084a5-0956-4f13-bf0d-580a3e3cd55e]
Running plus 1, 1
Task tasks.plus[b0d084a5-0956-4f13-bf0d-580a3e3cd55e] succeeded in 0.020222999999532476s: 2
Received task:tasks.plus[44a9d306-a0a5-4a7d-b71d-0fef56fe3481]
Running plus 2, 2
Task tasks.plus[44a9d306-a0a5-4a7d-b71d-0fef56fe3481] succeeded in 0.019981499994173646s: 4
Task tasks.chord_test[b6173c52-aa62-4dad-84f2-f3df2e1efcd1] succeeded in 0.45647509998525493s: None
Received task: tasks.failure[3880e8bd-2a09-4735-bb5f-9a49e992dfee]
Running failure
Task tasks.failure[3880e8bd-2a09-4735-bb5f-9a49e992dfee] raised unexpected: ValueError('BAD',)
Received task: tasks.plus[b3290ce9-fc74-45f2-a820-40bd6dea8473]
Running plus 3, 3
Task tasks.plus[b3290ce9-fc74-45f2-a820-40bd6dea8473] succeeded in 0.016270199994323775s: 6
celery.chord_unlock[0f37fa4d-4f12-4c65-9e08-b69f0cf2afd7] ETA:[2018-09-14 03:08:58.441070+00:00]
Chord 'dadece86-d399-4e64-b63a-f02a2a3de434' raised: ValueError('BAD',)
Traceback (most recent call last):
File "/home/flask/.local/lib/python3.6/site-packages/celery/app/builtins.py", line 81, in unlock_chord
ret = j(timeout=3.0, propagate=True)
File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 739, in join
interval=interval, no_ack=no_ack, on_interval=on_interval,
File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 213, in get
self.maybe_throw(callback=callback)
File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 329, in maybe_throw
self.throw(value, self._to_remote_traceback(tb))
File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 322, in throw
self.on_ready.throw(*args, **kwargs)
File "/home/flask/.local/lib/python3.6/site-packages/vine/promises.py", line 217, in throw
reraise(type(exc), exc, tb)
File "/home/flask/.local/lib/python3.6/site-packages/vine/five.py", line 179, in reraise
raise value
ValueError: BAD
Received task: tasks.on_chord_error[cf3056bc-34ea-4681-87e7-cded53acb958]
Task celery.chord_unlock[0f37fa4d-4f12-4c65-9e08-b69f0cf2afd7] succeeded in 0.12482409999938682s: None
ON ERROR CALLBACK
Task ID: fe3dae19-0641-47fa-9c4d-953b868992e7
Extra info: extra info
Received task: tasks.callme[d6dfd6c0-f0d9-474f-9d98-be43e031de69]
Callback
Callback arg: extra info

关于python - 你如何确保在失败的子任务中调用 Celery chord 回调?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46801448/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com