
python - Interleaved data loading with multiprocessing.Queue sometimes causes items to be consumed out of order

Reposted | Author: 太空宇宙 | Updated: 2023-11-04 01:13:13

I am writing a script to animate image data. I have a number of large image cubes (3D arrays). For each of these, I step through the frames in each cube one by one, and once I get near the end I load the next cube and continue. Because each cube is large, loading takes a long time (~5 s). I would like the animation to transition seamlessly between cubes (while also conserving memory), so I stagger the loading processes. I've made some progress toward a solution, but a few problems remain.

The code below loads each data cube, splits it into frames, and puts those into a multiprocessing.Queue. Once the number of frames in the queue falls below a certain threshold, the next load process is triggered, which loads another cube and unpacks it into the queue.

See the code below:

import numpy as np
import multiprocessing as mp
import logging
logger = mp.log_to_stderr(logging.INFO)
import time

def data_loader(event, queue, **kw):
    '''loads data from 3D image cube'''
    event.wait()        #wait for trigger before loading

    logger.info('Loading data')
    time.sleep(3)       #pretend to take long to load the data
    n = 100
    data = np.ones((n, 20, 20)) * np.arange(n)[:, None, None]  #imaginary 3D image cube (increasing numbers so that we can track the data ordering)

    logger.info('Adding data to queue')
    for d in data:
        queue.put(d)
    logger.info('Done adding to queue!')


def queue_monitor(queue, triggers, threshold=50, interval=5):
    '''
    Triggers the load events once the number of data in the queue falls below
    threshold, then doesn't trigger again until the interval has passed.
    Note: interval should be larger than data load time.
    '''
    while len(triggers):
        if queue.qsize() < threshold:
            logger.info('Triggering next load')
            triggers.pop(0).set()
            time.sleep(interval)


if __name__ == '__main__':
    logger.info("Starting")
    out_queue = mp.Queue()

    #Initialise the load processes
    nprocs, procs = 3, []
    triggers = [mp.Event() for _ in range(nprocs)]
    triggers[0].set()   #set the first process to trigger immediately
    for i, trigger in enumerate(triggers):
        p = mp.Process(name='data_loader %d' % i, target=data_loader,
                       args=(trigger, out_queue))
        procs.append(p)
    for p in procs:
        p.start()

    #Monitoring process
    qm = mp.Process(name='queue_monitor', target=queue_monitor,
                    args=(out_queue, triggers))
    qm.start()

    #consume data
    while out_queue.empty():
        pass
    else:
        for d in iter(out_queue.get, None):
            time.sleep(0.2)     #pretend to take some time to process/animate the data
            logger.info('data: %i' % d[0, 0])   #just to keep track of data ordering

This works well in some cases, but sometimes the data ordering gets jumbled after a new load process is triggered. I can't figure out why this happens — mp.Queue is supposed to be FIFO, right?! For example, running the code above as-is does not preserve the correct order in the output queue, yet changing the threshold to a lower value, e.g. 30, fixes the problem. *so confused...

So the question: how do I correctly implement this kind of staggered loading strategy with multiprocessing in python?

Best Answer

This looks like a buffering problem. Internally, multiprocessing.Queue uses a buffer to temporarily store the items you've enqueued, and a background thread eventually flushes them to an underlying Pipe. Only after that flush happens are the items actually sent to the other process. Because you are putting large objects onto the Queue, a lot of buffering takes place. This causes the load processes to actually overlap, even though your logging shows one process finishing before the next one starts. The documentation actually has a warning about this scenario:

When an object is put on a queue, the object is pickled and a background thread later flushes the pickled data to an underlying pipe. This has some consequences which are a little surprising, but should not cause any practical difficulties – if they really bother you then you can instead use a queue created with a manager.

  1. After putting an object on an empty queue there may be an infinitesimal delay before the queue’s empty() method returns False and get_nowait() can return without raising Queue.Empty.
  2. If multiple processes are enqueuing objects, it is possible for the objects to be received at the other end out-of-order. However, objects enqueued by the same process will always be in the expected order with respect to each other.

I would suggest doing as the docs state and using a multiprocessing.Manager to create your queue:

m = mp.Manager()
out_queue = m.Queue()

That should let you avoid the problem entirely.

Another option would be to use just one process to do all the data loading, and have it run in a loop, calling event.wait() at the top of the loop.

Regarding python - Interleaved data loading with multiprocessing.Queue sometimes causes items to be consumed out of order, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/26409203/
