gpt4 book ai didi

Python 多处理池;等待迭代完成

转载 作者:太空宇宙 更新时间:2023-11-03 21:45:52 26 4
gpt4 key购买 nike

我有一个大型数据集,我希望我的脚本对其进行迭代,对每个条目执行一系列操作,然后安排结果存储到 HDD。由于数据集可能相对较大(~250 GB),RAM 可用性要求一次以 1000 个条目的 block (我在下面的代码中称为 dataBlock)的形式处理数据集。我还使用 multiprocessing.Pool 类来方便使用多个 CPU 核心来完成此任务。

我基本上已经做好了安排,以便将每个数据 block 传递给池,池使用 imap 方法在数据 block 上执行所需的计算,池返回计算结果,然后数据 block 的结果被附加到列表中。此列表 (processed_data) 是这组计算所需的最终产品。

processed_data = []

multiprocessing.Pool(processor_cap) as pool:

for blockIndex, block in enumerate(range(1000, height-remainder, 1000)):

#Read-in 1000 spectra from source dataset
dataBlock = np.asarray(raw_dset[blockIndex*1000:block][:])

'''
Pass data block to processor pool, which iterates through data
block. Each spectrum is handed off to a CPU in the pool,
which centroids it and appends the result to "processed_block".
'''
processed_block = pool.imap(centroid_spectrum, dataBlock)

#Append processed spectra to processed data bin
for idx, processed_spectrum in enumerate(processed_block):
processed_data.append(processed_spectrum)

我想知道的是如何让脚本在调用pool.imap()之后暂停,直到返回完整的processed_block而不关闭水池。目前,它直接进入 for 循环,该循环紧随上面的代码片段,而不等待 processed_blockpool.imap 返回。我尝试在 pool.imap() 调用后立即调用 pool.join() ,但它只返回 ***AssertionError 并且再次继续下面的 for 循环。一旦所有数据 block 都被送入池中,就在脚本末尾的下方,我最终可以在脚本中成功调用 pool.close()pool.join()上面最外面的 for 循环。

预先感谢您的帮助!

最佳答案

如果不付出很大的努力来改变周围的事情,就很难使用你的例子;但是如果您有一个来自 imap() 调用的迭代器,那么您可能会考虑在到达 for 循环之前将迭代器的元素解析为列表:

processed_block = pool.imap(centroid_spectrum, dataBlock)
processed_block = [ x for x in processed_block ] # convert from an iterator to a list
for idx, processed_spectrum in enumerate(processed_block):

等等

这样可以达到你想要的效果吗?

关于Python 多处理池;等待迭代完成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52505225/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com