gpt4 book ai didi

python - 一旦任何进程遇到错误,如何在 python 中终止异步星图多处理池

转载 作者:行者123 更新时间:2023-12-05 04:26:02 25 4
gpt4 key购买 nike

我正在使用 Python 的多处理库中的 starmap_async 函数。但是,我注意到如果我的代码在其中一个进程中遇到错误,则在所有进程完成之前不会抛出异常。这是相关代码:

from multiprocessing import Pool, cpu_count
import datetime
import itertools
import time

with Pool(max(cpu_count()//2, 1)) as p:
#p = Pool()
df_iter = df_options.iterrows()
ir = itertools.repeat
results = p.starmap_async(_run,zip(df_iter,ir(fixed_options),ir(outputs_grab)), chunksize=1)
p.close() #no more jobs to submit

#Printing progress

n_remaining = results._number_left + 1
while (not results.ready()):
time.sleep(1)
#Check for errors here ... How ????
#Then what? call terminate()?????
if verbose:
if results._number_left < n_remaining:
now = datetime.datetime.now()
n_remaining = results._number_left
print('%d/%d %s' % (n_remaining,n_rows,str(now)[11:]))

print('joining')
p.join()

all_results = results.get()

df = pd.DataFrame(all_results)

目前,如果我在生成的进程中引发错误,其他进程似乎不仅完成运行,而且开始新任务,尽管其中一个调用出现错误。

一些搜索让我相信这可能是不可能的。有人似乎建议我可能需要改用 concurrent.futures,尽管目前还不清楚如何将我的示例映射到该示例,尤其是在流程完成时保持实时反馈。

concurrent.futures 的讨论:https://stackoverflow.com/a/47108581/764365

最佳答案

tldr;使用 imap_unordered 可以让主进程知道 child 抛出异常的延迟最小,因为它允许您在主进程通过结果 Queue< 进入时立即处理结果。然后,您可以根据需要使用包装函数来构建您自己的函数“明星”版本。作为代码设计的一个要点,大多数 Pool 方法倾向于从子进程中重新引发异常,而 concurrent.futures 倾向于设置返回值的属性以指示异常那是提出来的。

from random import random
from functools import partial
from multiprocessing import Pool
from time import sleep

def foo(a, b):
sleep(random()) #introduce some processing delay to simulate work
if random() > .95:
raise Exception("randomly rasied an exception")
else:
return f"{a}\t{b}"

def star_helper(func, args):
return func(*args)

if __name__ == "__main__":
n = 20
print("chance of early termination:", (1-.95**n)*100, "%")
with Pool() as p:
try:
for result in p.imap_unordered(partial(star_helper, foo), zip(range(n), range(n))):
print(result)
except:
p.terminate()
print("terminated")
print("done") # `with Pool()` joins the child processes to prove they quit early

关于python - 一旦任何进程遇到错误,如何在 python 中终止异步星图多处理池,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73114219/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com