- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我有一些 multiprocessing
Python 代码,看起来有点像这样:
import time
from multiprocessing import Pool
import numpy as np
class MyClass(object):
def __init__(self):
self.myAttribute = np.zeros(100000000) # basically a big memory struct
def my_multithreaded_analysis(self):
arg_lists = [(self, i) for i in range(10)]
pool = Pool(processes=10)
result = pool.map(call_method, arg_lists)
print result
def analyze(self, i):
time.sleep(10)
return i ** 2
def call_method(args):
my_instance, i = args
return my_instance.analyze(i)
if __name__ == '__main__':
my_instance = MyClass()
my_instance.my_multithreaded_analysis()
在阅读了其他 StackOverflow 答案中关于内存如何工作的答案后,例如这个 Python multiprocessing memory usage我的印象是,这不会根据我用于多处理的进程数来使用内存,因为它是写时复制的,而且我没有修改 my_instance
的任何属性。但是,当我运行 top 时,我确实看到了所有进程的高内存,它说我的大部分进程都在使用大量内存(这是 OSX 的最高输出,但我可以在 Linux 上复制)。
我的问题基本上是,我是否正确地解释了这一点,因为我的 MyClass
实例实际上在池中是重复的?如果是这样,我该如何防止这种情况发生?我不应该使用这样的结构吗?我的目标是减少计算分析的内存使用。
PID COMMAND %CPU TIME #TH #WQ #PORT MEM PURG CMPRS PGRP PPID STATE
2494 Python 0.0 00:01.75 1 0 7 765M 0B 0B 2484 2484 sleeping
2493 Python 0.0 00:01.85 1 0 7 765M 0B 0B 2484 2484 sleeping
2492 Python 0.0 00:01.86 1 0 7 765M 0B 0B 2484 2484 sleeping
2491 Python 0.0 00:01.83 1 0 7 765M 0B 0B 2484 2484 sleeping
2490 Python 0.0 00:01.87 1 0 7 765M 0B 0B 2484 2484 sleeping
2489 Python 0.0 00:01.79 1 0 7 167M 0B 597M 2484 2484 sleeping
2488 Python 0.0 00:01.77 1 0 7 10M 0B 755M 2484 2484 sleeping
2487 Python 0.0 00:01.75 1 0 7 8724K 0B 756M 2484 2484 sleeping
2486 Python 0.0 00:01.78 1 0 7 9968K 0B 755M 2484 2484 sleeping
2485 Python 0.0 00:01.74 1 0 7 171M 0B 594M 2484 2484 sleeping
2484 Python 0.1 00:16.43 4 0 18 775M 0B 12K 2484 2235 sleeping
最佳答案
任何发送到 pool.map
(和相关方法)的东西实际上都没有使用共享的写时复制资源。值为 "pickled" (Python's serialization mechanism) ,通过管道发送到工作进程并在那里进行 unpickled,从头开始重建子进程中的对象。因此,在这种情况下,每个 child 最终都会得到原始数据的写时复制版本(它从不使用它,因为它被告知使用通过 IPC 发送的副本),以及对原始数据的个人重建在 child 中重建,不共享。
如果您想利用 fork 的写时复制优势,您不能通过管道发送数据(或引用数据的对象)。您必须将它们存储在可以通过访问他们自己的全局变量从 child 那里找到的位置。例如:
import os
import time
from multiprocessing import Pool
import numpy as np
class MyClass(object):
def __init__(self):
self.myAttribute = os.urandom(1024*1024*1024) # basically a big memory struct(~1GB size)
def my_multithreaded_analysis(self):
arg_lists = list(range(10)) # Don't pass self
pool = Pool(processes=10)
result = pool.map(call_method, arg_lists)
print result
def analyze(self, i):
time.sleep(10)
return i ** 2
def call_method(i):
# Implicitly use global copy of my_instance, not one passed as an argument
return my_instance.analyze(i)
# Constructed globally and unconditionally, so the instance exists
# prior to forking in commonly accessible location
my_instance = MyClass()
if __name__ == '__main__':
my_instance.my_multithreaded_analysis()
通过不传递 self
,您可以避免制作副本,而只需使用写入时复制映射到子对象的单个全局对象。如果您需要多个对象,您可以在创建池之前创建一个全局 list
或 dict
映射到对象的实例,然后传递可以在 pool.map
的参数中查找对象。 worker 函数然后使用索引/键(必须被 pickle 并通过 IPC 发送给 child )在全局字典(也是写时复制映射)中查找值(写时复制映射),所以你复制便宜的信息来查找 child 中昂贵的数据而不复制它。
如果对象很小,即使您不写入它们,它们最终也会被复制。 CPython 是引用计数的,引用计数出现在公共(public)对象头中并不断更新,仅通过引用对象,即使它是逻辑上不可变的引用。因此,小对象(以及分配在同一内存页中的所有其他对象)将被写入,并因此被复制。对于大对象(你的亿元素 numpy 数组),只要你不写入它,大部分就会保持共享,因为标题只占用许多页面之一
在 python 版本 3.8 中更改:在 macOS 上,spawn 启动方法现在是默认方法。参见 mulitprocessing doc . Spawn 没有利用写时复制。
关于python - 利用 "Copy-on-Write"将数据复制到 Multiprocessing.Pool() 工作进程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38084401/
我正在尝试使用多处理和队列实现生产者-消费者场景;主进程是生产者,两个子进程使用队列中的数据。这在没有任何异常 发生的情况下有效,但问题是我希望能够在工作人员死亡时重新启动他们(kill -9 wor
我试图在一个管理进程下启动一个数据队列服务器(这样它以后可以变成一个服务),虽然数据队列服务器功能在主进程中工作正常,但它在一个进程中不起作用使用 multiprocessing.Process 创建
我的多处理需求非常简单:我从事机器学习工作,有时我需要评估多个数据集中的一个算法,或者一个数据集中的多个算法,等等。我只需要运行一个带有一些参数的函数并获取一个数字。 我不需要 RPC、共享数据,什么
创建进程池或简单地遍历一个进程以创建更多进程之间有任何区别(以任何方式)吗? 这有什么区别?: pool = multiprocessing.Pool(5) pool.apply_async(work
multiprocessing.BoundedSemaphore(3) 与 multiprocessing.Sempahore(3) 有何不同? 我希望 multiprocessing.Bounded
我尝试通过 multiprocessing 包中的 Queue 对 Pipe 的速度进行基准测试。我认为 Pipe 会更快,因为 Queue 在内部使用 Pipe。 奇怪的是,Pipe 在发送大型 n
我有这样一个简单的任务: def worker(queue): while True: try: _ = queue.get_nowait()
我正在尝试编写一个与 multiprocessing.Pool 同时应用函数的应用程序。我希望这个函数成为一个实例方法(所以我可以在不同的子类中以不同的方式定义它)。这似乎是不可能的;正如我在其他地方
在 python 2 中,multiprocessing.dummy.Pool 和 multiprocessing.pool.ThreadPool 之间有什么区别吗?源代码似乎暗示它们是相同的。 最佳
我正在开发一个用于财务目的的模型。我将整个 S&P500 组件放在一个文件夹中,存储了尽可能多的 .hdf 文件。每个 .hdf 文件都有自己的多索引(年-周-分)。 顺序代码示例(非并行化): im
到目前为止,我是这样做的: rets=set(pool.map_async(my_callback, args.hosts).get(60*4)) 如果超时,我会得到一个异常: File "/usr
参见下面的示例和执行结果: #!/usr/bin/env python3.4 from multiprocessing import Pool import time import os def in
我的任务是监听 UDP 数据报,对其进行解码(数据报具有二进制信息),将解码后的信息放入字典中,将字典转储为 json 字符串,然后将 json 字符串发送到远程服务器(ActiveMQ)。 解码和发
我在 macOS 上工作,最近被 Python 3.8 多处理中“fork”到“spawn”的变化所困扰(参见 doc )。下面显示了一个简化的工作示例,其中使用“fork”成功但使用“spawn”失
multiprocessing.Queue 的文档指出从项目入队到其腌制表示刷新到底层管道之间存在一点延迟。显然,您可以将一个项目直接放入管道中(它没有说明其他情况,并且暗示情况就是如此)。 为什么管
我运行了一些测试代码来检查在 Linux 中使用 Pool 和 Process 的性能。我正在使用 Python 2.7。 multiprocessing.Pool 的源代码似乎显示它正在使用 mul
我在 Windows Standard Embedded 7 上运行 python 3.4.3。我有一个继承 multiprocessing.Process 的类。 在类的 run 方法中,我为进程对
我知道multiprocessing.Process类似于 threading.Thread当我子类 multiprocessing.Process 时要创建一个进程,我发现我不必调用 __init_
我有教科书声明说在多处理器系统中不建议禁用中断,并且会花费太多时间。但我不明白这一点,谁能告诉我多处理器系统禁用中断的过程?谢谢 最佳答案 在 x86(和其他架构,AFAIK)上,启用/禁用中断是基于
我正在执行下面的代码并且它工作正常,但它不会产生不同的进程,而是有时所有都在同一个进程中运行,有时 2 个在一个进程中运行。我正在使用 4 cpu 机器。这段代码有什么问题? def f(values
我是一名优秀的程序员,十分优秀!