gpt4 book ai didi

python - 使我的 NumPy 数组跨进程共享

转载 作者:太空狗 更新时间:2023-10-29 18:05:30 25 4
gpt4 key购买 nike

我已经阅读了很多关于共享数组的问题,它对于简单数组来说似乎足够简单,但我一直在尝试让它为我拥有的数组工作。

import numpy as np
data=np.zeros(250,dtype='float32, (250000,2)float32')

我尝试通过以某种方式使 mp.Array 接受 data 来将其转换为共享数组,我还尝试使用 ctypes 创建数组:

import multiprocessing as mp
data=mp.Array('c_float, (250000)c_float',250)

我设法让我的代码工作的唯一方法不是将数据传递给函数,而是传递一个编码字符串以进行解压缩/解码,然而这最终会导致调用 n(字符串数)个进程,这似乎是多余的.我想要的实现是基于将二进制字符串列表切成 x(进程数)并将这个 block 、dataindex 传递给工作的进程,除了 data 是在本地修改的,因此关于如何使其共享 的问题,任何使用自定义(嵌套)numpy 数组的示例都已经是帮助很大。

PS:这个问题是Python multi-processing的后续问题

最佳答案

请注意,您可以从复杂数据类型的数组开始:

In [4]: data = np.zeros(250,dtype='float32, (250000,2)float32')

并将其视为同质数据类型的数组:

In [5]: data2 = data.view('float32')

然后,将其转换回复杂数据类型:

In [7]: data3 = data2.view('float32, (250000,2)float32')

改变数据类型是一个非常快速的操作;它不会影响底层数据,只会影响 NumPy 解释它的方式。所以改变 dtype 实际上是没有成本的。

因此,您可以使用上面的技巧轻松地将您所读到的具有简单(同类)数据类型的数组应用于您的复杂数据类型。


下面的代码借鉴了J.F. Sebastian's answer, here 中的许多想法。 .

import numpy as np
import multiprocessing as mp
import contextlib
import ctypes
import struct
import base64


def decode(arg):
chunk, counter = arg
print len(chunk), counter
for x in chunk:
peak_counter = 0
data_buff = base64.b64decode(x)
buff_size = len(data_buff) / 4
unpack_format = ">%dL" % buff_size
index = 0
for y in struct.unpack(unpack_format, data_buff):
buff1 = struct.pack("I", y)
buff2 = struct.unpack("f", buff1)[0]
with shared_arr.get_lock():
data = tonumpyarray(shared_arr).view(
[('f0', '<f4'), ('f1', '<f4', (250000, 2))])
if (index % 2 == 0):
data[counter][1][peak_counter][0] = float(buff2)
else:
data[counter][1][peak_counter][1] = float(buff2)
peak_counter += 1
index += 1
counter += 1


def pool_init(shared_arr_):
global shared_arr
shared_arr = shared_arr_ # must be inherited, not passed as an argument


def tonumpyarray(mp_arr):
return np.frombuffer(mp_arr.get_obj())


def numpy_array(shared_arr, peaks):
"""Fills the NumPy array 'data' with m/z-intensity values acquired
from b64 decoding and unpacking the binary string read from the
mzXML file, which is stored in the list 'peaks'.

The m/z values are assumed to be ordered without validating this
assumption.

Note: This function uses multi-processing
"""
processors = mp.cpu_count()
with contextlib.closing(mp.Pool(processes=processors,
initializer=pool_init,
initargs=(shared_arr, ))) as pool:
chunk_size = int(len(peaks) / processors)
map_parameters = []
for i in range(processors):
counter = i * chunk_size
# WARNING: I removed -1 from (i + 1)*chunk_size, since the right
# index is non-inclusive.
chunk = peaks[i*chunk_size : (i + 1)*chunk_size]
map_parameters.append((chunk, counter))
pool.map(decode, map_parameters)

if __name__ == '__main__':
shared_arr = mp.Array(ctypes.c_float, (250000 * 2 * 250) + 250)
peaks = ...
numpy_array(shared_arr, peaks)

如果能保证执行赋值的各个进程

if (index % 2 == 0):
data[counter][1][peak_counter][0] = float(buff2)
else:
data[counter][1][peak_counter][1] = float(buff2)

永远不要争相修改同一个位置的数据,那我相信你真的可以放弃使用锁

with shared_arr.get_lock():

但我对你的代码理解得不够好,无法确定,所以为了安全起见,我包含了锁。

关于python - 使我的 NumPy 数组跨进程共享,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15976937/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com