gpt4 book ai didi

python-3.x - Python3 : Multiprocessing consumes extensively much RAM and slows down

转载 作者:行者123 更新时间:2023-12-04 15:41:42 25 4
gpt4 key购买 nike

我启动多个进程以创建新对象列表。 htop向我展示了 1 到 4 个进程(我总是创建 3 个新对象)。

def foo(self):
with multiprocessing.Pool(processes=3, maxtasksperchild=10) as pool:
result = pool.map_async(self.new_obj, self.information)
self.new_objs = result.get()
pool.terminate()
gc.collect()

我打电话 foo()多次,每次调用时,整个过程运行速度较慢,程序甚至没有完成,因为它减慢了很多。该程序开始占用我所有的 RAM,而顺序方法没有任何显着的 RAM 使用量。

当我终止程序时,大多数情况下这是程序最后执行的函数。
->File "threading.py", line 293, in wait
waiter.acquire()

编辑
提供一些关于我的情况的信息。我创建了一个由节点组成的树。 foo()由父节点调用以创建其子节点。 result进程返回的是这些子节点。这些保存在父节点的列表中。我想并行化这些子节点的创建,而不是按顺序创建它们。

最佳答案

我认为您的问题主要与您的并行函数是对象的方法这一事实有关。如果没有更多信息,很难确定,但请考虑这个小玩具程序:

import multiprocessing as mp
import numpy as np
import gc


class Object(object):
def __init__(self, _):
self.data = np.empty((100, 100, 100), dtype=np.float64)


class Container(object):
def __new__(cls):
self = object.__new__(cls)
print("Born")
return self

def __init__(self):
self.objects = []

def foo(self):
with mp.Pool(processes=3, maxtasksperchild=10) as pool:
result = pool.map_async(self.new_obj, range(50))
self.objects.extend(result.get())
pool.terminate()
gc.collect()

def new_obj(self, i):
return Object(i)

def __del__(self):
print("Dead")


if __name__ == '__main__':
c = Container()
for j in range(5):
c.foo()

现在 Container仅被调用一次,因此您希望看到 "Born" ,然后是 "Dead"被打印出来;但是由于进程执行的代码是容器的方法,这意味着整个容器必须在其他地方执行!运行这个,你会看到一个混合流 "Born""Dead"当您的容器在每次执行 map 时重建:
Born
Born
Born
Born
Born
Dead
Born
Dead
Dead
Born
Dead
Born
...
<MANY MORE LINES HERE>
...
Born
Dead

为了说服自己整个容器每次都被复制和发送,尝试设置一些不可序列化的值:
def foo(self):
with mp.Pool(processes=3, maxtasksperchild=10) as pool:
result = pool.map_async(self.new_obj, range(50))
self.fn = lambda x: x**2
self.objects.extend(result.get())
pool.terminate()
gc.collect()

这将立即引发 AttributeError因为它不能序列化容器。

总结一下:当向池发送1000个请求时, Container将被序列化,发送到进程并在那里反序列化 1000 次。当然,它们最终会被删除(假设没有太多奇怪的交叉引用发生),但这肯定会给 RAM 带来很大压力,因为对象被序列化、调用、更新、重新序列化......对于每个映射输入中的元素。

你怎么能解决呢?好吧,理想情况下,不要共享状态:
def new_obj(_):
return Object(_)


class Container(object):
def __new__(cls):
self = object.__new__(cls)
print("Born")
return self

def __init__(self):
self.objects = []

def foo(self):
with mp.Pool(processes=3, maxtasksperchild=10) as pool:
result = pool.map_async(new_obj, range(50))
self.objects.extend(result.get())
pool.terminate()
gc.collect()

def __del__(self):
print("Dead")

这会在很短的时间内完成,并且只会在 RAM 上产生最小的飞艇(因为曾经构建过单个 Container)。如果您需要在那里传递一些内部状态,请将其提取并发送:
def new_obj(tup):
very_important_state, parameters = tup
return Object(very_important_state=very_important_state,
parameters=parameters)


class Container(object):
def __new__(cls):
self = object.__new__(cls)
print("Born")
return self

def __init__(self):
self.objects = []

def foo(self):
important_state = len(self.objects)
with mp.Pool(processes=3, maxtasksperchild=10) as pool:
result = pool.map_async(new_obj,
((important_state, i) for i in range(50)))
self.objects.extend(result.get())
pool.terminate()
gc.collect()

def __del__(self):
print("Dead")

这具有与以前相同的行为。如果您绝对无法避免在进程之间共享某些可变状态,请查看 the multiprocessing tools因为这样做而不必每次都复制所有内容。

关于python-3.x - Python3 : Multiprocessing consumes extensively much RAM and slows down,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38140693/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com