gpt4 book ai didi

python - 改变 Python 多处理引发错误的方式

转载 作者:行者123 更新时间:2023-12-01 03:50:21 24 4
gpt4 key购买 nike

我在 Python 中使用 pool.apply_async 进行多重处理,以同时运行具有各种不同参数的函数。

相关代码摘录为:

import multiprocessing as mp

all_details_to_process_full = [[x,y.z], [x2,y2.z2]]

def loop_over_desired_sub(arg_list):
...

if __name__ == '__main__':
pool = mp.Pool(processes=10)
desired_content = [pool.apply_async(loop_over_desired_sub, args=(arg_list,)) for arg_list in all_details_to_process_full]
results = [p.get() for p in desired_content]

据我所知,Python 的默认行为是仅在初始化的最早的子进程引发错误时才停止代码。

例如,如果列表中有 10 个项目需要使用单独的子进程进行处理,并且在处理第一项(即初始化的第一个子进程)时出现错误,Python 将立即引发错误并停止代码。但是,如果第二个子进程中出现错误,虽然该子进程将停止,但代码的其余部分将继续执行,直到第一项完成,此时会引发错误并且代码停止。 [如果处理第三项时出现错误,则第一项和第二项都需要在错误出现之前完成]。

有没有办法改变这种行为:

  1. 发生任何错误,即在任何子进程中,停止代码立即

  2. 如果出现错误,代码不会停止,直到所有子处理已完成

最佳答案

当您使用apply_async时,您的每个进程都是独立的。因此,Python 的默认行为是独立处理它们,这意味着一个失败不会影响另一个'

这里的问题是您以有序的方式处理函数 loop_over_desired_content 的结果。get 方法将阻塞,直到检索到第一个操作的结果(即使第二个进程返回/失败)。然后,它将处理第二个值,并在需要时引发错误。

import multiprocessing as mp
import time


def fail_in(args):
x, l = args
if x == l:
raise RuntimeError(x)
time.sleep(.5)
print("Finish process {}".format(x))
return x


if __name__ == '__main__':
pool = mp.Pool(processes=3)
tasks = [(i, 0) for i in range(9)]

try:
desired_content = [pool.apply_async(fail_in, args=(a,)) for a in tasks]
t1 = time.time()
results = [p.get() for p in desired_content]
except RuntimeError:
print("apply_async 0 failed in {:4.2}s".format(time.time()-t1))
pool.terminate()
pool = mp.Pool(processes=3)
tasks = [(i, 1) for i in range(9)]

try:
desired_content = [pool.apply_async(fail_in, args=(a,)) for a in tasks]
t1 = time.time()
results = [p.get() for p in desired_content]
except RuntimeError:
print("apply_async 1 failed in {:4.2}s".format(time.time()-t1))
pool.terminate()
pool = mp.Pool(processes=3)
tasks = [(i, 4) for i in range(9)]

try:
desired_content = [pool.apply_async(fail_in, args=(a,)) for a in tasks]
t1 = time.time()
results = [p.get() for p in desired_content]
except RuntimeError:
print("apply_async 4 failed in {:4.2}s".format(time.time()-t1))
pool.terminate()

请注意,剩余的进程不会被此错误终止。您可以通过尝试在池中提交新作业而不使用 terminate 来查看它。它们将在您之前工作的所有剩余流程完成后开始。

要获得更快的错误通知,您可以使用方法imap_unordered,该方法会在返回错误时立即引发错误。您必须小心,因为您需要使用 job_id 来找回订单。
在这种情况下,您还可以使用 callback_error 获取通知来执行清理。

对于第二个行为,要求在引发错误之前处理所有结果,您可以使用:

desired_content = [pool.apply_async(loop_over_desired_sub, args=(arg_list,)) 
for arg_list in all_details_to_process_full]
results = []
for p in desired_content:
try:
r = p.get()
except Exception as r:
pass
results += [r]

results = [p.get() for p in desired_content]

关于python - 改变 Python 多处理引发错误的方式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38341186/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com