- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我在 Celery 中使用 Chord 来实现一个回调,当一组并行任务完成执行时会调用该回调。具体来说,我有一组函数可以包装对外部 API 的调用。在我处理结果并在 Chord 回调中更新我的数据库之前,我想等待所有这些返回。我希望回调在所有 API 调用完成时执行,无论它们的状态如何。
我的问题是回调函数只有在组的子任务都没有引发异常时才会被调用。但是,如果一个子任务引发异常,则会调用一个可选的错误处理程序 on_error()
,并使用和弦的 task_id
的字符串表示形式进行调用。组中的其余任务会继续执行,但永远不会调用回调。
我将用下面的例子来说明这一点:
@app.task
def maybe_succeed():
divisor = randint(0, 10)
return 1 / divisor
@app.task
def master_task():
g = group([maybe_succeed.s() for i in range(100)])
c = g | chord_callback.s()
return c.delay()
@app.task
def chord_callback(results):
print 'Made it here!'
在上面的示例中,调用 master_task()
将运行组中的所有任务,但是,回调永远不会被调用,因为其中一个 maybe_succeed()
会失败(除非你 super 幸运!)。
现在,我正在处理这个问题,方法是在我的等效 maybe_succeed()
中捕获所有异常,这样和弦就永远不会失败。我想这是一个很好的解决方案,尽管它感觉不对。
所以,我的问题是:有没有办法让 Celery Chord 回调执行而不管其组的子任务的返回状态?
最佳答案
您可以尝试在 errback 中调用原始回调:
@celery.task
def plus(x, y):
print(f'Running plus {x}, {y}')
return x + y
@celery.task
def failure():
print('Running failure')
raise ValueError('BAD')
@celery.task
def callme(stuff):
print('Callback')
print(f'Callback arg: {stuff}')
@celery.task
def on_chord_error(task_id, extra_info):
print('ON ERROR CALLBACK')
print(f'Task ID: {task_id}')
print(f'Extra info: {extra_info}')
callme.delay(extra_info)
@celery.task
def chord_test():
tasks = [plus.s(1, 1), plus.s(2, 2), failure.s(), plus.s(3, 3)]
callback = callme.s().on_error(on_chord_error.s('extra info'))
chord(tasks)(callback)
结果是:
Received task: tasks.plus[b0d084a5-0956-4f13-bf0d-580a3e3cd55e]
Running plus 1, 1
Task tasks.plus[b0d084a5-0956-4f13-bf0d-580a3e3cd55e] succeeded in 0.020222999999532476s: 2
Received task:tasks.plus[44a9d306-a0a5-4a7d-b71d-0fef56fe3481]
Running plus 2, 2
Task tasks.plus[44a9d306-a0a5-4a7d-b71d-0fef56fe3481] succeeded in 0.019981499994173646s: 4
Task tasks.chord_test[b6173c52-aa62-4dad-84f2-f3df2e1efcd1] succeeded in 0.45647509998525493s: None
Received task: tasks.failure[3880e8bd-2a09-4735-bb5f-9a49e992dfee]
Running failure
Task tasks.failure[3880e8bd-2a09-4735-bb5f-9a49e992dfee] raised unexpected: ValueError('BAD',)
Received task: tasks.plus[b3290ce9-fc74-45f2-a820-40bd6dea8473]
Running plus 3, 3
Task tasks.plus[b3290ce9-fc74-45f2-a820-40bd6dea8473] succeeded in 0.016270199994323775s: 6
celery.chord_unlock[0f37fa4d-4f12-4c65-9e08-b69f0cf2afd7] ETA:[2018-09-14 03:08:58.441070+00:00]
Chord 'dadece86-d399-4e64-b63a-f02a2a3de434' raised: ValueError('BAD',)
Traceback (most recent call last):
File "/home/flask/.local/lib/python3.6/site-packages/celery/app/builtins.py", line 81, in unlock_chord
ret = j(timeout=3.0, propagate=True)
File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 739, in join
interval=interval, no_ack=no_ack, on_interval=on_interval,
File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 213, in get
self.maybe_throw(callback=callback)
File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 329, in maybe_throw
self.throw(value, self._to_remote_traceback(tb))
File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 322, in throw
self.on_ready.throw(*args, **kwargs)
File "/home/flask/.local/lib/python3.6/site-packages/vine/promises.py", line 217, in throw
reraise(type(exc), exc, tb)
File "/home/flask/.local/lib/python3.6/site-packages/vine/five.py", line 179, in reraise
raise value
ValueError: BAD
Received task: tasks.on_chord_error[cf3056bc-34ea-4681-87e7-cded53acb958]
Task celery.chord_unlock[0f37fa4d-4f12-4c65-9e08-b69f0cf2afd7] succeeded in 0.12482409999938682s: None
ON ERROR CALLBACK
Task ID: fe3dae19-0641-47fa-9c4d-953b868992e7
Extra info: extra info
Received task: tasks.callme[d6dfd6c0-f0d9-474f-9d98-be43e031de69]
Callback
Callback arg: extra info
关于python - 你如何确保在失败的子任务中调用 Celery chord 回调?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46801448/
我正在研究 learnyounode 的 HTTP 客户端作业。 我想知道为什么控制台记录来自response.on(“end”,callback)的数据仅输出预期输出的最后一部分,而控制台记录来自r
我正在尝试创建一个对象列表(在我的示例中为 List),我在其中使用 json 将对象添加到此列表,但该列表仍为空。这是我的代码: public List readCardsFromJson() {
我有一个 JavaScript 函数“print_something”,它在大约 300 个 jsp 帮助页面中实现。我发现这个“print_something”函数必须被纠正。所以我正在寻找一个不更
有 2 个 HTML 下拉列表,一个用于 12 小时时间,一个用于每小时 5 分钟的时间间隔。 .. 1 .. 12 .. 0 .. 55 .. 一直在尝试使用 if/
我有一个 A 类,我打算在它与设备驱动程序交互时将其放入共享库中。 我有一个 B 类,将来可能是 C、D、E...,它将使用共享库中的 A 类。 我想要在类 A 中设置回调函数的功能,以便当特定事件发
我需要能够在处理完 Observable.next() 之后执行回调。 我有一个组件“A”,它有一个主题使用 Subject.next() 发送通知。我有一个组件“B”,它订阅了 Subject.as
我有一张在顶部和底部单元格下方带有阴影的表格(此处使用 Matt Gallagher 的解决方案:http://cocoawithlove.com/2009/08/adding-shadow-effe
有人可以向我解释一下为什么这段代码有效 renderSquare(i) { return ( this.handleClick(i)} /> ); } 但
我可以让两个不同的客户端监听相同的 WCF 回调并让它们都接收相同的数据而不必进行两次处理吗? 最佳答案 不是真的 - 至少不是直接的。你所描述的听起来很像发布/订阅模式。 WCF 服务基本上在任何给
我是 SignalR 的新手,如果这个问题太明显,我深表歉意,但我在文档中找不到任何答案。 这是我的代码。 /*1*/ actions.client.doActionA = function (r
我有这个应用程序,您可以在其中输入一些文本并按下一个按钮,将此文本添加到自定义小部件中。这是代码: import 'dart:core'; import 'package:flutter/materi
我读到当您还想使用模型回调时不能使用 Keras 进行交叉验证,但是 this post表明这毕竟是可能的。但是,我很难将其纳入我的上下文。 为了更详细地探讨这个问题,我正在关注 machinelea
我尝试在重力表单中提交表单失败后运行一些 jQuery 代码,也就是验证发现错误时。 我尝试使用 Ajax:complete 回调,但它根本不触发。 我尝试运行的代码基本上将监听器添加到选择下拉列表中
我有一个 $image,我 .fadeIn 和 .fadeOut,然后 .remove .fadeOut 完成。这是我的代码: $image .fadeIn() .fadeOut(func
我正在处理一个自定义文件路径类,它应该始终执行一个函数 写入相应的系统文件及其文件对象后 关闭。该函数将文件路径的内容上传到远程位置。 我希望上传功能完全在用户的幕后发生 透视,即用户可以像使用其他任
这里是 javascript 新手,所以回调在我的大脑中仍然有点不确定。 我想做的是:给定一个“菜单”,它是一个 objectId 数组,查询与该 objectId 相对应的每个 foodItem,获
我正在学习回调,我编写了以下代码: var http = require('http'); var str = ""; var count = 2; function jugglingAsync(ca
这是我的困境,我有一系列被调用的函数,我正在使用回调函数在它们完成时执行函数。回调返回一个值并且效果也很好,我的问题是当我向回调添加参数时我无法再访问返回值。这是一个有效的例子: function m
This question already has answers here: Explanation of function pointers (4个答案) 上个月关闭。 如何将函数指针作为参数传递
我无法让以下代码工作。假设 ajax 调用有效,并且 msg['username'] 预设为 'john'。我想我对如何将变量传递给回调感到困惑。编辑:我认为我的主要困惑是如何从 Ajax 中获取“m
我是一名优秀的程序员,十分优秀!