
python-3.x - Call to Python multiprocessing.Process.join() fails

Reposted · Author: 行者123 · Updated: 2023-12-02 07:30:12

When I run the code below, it sometimes fails:

time_start = time.time()
job = multiprocessing.Process(target=load_cpu, args=(deadline, ))
job.start()  # This is line 37 in the source code linked below
# timeout=None in the call to join() solves the problem
job.join(deadline)
elapsed = time.time()-time_start
if elapsed < deadline and job.is_alive():
    # I am getting here from time to time
    logger.error(f"#{job_counter}: job.join() returned while process {job.pid} is still alive elapsed={elapsed} deadline={deadline}")

A Python 3.7 container (Docker) demonstrating the problem is here: https://github.com/larytet-py/multiprocess
If I run the code on a 4-core Ubuntu 18.04 host for a few minutes, I get:
Traceback (most recent call last):
  File "/usr/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "main.py", line 37, in spawn_job
    job.start()
  File "/usr/lib/python3.7/multiprocessing/process.py", line 111, in start
    _cleanup()
  File "/usr/lib/python3.7/multiprocessing/process.py", line 56, in _cleanup
    if p._popen.poll() is not None:
AttributeError: 'NoneType' object has no attribute 'poll'

What am I doing wrong?
My workaround is to replace the call to job.join() with a polling loop that checks is_alive(). Unfortunately, this approach hurts latency. Is there a better option?
def join_process(job, timeout):
    time_start = time.time()
    # Typical processing time is 100ms; I want to reduce the latency impact.
    # 10ms looks ok.
    # TODO: I can end up in a tight loop here.
    polling_time = min(0.1*timeout, 0.010)
    while time.time()-time_start < timeout and job.is_alive():
        time.sleep(polling_time)
        continue

Update: I tried multiprocessing.Event() instead of Process.join(); the code fails with the same exception.

Update 2: I have reproduced the problem in code that does not call Process.join() at all. It needs more time and more load, but eventually Process.start() crashes.

Update 3: Has https://bugs.python.org/issue40860 been accepted? I am still looking for a workaround.

Best Answer

Synchronizing the calls to Process.start() helps. It is a reasonable workaround. There are no other answers, so I am accepting my own.

diff --git a/main.py b/main.py
index d09dc53..49d68f0 100644
--- a/main.py
+++ b/main.py
@@ -26,17 +26,24 @@ def load_cpu(deadline):
     while time.time() - start < 0.2*deadline:
         math.pow(random.randint(0, 1), random.randint(0, 1))
 
+def join_process(job, timeout):
+    time_start = time.time()
+    while time.time()-time_start < timeout and job.is_alive():
+        time.sleep(0.1 * timeout)
+        continue
+
 job_counter = 0
+lock = threading.Lock()
 def spawn_job(deadline):
     '''
     Create a new Process, call join(), process errors
     '''
     global job_counter
     time_start = time.time()
-    job = multiprocessing.Process(target=load_cpu, args=(deadline, ))
-    job.start()
-    # timeout=None in the call to join() solves the problem
-    job.join(deadline)
+    with lock:
+        job = multiprocessing.Process(target=load_cpu, args=(deadline, ))
+        job.start()
+    join_process(job, deadline)
My final version uses os.fork(). I dropped multiprocessing entirely. multiprocessing is not thread-safe (I am not joking): https://gist.github.com/larytet/3ca9f9a32b1dc089a24cb7011455141f
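A minimal sketch of that fork-based direction (my reconstruction, not the linked gist): the parent forks, the child runs the workload and exits via os._exit() so it never touches the parent's cleanup machinery, and the parent enforces the deadline with non-blocking os.waitpid(). Unix-only; the 10ms sleep workload stands in for load_cpu():

```python
import os
import time


def run_job(deadline):
    """Fork a child, wait up to `deadline` seconds; True if it exited in time."""
    pid = os.fork()
    if pid == 0:
        # Child process: stand-in for the load_cpu() workload.
        time.sleep(0.01)
        os._exit(0)  # exit without running the parent's cleanup handlers
    # Parent: poll with WNOHANG so the deadline can be enforced.
    start = time.time()
    while time.time() - start < deadline:
        done, _status = os.waitpid(pid, os.WNOHANG)
        if done == pid:
            return True
        time.sleep(0.005)
    return False  # deadline expired; child is still running


if __name__ == "__main__":
    print(run_job(2.0))  # True
```

This avoids the module-level child bookkeeping that multiprocessing's start()/_cleanup() race goes through, which is presumably why the fork-based version held up under load.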

Regarding python-3.x - Call to Python multiprocessing.Process.join() fails, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/62276345/
