gpt4 book ai didi

具有大型数组的 Windows 上的 Python 多处理

转载 作者:可可西里 更新时间:2023-11-01 10:46:47 25 4
gpt4 key购买 nike

我使用 python 的多处理模块在 linux 平台上编写了一个脚本。当我尝试在 Windows 上运行该程序时,这并没有直接运行,我发现这与 Windows 上生成子进程的方式有关。使用的对象可以被腌制似乎是至关重要的。

我的主要问题是,我使用的是大型 numpy 数组。似乎在一定尺寸下,它们不再是可采摘的。为了将它分解为一个简单的脚本,我想做这样的事情:

### Import modules

import numpy as np
import multiprocessing as mp

number_of_processes = 4

if __name__ == '__main__':

def reverse_np_array(arr):
arr = arr + 1
return arr

a = np.ndarray((200,1024,1280),dtype=np.uint16)

def put_into_queue(_Queue,arr):
_Queue.put(reverse_np_array(arr))


Queue_list = []
Process_list = []
list_of_arrays = []

for i in range(number_of_processes):
Queue_list.append(mp.Queue())


for i in range(number_of_processes):
Process_list.append(mp.Process(target=put_into_queue, args=(Queue_list[i],a)))

for i in range(number_of_processes):
Process_list[i].start()

for i in range(number_of_processes):
list_of_arrays.append(Queue_list[i].get())

for i in range(number_of_processes):
Process_list[i].join()

我收到以下错误消息:

Traceback (most recent call last):
File "Windows_multi.py", line 34, in <module>
Process_list[i].start()
File "C:\Program Files\Anaconda32\lib\multiprocessing\process.py", line 130, i
n start
self._popen = Popen(self)
File "C:\Program Files\Anaconda32\lib\multiprocessing\forking.py", line 277, i
n __init__
dump(process_obj, to_child, HIGHEST_PROTOCOL)
File "C:\Program Files\Anaconda32\lib\multiprocessing\forking.py", line 199, i
n dump
ForkingPickler(file, protocol).dump(obj)
File "C:\Program Files\Anaconda32\lib\pickle.py", line 224, in dump
self.save(obj)
File "C:\Program Files\Anaconda32\lib\pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "C:\Program Files\Anaconda32\lib\pickle.py", line 419, in save_reduce
save(state)
File "C:\Program Files\Anaconda32\lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Program Files\Anaconda32\lib\pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())

所以我基本上是在创建一个大数组,我需要在所有进程中使用这个数组进行计算并返回它。

一件重要的事情似乎是在语句 if __name__ = '__main__': 之前写下函数的定义

如果我将数组缩减为 (50,1024,1280),整个过程就会正常进行。然而,即使启动了 4 个进程并且有 4 个内核在工作,它也比编写没有仅针对一个内核的多处理代码(在 Windows 上)慢。所以我想我这里还有另一个问题。

我后面的真实程序中的函数是在一个cython模块中。

我正在使用带有 32 位 python 的 anaconda 包,因为我无法使用 64 位版本编译我的 cython 包(我将在另一个线程中询问)。

欢迎任何帮助!!

谢谢!菲利普

更新:

我犯的第一个错误是在 __main__ 中定义了一个“put_into_queue”函数。

然后我按照建议引入了共享数组,但是,它使用了大量内存,并且使用的内存与我使用的进程成比例(当然不应该是这种情况)。知道我在这里做错了什么吗?将共享数组的定义放在何处(在 __main__ 之内或之外)似乎并不重要,但我认为它应该在 __main__ 中。从这篇文章中得到这个:Is shared readonly data copied to different processes for Python multiprocessing?

import numpy as np
import multiprocessing as mp
import ctypes


shared_array_base = mp.Array(ctypes.c_uint, 1280*1024*20)
shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
#print shared_array
shared_array = shared_array.reshape(20,1024,1280)

number_of_processes = 4

def put_into_queue(_Queue,arr):
_Queue.put(reverse_np_array(arr))
def reverse_np_array(arr):
arr = arr + 1 + np.random.rand()
return arr
if __name__ == '__main__':


#print shared_arra

#a = np.ndarray((50,1024,1280),dtype=np.uint16)


Queue_list = []
Process_list = []
list_of_arrays = []

for i in range(number_of_processes):
Queue_list.append(mp.Queue())


for i in range(number_of_processes):
Process_list.append(mp.Process(target=put_into_queue, args=(Queue_list[i],shared_array)))

for i in range(number_of_processes):
Process_list[i].start()

for i in range(number_of_processes):
list_of_arrays.append(Queue_list[i].get())

for i in range(number_of_processes):
Process_list[i].join()

最佳答案

您没有包含完整的回溯;结局是最重要的。在我的 32 位 Python 上,我得到了最终以

结尾的相同回溯
  File "C:\Python27\lib\pickle.py", line 486, in save_string
self.write(BINSTRING + pack("<i", n) + obj)
MemoryError

MemoryError 是异常,它表示内存不足。

64 位 Python 可以解决这个问题,但在进程之间发送大量数据很容易成为 多处理 中的严重瓶颈。

关于具有大型数组的 Windows 上的 Python 多处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21139911/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com