gpt4 book ai didi

python - 处理来自 asyncmap 的结果

转载 作者:太空宇宙 更新时间:2023-11-03 18:45:57 26 4
gpt4 key购买 nike

我正在尝试使用 ipython 的并行处理来并行处理数据。我按照@minrk的指示进行操作回答how to get intermidiate results in ipython parallel processing?上的问题。由于数据是异构的,一些处理任务比其他任务完成得更快,我想在它们可用时立即保存它们。我按以下方式执行此操作:

from IPython.parallel import Client

def specialfunc(param):
import time
if param > 8:
raise IOError
else:
time.sleep( param)
return param

client = Client()
balanced = client.load_balanced_view()
balanced.block = False
param_list = range(10) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
asyncmap = balanced.map_async(specialfunc, param_list, ordered=False)

然后我可以循环 asyncmap,结果在准备好后就可用:

for i in asyncmap:
print i

问题是我的代码有时会抛出异常(上面的示例在调用参数超过 8 时强制出现 IOError),我想对此进行处理。然而,一旦其中一个引擎出现抖动,整个异步映射“似乎”就完成了。

我实际上注意到,当我询问 asyncmap.metadata 时,可以很好地找出哪条消息给出了错误(asyncmap.metadata[i]['pyerr']),但我不知道如何等待结果像他们一样进来。

所以我的问题是我应该如何处理从引擎异步到达的结果,即使它们有时会抛出异常。如何捕获引擎中的异常而不扰乱 Controller 中的等待结果?

最佳答案

我知道这听起来有点愚蠢,但您可以返回一个特殊值来指示错误,例如 -1None 或字符串。为了解决 map_async 我所做的就是循环遍历参数并使用 apply_async,将结果存储在列表中。然后,我循环遍历列表,尝试获取结果并一次处理一个结果。看起来像这样:

 n_cores = len(c.ids)
for n,p in enumerate( params ):
core = c.ids[n%n_cores]
calls.append( c[core].apply_async( f, p ) )

#then you get the results

while calls != []:
for c in calls:
try:
result = c.get(1e-3)
process(result)
calls.remove( c )
#in the case your call failed, you can apply_async again.
# and append the call to calls.
except parallel.TimeoutError:
pass

或者使用c[core].apply()并使用c.ready()检查调用。基本上是同样的事情,没有异常处理。令人烦恼的是,这会占用大量内存,因为每次调用的结果和其他字典都很难清除。

我也在做类似的事情here我认为 map_async 对我不起作用。 This如果您决定采用这种方法,也可能相关。

干杯。

PS:我认为本质上这就是您上面实现的,但我发现单独处理调用然后将它们堆叠到 map 中更自然,特别是如果您稍后可能想重新处理其中一些调用。

关于python - 处理来自 asyncmap 的结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19509059/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com