gpt4 book ai didi

python - ProcessPoolExecutor,BrokenProcessPool处理

转载 作者:行者123 更新时间:2023-11-28 19:01:03 25 4
gpt4 key购买 nike

在本文档 ( https://pymotw.com/3/concurrent.futures/ ) 中说:

“ProcessPoolExecutor 的工作方式与 ThreadPoolExecutor 相同,但使用进程而不是线程。这允许 CPU 密集型操作使用单独的 CPU,而不会被 CPython 解释器的全局解释器锁阻塞。”

这听起来不错!它还说:

“如果其中一个工作进程发生意外导致其意外退出,则 ProcessPoolExecutor 被视为“已损坏”并且将不再安排任务。”

这听起来很糟糕 :( 所以我想我的问题是:什么被认为是“意外”?这是否仅仅意味着退出信号不是 1?我可以安全地退出线程并继续处理队列吗?示例如下如下:

from concurrent import futures
import os
import signal


with futures.ProcessPoolExecutor(max_workers=2) as ex:
print('getting the pid for one worker')
f1 = ex.submit(os.getpid)
pid1 = f1.result()

print('killing process {}'.format(pid1))
os.kill(pid1, signal.SIGHUP)

print('submitting another task')
f2 = ex.submit(os.getpid)
try:
pid2 = f2.result()
except futures.process.BrokenProcessPool as e:
print('could not start new tasks: {}'.format(e))

最佳答案

我没有在 IRL 中看到它,但从代码来看,返回的文件描述符似乎不包含 results_queue 文件描述符。

来自 concurrent.futures.process:

    reader = result_queue._reader

while True:
_add_call_item_to_queue(pending_work_items,
work_ids_queue,
call_queue)

sentinels = [p.sentinel for p in processes.values()]
assert sentinels
ready = wait([reader] + sentinels)
if reader in ready: # <===================================== THIS
result_item = reader.recv()
else:
# Mark the process pool broken so that submits fail right now.
executor = executor_reference()
if executor is not None:
executor._broken = True
executor._shutdown_thread = True
executor = None
# All futures in flight must be marked failed
for work_id, work_item in pending_work_items.items():
work_item.future.set_exception(
BrokenProcessPool(
"A process in the process pool was "
"terminated abruptly while the future was "
"running or pending."
))
# Delete references to object. See issue16284
del work_item

wait 函数取决于系统,但假设 linux 操作系统(在 multiprocessing.connection 处,删除了所有与超时相关的代码):

    def wait(object_list, timeout=None):
'''
Wait till an object in object_list is ready/readable.

Returns list of those objects in object_list which are ready/readable.
'''
with _WaitSelector() as selector:
for obj in object_list:
selector.register(obj, selectors.EVENT_READ)

while True:
ready = selector.select(timeout)
if ready:
return [key.fileobj for (key, events) in ready]
else:
# some timeout code

关于python - ProcessPoolExecutor,BrokenProcessPool处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52617401/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com