gpt4 book ai didi

python - 提前退出 multiprocessing.Pool.map (在子进程中引发不起作​​用)

转载 作者:行者123 更新时间:2023-12-01 06:29:17 24 4
gpt4 key购买 nike

我的复制是错误的,如 Rugnar's answer 中所述。 . 我将大部分代码保持原样,因为我不确定它在 clarifying and changing the meaning 之间的位置。 .

我有数千个作业需要运行,并且希望任何错误都立即停止执行。我将任务包装在 try 中/exceptraise这样我就可以记录错误(没有所有多处理/线程噪音),然后重新引发。这不会不会终止主进程。

发生了什么事,我怎样才能提前退出? sys.exit(1)在子僵局中,包裹 try/exceptraise另一个函数中的函数也不起作用。

$ python3 mp_reraise.py
(0,)
(1,)
(2,)
(3,)
(4,)
(5,)
(6,)
(7,)
(8,)
(9,)
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "/usr/lib/python3.6/multiprocessing/pool.py", line 44, in mapstar
return list(map(*args))
File "mp_reraise.py", line 5, in f_reraise
raise Exception(args)
Exception: (0,)
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
File "mp_reraise.py", line 14, in <module>
test_reraise()
File "mp_reraise.py", line 12, in test_reraise
p.map(f_reraise, range(10))
File "/usr/lib/python3.6/multiprocessing/pool.py", line 266, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "/usr/lib/python3.6/multiprocessing/pool.py", line 644, in get
raise self._value
Exception: (0,)

mp_reraise.py

import multiprocessing

def f_reraise(*args):
try:
raise Exception(args)
except Exception as e:
print(e)
raise

def test_reraise():
with multiprocessing.Pool() as p:
p.map(f_reraise, range(10))

test_reraise()

如果我不捕获并重新加注,执行将按预期提前停止:[根据鲁格纳的回答,这实际上并没有停止]

$ python3 mp_raise.py 
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "/usr/lib/python3.6/multiprocessing/pool.py", line 44, in mapstar
return list(map(*args))
File "mp_raise.py", line 4, in f_raise
raise Exception(args)
Exception: (0,)
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
File "mp_raise.py", line 10, in <module>
test_raise()
File "mp_raise.py", line 8, in test_raise
p.map(f_raise, range(10))
File "/usr/lib/python3.6/multiprocessing/pool.py", line 266, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "/usr/lib/python3.6/multiprocessing/pool.py", line 644, in get
raise self._value
Exception: (0,)

mp_raise.py

import multiprocessing

def f_raise(*args):
# missing print, which would demonstrate that
# this actually does not stop early
raise Exception(args)

def test_raise():
with multiprocessing.Pool() as p:
p.map(f_raise, range(10))

test_raise()

最佳答案

在您的 mp_raise.py 中,您不会打印任何内容,因此您看不到完成了多少作业。我添加了 print 并发现只有当作业迭代器耗尽时池才会看到子进程的异常。所以它永远不会提前停止。

如果您需要在异常发生后尽早停止,请尝试此操作

import time
import multiprocessing as mp


def f_reraise(i):
if abort.is_set(): # cancel job if abort happened
return
time.sleep(i / 1000) # add sleep so jobs are not instant, like in real life
if abort.is_set(): # probably we need stop job in the middle of execution if abort happened
return
print(i)
try:
raise Exception(i)
except Exception as e:
abort.set()
print('error:', e)
raise


def init(a):
global abort
abort = a


def test_reraise():
_abort = mp.Event()

# jobs should stop being fed to the pool when abort happened
# so we wrap jobs iterator this way
def pool_args():
for i in range(100):
if not _abort.is_set():
yield i

# initializer and init is a way to share event between processes
# thanks to https://stackoverflow.com/questions/25557686/python-sharing-a-lock-between-processes
with mp.Pool(8, initializer=init, initargs=(_abort,)) as p:
p.map(f_reraise, pool_args())


if __name__ == '__main__':
test_reraise()

关于python - 提前退出 multiprocessing.Pool.map (在子进程中引发不起作​​用),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59982749/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com