gpt4 book ai didi

python - 在python中的进程之间共享连续的numpy数组

转载 作者:IT老高 更新时间:2023-10-28 20:42:15 27 4
gpt4 key购买 nike

尽管我找到了许多类似于我的问题的答案,但我认为这里并没有直接解决它-我还有其他问题。共享连续的numpy数组的动机如下:

  • 我正在使用在Caffe上运行的卷积神经网络对图像进行回归到一系列连续值标签。
  • 图像需要特定的预处理和数据增强。
  • (1)标签的连续性质(它们是 float 的)和(2)数据扩充的约束意味着我正在python中预处理数据,然后使用内存将其作为连续的numpy数组使用Caffe中的数据层。
  • 将训练数据加载到内存中相对较慢。我想将其并行化:

  • (1)我正在编写的python创建了一个“数据处理程序”类,该类实例化了两个连续的numpy数组。
    (2)工作进程在这些numpy数组之间交替,从磁盘加载数据,执行预处理,然后将数据插入numpy数组。
    (3)同时,python Caffe包装器将数据从另一个阵列发送到GPU,以通过网络运行。

    我有几个问题:
  • 是否可以使用类似python多重处理中的Array类的方法在连续的numpy数组中分配内存,然后将其包装在共享内存对象中(我不确定'object'在这里是否是正确的术语)?
  • Numpy数组具有.ctypes属性,我认为这对于从Array()实例化共享内存数组很有用,但似乎无法确切确定如何使用它们。
  • 如果在没有numpy数组的情况下实例化了共享内存,它是否保持连续?如果不是,是否有办法确保它保持连续?

  • 是否可以做类似的事情:
    import numpy as np
    from multiprocessing import Array
    contArr = np.ascontiguousarray(np.zeros((n_images, n_channels, img_height, img_width)), dtype=np.float32)
    sm_contArr = Array(contArr.ctypes.?, contArr?)

    然后使用实例化该 worker
    p.append(Process(target=some_worker_function, args=(data_to_load, sm_contArr)))
    p.start()

    谢谢!

    编辑:我知道有许多库在不同的维护状态下具有相似的功能。我宁愿将其限制为纯python和numpy,但如果不可能的话,我当然会愿意使用它。

    最佳答案

    用多处理的ndarray包裹numpy的RawArray()
    跨进程共享内存中的numpy数组有多种方法。让我们看一下如何使用多处理模块来实现它。

    第一个重要的观察结果是numpy提供了 np.frombuffer()函数来将ndarray接口(interface)包装在支持缓冲区协议(protocol)(例如bytes()bytearray()array()等)的现有对象周围。这将根据只读对象创建只读数组,并根据可写对象创建可写数组。

    我们可以将其与多处理提供的共享内存RawArray() 结合使用。请注意,Array()不适用于该目的,因为它是具有锁定的代理对象,并且不会直接公开缓冲区接口(interface)。当然,这意味着我们需要自己对数字化的RawArrays进行适当的同步。

    关于ndarray包装的RawArray,存在一个复杂的问题:当多进程在进程之间发送这样的数组时-实际上,一旦创建数组,它就需要将我们的数组发送给两个工作人员-对其进行腌制,然后对其进行腌制。不幸的是,这导致它创建ndarray的副本,而不是在内存中共享它们。

    解决方案虽然有点丑陋,但它是保持RawArrays像一样,直到它们被转移到工作程序为止,并且仅在每个工作程序进程启动后才将它们包装在ndarray中。

    此外,最好直接通过multiprocessing.Queue传递数组,无论是普通的RawArray还是ndarray包装的数组,但这都不起作用。无法将RawArray放在这样的Queue中,并且将ndarray包装的nd进行腌制和未腌制,因此实际上已被复制。

    解决方法是将所有预分配数组的列表发送到工作进程,并通过队列将传递索引到该列表中。这非常像传递 token (索引),并且持有 token 的人都可以在关联的数组上进行操作。

    主程序的结构如下所示:

    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-

    import numpy as np
    import queue

    from multiprocessing import freeze_support, set_start_method
    from multiprocessing import Event, Process, Queue
    from multiprocessing.sharedctypes import RawArray


    def create_shared_arrays(size, dtype=np.int32, num=2):
    dtype = np.dtype(dtype)
    if dtype.isbuiltin and dtype.char in 'bBhHiIlLfd':
    typecode = dtype.char
    else:
    typecode, size = 'B', size * dtype.itemsize

    return [RawArray(typecode, size) for _ in range(num)]


    def main():
    my_dtype = np.float32

    # 125000000 (size) * 4 (dtype) * 2 (num) ~= 1 GB memory usage
    arrays = create_shared_arrays(125000000, dtype=my_dtype)
    q_free = Queue()
    q_used = Queue()
    bail = Event()

    for arr_id in range(len(arrays)):
    q_free.put(arr_id) # pre-fill free queue with allocated array indices

    pr1 = MyDataLoader(arrays, q_free, q_used, bail,
    dtype=my_dtype, step=1024)
    pr2 = MyDataProcessor(arrays, q_free, q_used, bail,
    dtype=my_dtype, step=1024)

    pr1.start()
    pr2.start()

    pr2.join()
    print("\n{} joined.".format(pr2.name))

    pr1.join()
    print("{} joined.".format(pr1.name))


    if __name__ == '__main__':
    freeze_support()

    # On Windows, only "spawn" is available.
    # Also, this tests proper sharing of the arrays without "cheating".
    set_start_method('spawn')
    main()

    这将准备两个数组的列表,两个队列-一个“免费”队列,其中MyDataProcessor放置完成处理的数组索引,然后MyDataLoader从中获取它们;还有一个“二手”队列,其中MyDataLoader放置易于填充的数组的索引,然后由MyDataProcessor获取它们from-和multiprocessing.Event,以从所有 worker 中启动一致的保释。我们现在可以取消后者,因为我们只有一个阵列的生产者和一个消费者,但是为更多的 worker 做准备并没有什么坏处。

    然后,在列表中用RawArrays的所有索引预填充“空”队列,并实例化每种类型的worker之一,并向它们传递必要的通信对象。我们启动它们两个,然后等待它们到join()

    这是MyDataProcessor的样子,它使用“已用”队列中的数组索引,并将数据发送到某个外部黑盒(示例中为debugio.output):

    class MyDataProcessor(Process):
    def __init__(self, arrays, q_free, q_used, bail, dtype=np.int32, step=1):
    super().__init__()
    self.arrays = arrays
    self.q_free = q_free
    self.q_used = q_used
    self.bail = bail
    self.dtype = dtype
    self.step = step

    def run(self):
    # wrap RawArrays inside ndarrays
    arrays = [np.frombuffer(arr, dtype=self.dtype) for arr in self.arrays]

    from debugio import output as writer

    while True:
    arr_id = self.q_used.get()
    if arr_id is None:
    break

    arr = arrays[arr_id]

    print('(', end='', flush=True) # just visualizing activity
    for j in range(0, len(arr), self.step):
    writer.write(str(arr[j]) + '\n')
    print(')', end='', flush=True) # just visualizing activity

    self.q_free.put(arr_id)

    writer.flush()

    self.bail.set() # tell loaders to bail out ASAP
    self.q_free.put(None, timeout=1) # wake up loader blocking on get()

    try:
    while True:
    self.q_used.get_nowait() # wake up loader blocking on put()
    except queue.Empty:
    pass

    首先,它使用'np.frombuffer()'将接收到的RawArrays包装在ndarrays中,并保留新列表,因此它们在进程的运行时可用作numpy数组,而不必一遍又一遍地包装它们。

    还要注意,MyDataProcessor只会写入self.bail事件,而不会对其进行检查。相反,如果需要告诉它退出,它将在队列上找到一个None标记而不是数组索引。当MyDataLoader没有更多数据可用并启动拆卸过程时,将执行此操作,MyDataProcessor仍可以处理队列中的所有有效数组而不会过早退出。

    这是MyDataLoader的样子:

    class MyDataLoader(Process):
    def __init__(self, arrays, q_free, q_used, bail, dtype=np.int32, step=1):
    super().__init__()
    self.arrays = arrays
    self.q_free = q_free
    self.q_used = q_used
    self.bail = bail
    self.dtype = dtype
    self.step = step

    def run(self):
    # wrap RawArrays inside ndarrays
    arrays = [np.frombuffer(arr, dtype=self.dtype) for arr in self.arrays]

    from debugio import input as reader

    for _ in range(10): # for testing we end after a set amount of passes
    if self.bail.is_set():
    # we were asked to bail out while waiting on put()
    return

    arr_id = self.q_free.get()
    if arr_id is None:
    # we were asked to bail out while waiting on get()
    self.q_free.put(None, timeout=1) # put it back for next loader
    return

    if self.bail.is_set():
    # we were asked to bail out while we got a normal array
    return

    arr = arrays[arr_id]

    eof = False
    print('<', end='', flush=True) # just visualizing activity
    for j in range(0, len(arr), self.step):
    line = reader.readline()
    if not line:
    eof = True
    break

    arr[j] = np.fromstring(line, dtype=self.dtype, sep='\n')

    if eof:
    print('EOF>', end='', flush=True) # just visualizing activity
    break

    print('>', end='', flush=True) # just visualizing activity

    if self.bail.is_set():
    # we were asked to bail out while we filled the array
    return

    self.q_used.put(arr_id) # tell processor an array is filled

    if not self.bail.is_set():
    self.bail.set() # tell other loaders to bail out ASAP
    # mark end of data for processor as we are the first to bail out
    self.q_used.put(None)

    它的结构与其他 worker 非常相似。它有点肿的原因是它在许多点上检查了self.bail事件,以减少卡住的可能性。 (这并不是完全万无一失的,因为在检查和访问队列之间设置事件的可能性很小。如果这是个问题,则需要使用一些同步原语来仲裁对事件和队列的访问。)

    它还从一开始就将接收到的RawArrays包装在ndarrays中,并从外部黑盒(示例中为debugio.input)读取数据。

    请注意,通过在step=函数中为两个工作人员使用main()参数,我们可以更改完成读写的比率(严格地出于测试目的-在生产环境中,step=将是1,读写所有numpy数组成员)。

    增大两个值会使工作进程仅访问numpy数组中的几个值,从而显着加快了所有工作,这表明性能不受工作进程之间的通信的限制。如果我们将numpy数组直接放在Queues上,然后在整个进程之间来回复制它们,则增加步长不会显着改善性能-它将保持缓慢。

    作为引用,这是我用于测试的debugio模块:

    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-

    from ast import literal_eval
    from io import RawIOBase, BufferedReader, BufferedWriter, TextIOWrapper


    class DebugInput(RawIOBase):
    def __init__(self, end=None):
    if end is not None and end < 0:
    raise ValueError("end must be non-negative")

    super().__init__()
    self.pos = 0
    self.end = end

    def readable(self):
    return True

    def read(self, size=-1):
    if self.end is None:
    if size < 0:
    raise NotImplementedError("size must be non-negative")
    end = self.pos + size
    elif size < 0:
    end = self.end
    else:
    end = min(self.pos + size, self.end)

    lines = []
    while self.pos < end:
    offset = self.pos % 400
    pos = self.pos - offset
    if offset < 18:
    i = (offset + 2) // 2
    pos += i * 2 - 2
    elif offset < 288:
    i = (offset + 12) // 3
    pos += i * 3 - 12
    else:
    i = (offset + 112) // 4
    pos += i * 4 - 112

    line = str(i).encode('ascii') + b'\n'
    line = line[self.pos - pos:end - pos]
    self.pos += len(line)
    size -= len(line)
    lines.append(line)

    return b''.join(lines)

    def readinto(self, b):
    data = self.read(len(b))
    b[:len(data)] = data
    return len(data)

    def seekable(self):
    return True

    def seek(self, offset, whence=0):
    if whence == 0:
    pos = offset
    elif whence == 1:
    pos = self.pos + offset
    elif whence == 2:
    if self.end is None:
    raise ValueError("cannot seek to end of infinite stream")
    pos = self.end + offset
    else:
    raise NotImplementedError("unknown whence value")

    self.pos = max((pos if self.end is None else min(pos, self.end)), 0)
    return self.pos


    class DebugOutput(RawIOBase):
    def __init__(self):
    super().__init__()
    self.buf = b''
    self.num = 1

    def writable(self):
    return True

    def write(self, b):
    *lines, self.buf = (self.buf + b).split(b'\n')

    for line in lines:
    value = literal_eval(line.decode('ascii'))
    if value != int(value) or int(value) & 255 != self.num:
    raise ValueError("expected {}, got {}".format(self.num, value))

    self.num = self.num % 127 + 1

    return len(b)


    input = TextIOWrapper(BufferedReader(DebugInput()), encoding='ascii')
    output = TextIOWrapper(BufferedWriter(DebugOutput()), encoding='ascii')

    关于python - 在python中的进程之间共享连续的numpy数组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31171277/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com