gpt4 book ai didi

python - multiprocessing.Pool 在 Linux/Python2.7 上的 terminate() 之后生成新的子进程?

转载 作者:IT王子 更新时间:2023-10-29 00:52:16 29 4
gpt4 key购买 nike

我有一个可执行文件,我需要使用不同的参数经常运行它。为此,我使用多处理模块编写了一个小型 Python (2.7) 包装器,遵循给定的模式 here .

我的代码是这样的:

try:
logging.info("starting pool runs")
pool.map(run_nlin, params)
pool.close()
except KeyboardInterrupt:
logging.info("^C pressed")
pool.terminate()
except Exception, e:
logging.info("exception caught: ", e)
pool.terminate()
finally:
time.sleep(5)
pool.join()
logging.info("done")

我的工作函数在这里:

class KeyboardInterruptError(Exception): pass

def run_nlin((path_config, path_log, path_nlin, update_method)):
try:
with open(path_log, "w") as log_:
cmdline = [path_nlin, path_config]
if update_method:
cmdline += [update_method, ]
sp.call(cmdline, stdout=log_, stderr=log_)
except KeyboardInterrupt:
time.sleep(5)
raise KeyboardInterruptError()
except:
raise

path_config 是二进制程序配置文件的路径;里面有例如运行程序的日期。

当我启动包装器时,一切看起来都很好。但是,当我按下 ^C 时,包装器脚本似乎在终止之前从池中启动了一个额外的 numproc 进程。例如,当我在第 1-10 天启动脚本时,我可以在 ps aux 输出中看到二进制程序的两个实例正在运行(通常是第 1 天和第 3 天)。现在,当我按下 ^C 时,包装脚本退出,第 1 天和第 3 天的二进制程序消失了,但有新的二进制程序在第 5 天和第 7 天运行。

所以对我来说,似乎 Pool 在最终死亡之前启动了另一个 numproc 进程。

知道这里发生了什么,我能做些什么吗?

最佳答案

关于 this page ,多处理模块的作者 Jesse Noller 表明处理 KeyboardInterrupt 的正确方法是让子进程返回——而不是重新引发异常。这允许主进程终止池。

但是,如下面的代码所示,主进程不会到达 except KeyboardInterrupt block ,直到 pool.map 已经运行。这就是为什么(我相信)在按下 Ctrl-C 之后,您会看到对辅助函数 run_nlin 的额外调用。

一种可能的解决方法是让所有工作函数测试是否已设置 multiprocessing.Event。如果事件已设置,则让 worker 提前退出,否则继续进行长计算。


import logging
import multiprocessing as mp
import time

logger = mp.log_to_stderr(logging.WARNING)

def worker(x):
try:
if not terminating.is_set():
logger.warn("Running worker({x!r})".format(x = x))
time.sleep(3)
else:
logger.warn("got the message... we're terminating!")
except KeyboardInterrupt:
logger.warn("terminating is set")
terminating.set()
return x

def initializer(terminating_):
# This places terminating in the global namespace of the worker subprocesses.
# This allows the worker function to access `terminating` even though it is
# not passed as an argument to the function.
global terminating
terminating = terminating_

def main():
terminating = mp.Event()
result = []
pool = mp.Pool(initializer=initializer, initargs=(terminating, ))
params = range(12)
try:
logger.warn("starting pool runs")
result = pool.map(worker, params)
pool.close()
except KeyboardInterrupt:
logger.warn("^C pressed")
pool.terminate()
finally:
pool.join()
logger.warn('done: {r}'.format(r = result))

if __name__ == '__main__':
main()

运行脚本会产生:

% test.py
[WARNING/MainProcess] starting pool runs
[WARNING/PoolWorker-1] Running worker(0)
[WARNING/PoolWorker-2] Running worker(1)
[WARNING/PoolWorker-3] Running worker(2)
[WARNING/PoolWorker-4] Running worker(3)

这里按下了 Ctrl-C;每个工作人员都设置了 terminating 事件。我们真的只需要一个来设置它,但尽管效率低下,但它仍然有效。

  C-c C-c[WARNING/PoolWorker-4] terminating is set
[WARNING/PoolWorker-2] terminating is set
[WARNING/PoolWorker-3] terminating is set
[WARNING/PoolWorker-1] terminating is set

现在 pool.map 排队的所有其他任务都已运行:

[WARNING/PoolWorker-4] got the message... we're terminating!
[WARNING/PoolWorker-2] got the message... we're terminating!
[WARNING/PoolWorker-1] got the message... we're terminating!
[WARNING/PoolWorker-2] got the message... we're terminating!
[WARNING/PoolWorker-4] got the message... we're terminating!
[WARNING/PoolWorker-2] got the message... we're terminating!
[WARNING/PoolWorker-1] got the message... we're terminating!
[WARNING/PoolWorker-3] got the message... we're terminating!

最后主进程到达 except KeyboardInterrupt block 。

[WARNING/MainProcess] ^C pressed
[WARNING/MainProcess] done: []

关于python - multiprocessing.Pool 在 Linux/Python2.7 上的 terminate() 之后生成新的子进程?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14579474/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com