- r - 以节省内存的方式增长 data.frame
- ruby-on-rails - ruby/ruby on rails 内存泄漏检测
- android - 无法解析导入android.support.v7.app
- UNIX 域套接字与共享内存(映射文件)
我有一个可执行文件,我需要使用不同的参数经常运行它。为此,我使用多处理模块编写了一个小型 Python (2.7) 包装器,遵循给定的模式 here .
我的代码是这样的:
try:
logging.info("starting pool runs")
pool.map(run_nlin, params)
pool.close()
except KeyboardInterrupt:
logging.info("^C pressed")
pool.terminate()
except Exception, e:
logging.info("exception caught: ", e)
pool.terminate()
finally:
time.sleep(5)
pool.join()
logging.info("done")
我的工作函数在这里:
class KeyboardInterruptError(Exception): pass
def run_nlin((path_config, path_log, path_nlin, update_method)):
try:
with open(path_log, "w") as log_:
cmdline = [path_nlin, path_config]
if update_method:
cmdline += [update_method, ]
sp.call(cmdline, stdout=log_, stderr=log_)
except KeyboardInterrupt:
time.sleep(5)
raise KeyboardInterruptError()
except:
raise
path_config
是二进制程序配置文件的路径;里面有例如运行程序的日期。
当我启动包装器时,一切看起来都很好。但是,当我按下 ^C
时,包装器脚本似乎在终止之前从池中启动了一个额外的 numproc
进程。例如,当我在第 1-10 天启动脚本时,我可以在 ps aux
输出中看到二进制程序的两个实例正在运行(通常是第 1 天和第 3 天)。现在,当我按下 ^C
时,包装脚本退出,第 1 天和第 3 天的二进制程序消失了,但有新的二进制程序在第 5 天和第 7 天运行。
所以对我来说,似乎 Pool
在最终死亡之前启动了另一个 numproc
进程。
知道这里发生了什么,我能做些什么吗?
最佳答案
关于 this page ,多处理模块的作者 Jesse Noller 表明处理 KeyboardInterrupt
的正确方法是让子进程返回——而不是重新引发异常。这允许主进程终止池。
但是,如下面的代码所示,主进程不会到达 except KeyboardInterrupt
block ,直到在 pool.map
已经运行。这就是为什么(我相信)在按下
Ctrl-C
之后,您会看到对辅助函数 run_nlin
的额外调用。
一种可能的解决方法是让所有工作函数测试是否已设置
multiprocessing.Event
。如果事件已设置,则让 worker 提前退出,否则继续进行长计算。
import logging
import multiprocessing as mp
import time
logger = mp.log_to_stderr(logging.WARNING)
def worker(x):
try:
if not terminating.is_set():
logger.warn("Running worker({x!r})".format(x = x))
time.sleep(3)
else:
logger.warn("got the message... we're terminating!")
except KeyboardInterrupt:
logger.warn("terminating is set")
terminating.set()
return x
def initializer(terminating_):
# This places terminating in the global namespace of the worker subprocesses.
# This allows the worker function to access `terminating` even though it is
# not passed as an argument to the function.
global terminating
terminating = terminating_
def main():
terminating = mp.Event()
result = []
pool = mp.Pool(initializer=initializer, initargs=(terminating, ))
params = range(12)
try:
logger.warn("starting pool runs")
result = pool.map(worker, params)
pool.close()
except KeyboardInterrupt:
logger.warn("^C pressed")
pool.terminate()
finally:
pool.join()
logger.warn('done: {r}'.format(r = result))
if __name__ == '__main__':
main()
运行脚本会产生:
% test.py
[WARNING/MainProcess] starting pool runs
[WARNING/PoolWorker-1] Running worker(0)
[WARNING/PoolWorker-2] Running worker(1)
[WARNING/PoolWorker-3] Running worker(2)
[WARNING/PoolWorker-4] Running worker(3)
这里按下了 Ctrl-C;每个工作人员都设置了 terminating
事件。我们真的只需要一个来设置它,但尽管效率低下,但它仍然有效。
C-c C-c[WARNING/PoolWorker-4] terminating is set
[WARNING/PoolWorker-2] terminating is set
[WARNING/PoolWorker-3] terminating is set
[WARNING/PoolWorker-1] terminating is set
现在 pool.map
排队的所有其他任务都已运行:
[WARNING/PoolWorker-4] got the message... we're terminating!
[WARNING/PoolWorker-2] got the message... we're terminating!
[WARNING/PoolWorker-1] got the message... we're terminating!
[WARNING/PoolWorker-2] got the message... we're terminating!
[WARNING/PoolWorker-4] got the message... we're terminating!
[WARNING/PoolWorker-2] got the message... we're terminating!
[WARNING/PoolWorker-1] got the message... we're terminating!
[WARNING/PoolWorker-3] got the message... we're terminating!
最后主进程到达 except KeyboardInterrupt
block 。
[WARNING/MainProcess] ^C pressed
[WARNING/MainProcess] done: []
关于python - multiprocessing.Pool 在 Linux/Python2.7 上的 terminate() 之后生成新的子进程?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14579474/
我正在尝试使用多处理和队列实现生产者-消费者场景;主进程是生产者,两个子进程使用队列中的数据。这在没有任何异常 发生的情况下有效,但问题是我希望能够在工作人员死亡时重新启动他们(kill -9 wor
我试图在一个管理进程下启动一个数据队列服务器(这样它以后可以变成一个服务),虽然数据队列服务器功能在主进程中工作正常,但它在一个进程中不起作用使用 multiprocessing.Process 创建
我的多处理需求非常简单:我从事机器学习工作,有时我需要评估多个数据集中的一个算法,或者一个数据集中的多个算法,等等。我只需要运行一个带有一些参数的函数并获取一个数字。 我不需要 RPC、共享数据,什么
创建进程池或简单地遍历一个进程以创建更多进程之间有任何区别(以任何方式)吗? 这有什么区别?: pool = multiprocessing.Pool(5) pool.apply_async(work
multiprocessing.BoundedSemaphore(3) 与 multiprocessing.Sempahore(3) 有何不同? 我希望 multiprocessing.Bounded
我尝试通过 multiprocessing 包中的 Queue 对 Pipe 的速度进行基准测试。我认为 Pipe 会更快,因为 Queue 在内部使用 Pipe。 奇怪的是,Pipe 在发送大型 n
我有这样一个简单的任务: def worker(queue): while True: try: _ = queue.get_nowait()
我正在尝试编写一个与 multiprocessing.Pool 同时应用函数的应用程序。我希望这个函数成为一个实例方法(所以我可以在不同的子类中以不同的方式定义它)。这似乎是不可能的;正如我在其他地方
在 python 2 中,multiprocessing.dummy.Pool 和 multiprocessing.pool.ThreadPool 之间有什么区别吗?源代码似乎暗示它们是相同的。 最佳
我正在开发一个用于财务目的的模型。我将整个 S&P500 组件放在一个文件夹中,存储了尽可能多的 .hdf 文件。每个 .hdf 文件都有自己的多索引(年-周-分)。 顺序代码示例(非并行化): im
到目前为止,我是这样做的: rets=set(pool.map_async(my_callback, args.hosts).get(60*4)) 如果超时,我会得到一个异常: File "/usr
参见下面的示例和执行结果: #!/usr/bin/env python3.4 from multiprocessing import Pool import time import os def in
我的任务是监听 UDP 数据报,对其进行解码(数据报具有二进制信息),将解码后的信息放入字典中,将字典转储为 json 字符串,然后将 json 字符串发送到远程服务器(ActiveMQ)。 解码和发
我在 macOS 上工作,最近被 Python 3.8 多处理中“fork”到“spawn”的变化所困扰(参见 doc )。下面显示了一个简化的工作示例,其中使用“fork”成功但使用“spawn”失
multiprocessing.Queue 的文档指出从项目入队到其腌制表示刷新到底层管道之间存在一点延迟。显然,您可以将一个项目直接放入管道中(它没有说明其他情况,并且暗示情况就是如此)。 为什么管
我运行了一些测试代码来检查在 Linux 中使用 Pool 和 Process 的性能。我正在使用 Python 2.7。 multiprocessing.Pool 的源代码似乎显示它正在使用 mul
我在 Windows Standard Embedded 7 上运行 python 3.4.3。我有一个继承 multiprocessing.Process 的类。 在类的 run 方法中,我为进程对
我知道multiprocessing.Process类似于 threading.Thread当我子类 multiprocessing.Process 时要创建一个进程,我发现我不必调用 __init_
我有教科书声明说在多处理器系统中不建议禁用中断,并且会花费太多时间。但我不明白这一点,谁能告诉我多处理器系统禁用中断的过程?谢谢 最佳答案 在 x86(和其他架构,AFAIK)上,启用/禁用中断是基于
我正在执行下面的代码并且它工作正常,但它不会产生不同的进程,而是有时所有都在同一个进程中运行,有时 2 个在一个进程中运行。我正在使用 4 cpu 机器。这段代码有什么问题? def f(values
我是一名优秀的程序员,十分优秀!