gpt4 book ai didi

python - 同步访问时,在共享内存中使用 numpy 数组会很慢

转载 作者:行者123 更新时间:2023-12-01 04:26:43 31 4
gpt4 key购买 nike

我编写了一个程序,它接收大型数据集作为输入(~150mb 文本文件),对它们进行一些数学计算,然后以直方图报告结果。必须执行的计算数量与数据集中两个点的组合数量成正比,对于 100 万个点的数据集来说,这个数量非常大(约 50 亿)。

我希望通过使用Python的multiprocessing模块将部分直方图数据的计算分配给各个进程来减少一些计算时间,同时将最终直方图的数组保留在共享内存中,以便每个进程进程可以添加到它。

我通常基于this answer中描述的过程,使用多处理创建了该程序的工作版本。 ,但是我发现它实际上比我之前编写的非并行化版本稍慢。我尝试取消同步对共享数组的访问,发现这可以显着加快速度,但会导致部分数据丢失。

以下是代码的概要:

import numpy as np
from multiprocessing import Pool, Array

BINS = 200
DMAX = 3.5
DMIN = 0

def init(histo):
global histo_shared
histo_shared = histo

def to_np_array(mp_array):
return np.frombuffer(mp_array.get_obj())

# synchronize access to shared array
def calc_sync(i):
with histo_shared.get_lock():
calc_histo(i)

def calc_histo(i):
# create new array 'd_new' by doing some math on DATA using argument i
histo = to_np_array(histo_shared)
histo += np.histogram(d_new, bins=BINS,
range=(DMIN, DMAX))[0].astype(np.int32)

def main():
# read in data and calculate no. of iterations
global DATA
DATA = np.loadtxt("data.txt")
it = len(DATA) // 2

# create shared array
histo_shared = Array('l', BINS)

# write to shared array from different processes
p = Pool(initializer=init, initargs=(histo_shared,))
for i in range(1, it + 1):
p.apply_async(calc_sync, [i])
p.close()
p.join()

histo_final = to_np_array(histo_shared)
np.savetxt("histo.txt", histo_final)

if __name__ == '__main__':
main()

我是否遗漏了一些对我的表现产生严重影响的东西?有什么办法可以解决这个问题以加快速度吗?

非常感谢任何见解或建议!

最佳答案

您实际上锁定了可能获得的任何并行性,因为在您处理的整个过程中数据都处于锁定状态。

当这个方法

def calc_sync(i):
with histo_shared.get_lock():
calc_histo(i)

正在执行,您在处理直方图时对整个共享数据集设置了锁定。另请注意,

def calc_histo(i):
# create new array 'd_new' by doing some math on DATA using argument i
histo = to_np_array(histo_shared)
histo += np.histogram(d_new, bins=BINS,
range=(DMIN, DMAX))[0].astype(np.int32)

没有对 i 做任何事情,所以看起来你只是在再次处理相同的数据。什么是 d_new?我在您的列表中没有看到它。

理想情况下,您应该做的是获取大型数据集,将其切成一定数量的 block 并单独处理,然后组合结果。只锁定共享数据,不锁定处理步骤。这可能看起来像这样:

def calc_histo(slice):
# process the slice asyncronously
return np.histogram(slice, bins=BINS,
range=(DMIN, DMAX))[0].astype(np.int32)

def calc_sync(start,stop):

histo = None

# grab a chunk of data, you likely don't need to lock this
histo = raw_data[start:stop]

# acutal calculation is async
result = calc_histo(histo)

with histo_shared.get_lock():
histo_shared += result

对于成对数据:

def calc_sync(part1,part2):

histo = None
output = [] # or numpy array
# acutal calculation is async
for i in range(part1):
for j in range(part2):
# do whatever computation you need and add it to output

result = calc_histo(output)

with histo_shared.get_lock():
histo_shared += result

现在

 p = Pool(initializer=init, initargs=(histo_shared,))
for i in range(1, it + 1,slice_size):
for j in range(1, it + 1,slice_size):
p.apply_async(calc_sync, [histo_shared[j:j+slice_size], histo_shared[i:i+slice_size])

换句话来说,我们对数据进行两两切割,生成相关数据,然后将它们放入直方图中。您唯一需要的真正同步是在组合直方图中的数据时

关于python - 同步访问时,在共享内存中使用 numpy 数组会很慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33001155/

31 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com