- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我正在尝试使用 ipython 的并行处理来并行处理数据。我按照@minrk的指示进行操作回答how to get intermidiate results in ipython parallel processing?上的问题。由于数据是异构的,一些处理任务比其他任务完成得更快,我想在它们可用时立即保存它们。我按以下方式执行此操作:
from IPython.parallel import Client
def specialfunc(param):
import time
if param > 8:
raise IOError
else:
time.sleep( param)
return param
client = Client()
balanced = client.load_balanced_view()
balanced.block = False
param_list = range(10) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
asyncmap = balanced.map_async(specialfunc, param_list, ordered=False)
然后我可以循环 asyncmap,结果在准备好后就可用:
for i in asyncmap:
print i
问题是我的代码有时会抛出异常(上面的示例在调用参数超过 8 时强制出现 IOError),我想对此进行处理。然而,一旦其中一个引擎出现抖动,整个异步映射“似乎”就完成了。
我实际上注意到,当我询问 asyncmap.metadata 时,可以很好地找出哪条消息给出了错误(asyncmap.metadata[i]['pyerr']),但我不知道如何等待结果像他们一样进来。
所以我的问题是我应该如何处理从引擎异步到达的结果,即使它们有时会抛出异常。如何捕获引擎中的异常而不扰乱 Controller 中的等待结果?
最佳答案
我知道这听起来有点愚蠢,但您可以返回一个特殊值来指示错误,例如 -1
或 None
或字符串。为了解决 map_async
我所做的就是循环遍历参数并使用 apply_async
,将结果存储在列表中。然后,我循环遍历列表,尝试获取结果并一次处理一个结果。看起来像这样:
n_cores = len(c.ids)
for n,p in enumerate( params ):
core = c.ids[n%n_cores]
calls.append( c[core].apply_async( f, p ) )
#then you get the results
while calls != []:
for c in calls:
try:
result = c.get(1e-3)
process(result)
calls.remove( c )
#in the case your call failed, you can apply_async again.
# and append the call to calls.
except parallel.TimeoutError:
pass
或者使用c[core].apply()
并使用c.ready()
检查调用。基本上是同样的事情,没有异常处理。令人烦恼的是,这会占用大量内存,因为每次调用的结果
和其他字典
都很难清除。
我也在做类似的事情here我认为 map_async 对我不起作用。 This如果您决定采用这种方法,也可能相关。
干杯。
PS:我认为本质上这就是您上面实现的,但我发现单独处理调用然后将它们堆叠到 map 中更自然,特别是如果您稍后可能想重新处理其中一些调用。
关于python - 处理来自 asyncmap 的结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19509059/
有没有一种方法可以在不使用计数器变量的情况下编写下面的函数来知道超时何时完成。 可以使用 async/await 来完成吗? 我在下面提供了测试调用和当前工作函数。 // write asyncMap
我正在尝试使用 ipython 的并行处理来并行处理数据。我按照@minrk的指示进行操作回答how to get intermidiate results in ipython parallel p
我在有状态小部件中使用以下 StreamBuilder: StreamBuilder>( stream: widget.model.results(widget.type), builder:
如果我像这样获取 AsyncMap: if (vertx.isClustered()) { vertx.sharedData().getClusterWideMap("entities",
我是一名优秀的程序员,十分优秀!