gpt4 book ai didi

python - 在 iter 上并行化循环

转载 作者:太空狗 更新时间:2023-10-30 02:12:30 25 4
gpt4 key购买 nike

我的代码有性能问题。步骤 # IIII 耗时数小时。我曾经实现过之前的 itertools.prodct,但多亏了一位用户,我不再使用 pro_data = product(array_b,array_a)。这帮助我解决了内存问题,但仍然非常耗时。我想用多线程或多进程并行化它,无论你能提出什么建议,我都很感激。

解释。我有两个包含粒子 x 和 y 值的数组。对于每个粒子(由两个坐标定义),我想用另一个计算一个函数。对于组合,我使用 itertools.product 方法并遍历每个粒子。我总共运行了 50000 多个粒子,所以我有 N*N/2 个组合要计算。

提前致谢

import numpy as np
import matplotlib.pyplot as plt
from itertools import product,combinations_with_replacement

def func(ar1,ar2,ar3,ar4): #example func that takes four arguments
return (ar1*ar2**22+np.sin(ar3)+ar4)

def newdist(a):
return func(a[0][0],a[0][1],a[1][0],a[1][1])

x_edges = np.logspace(-3,1, num=25) #prepare x-axis for histogram

x_mean = 10**((np.log10(x_edges[:-1])+np.log10(x_edges[1:]))/2)
x_width=x_edges[1:]-x_edges[:-1]

hist_data=np.zeros([len(x_edges)-1])

array1=np.random.uniform(0.,10.,100)
array2=np.random.uniform(0.,10.,100)

array_a = np.dstack((array1,array1))[0]
array_b = np.dstack((array2,array2))[0]
# IIII
for i in product(array_a,array_b):
(result,bins) = np.histogram(newdist(i),bins=x_edges)
hist_data+=result

hist_data = np.array(map(float, hist_data))
plt.bar(x_mean,hist_data,width=x_width,color='r')
plt.show()

-----编辑-----我现在使用这段代码:

def mp_dist(array_a,array_b, d, bins): #d chunks AND processes
def worker(array_ab, out_q):
""" push result in queue """
outdict = {}
outdict = vec_chunk(array_ab, bins)
out_q.put(outdict)
out_q = mp.Queue()
a = np.swapaxes(array_a, 0 ,1)
b = np.swapaxes(array_b, 0 ,1)
array_size_a=len(array_a)-(len(array_a)%d)
array_size_b=len(array_b)-(len(array_b)%d)
a_chunk = array_size_a / d
b_chunk = array_size_b / d
procs = []
#prepare arrays for mp
array_ab = np.empty((4, a_chunk, b_chunk))
for j in xrange(d):
for k in xrange(d):
array_ab[[0, 1]] = a[:, a_chunk * j:a_chunk * (j + 1), None]
array_ab[[2, 3]] = b[:, None, b_chunk * k:b_chunk * (k + 1)]
p = mp.Process(target=worker, args=(array_ab, out_q))
procs.append(p)
p.start()
resultarray = np.empty(len(bins)-1)
for i in range(d):
resultarray+=out_q.get()
# Wait for all worker processes to finish
for pro in procs:
pro.join()
print resultarray
return resultarray

这里的问题是我无法控制进程的数量。我如何改用 mp.Pool()?比

最佳答案

首先,让我们看一下问题的直接向量化。我有一种感觉,您希望您的 array_aarray_b 完全相同,即粒子的坐标,但我在这里将它们分开。

我已将您的代码转换为一个函数,以简化计时:

def IIII(array_a, array_b, bins) :
hist_data=np.zeros([len(bins)-1])
for i in product(array_a,array_b):
(result,bins) = np.histogram(newdist(i), bins=bins)
hist_data+=result
hist_data = np.array(map(float, hist_data))
return hist_data

顺便说一句,您可以按如下方式以不太复杂的方式生成示例数据:

n = 100
array_a = np.random.uniform(0, 10, size=(n, 2))
array_b = np.random.uniform(0, 10, size=(n, 2))

所以首先我们需要向量化你的func。我已经这样做了,所以它可以采用任何 (4, ...) 形状的 array。为了节省内存,它在原地进行计算,并返回第一个平面,即 array[0]

def func_vectorized(a) :
a[1] **= 22
np.sin(a[2], out=a[2])
a[0] *= a[1]
a[0] += a[2]
a[0] += a[3]
return a[0]

有了这个函数,我们就可以编写一个矢量化版本的IIII:

def IIII_vec(array_a, array_b, bins) :
array_ab = np.empty((4, len(array_a), len(array_b)))
a = np.swapaxes(array_a, 0 ,1)
b = np.swapaxes(array_b, 0 ,1)
array_ab[[0, 1]] = a[:, :, None]
array_ab[[2, 3]] = b[:, None, :]
newdist = func_vectorized(array_ab)
hist, _ = np.histogram(newdist, bins=bins)
return hist

对于 n = 100 点,它们都返回相同的值:

In [2]: h1 = IIII(array_a, array_b, x_edges)

In [3]: h2 = IIII_bis(array_a, array_b, x_edges)

In [4]: np.testing.assert_almost_equal(h1, h2)

但时间差异已经非常相关:

In [5]: %timeit IIII(array_a, array_b, x_edges)
1 loops, best of 3: 654 ms per loop

In [6]: %timeit IIII_vec(array_a, array_b, x_edges)
100 loops, best of 3: 2.08 ms per loop

300 倍的加速!如果您使用更长的样本数据再次尝试,n = 1000,您会发现它们的缩放比例同样糟糕,如 n**2,因此 300x 保持不变:

In [10]: %timeit IIII(array_a, array_b, x_edges)
1 loops, best of 3: 68.2 s per loop

In [11]: %timeit IIII_bis(array_a, array_b, x_edges)
1 loops, best of 3: 229 ms per loop

所以你还在看 10 分钟。处理时间,与您当前的解决方案需要超过 2 天的时间相比,这并不算多。

当然,为了让事情变得如此美好,您需要将 (4, 50000, 50000) float 组装入内存,这是我的系统无法处理的。但是您仍然可以通过分块处理来保持相对较快的速度。 IIII_vec 的以下版本将每个数组分成 d 个 block 。如所写,数组的长度应能被 d 整除。克服这个限制不会太难,但它会混淆真正的目的:

def IIII_vec_bis(array_a, array_b, bins, d=1) :
a = np.swapaxes(array_a, 0 ,1)
b = np.swapaxes(array_b, 0 ,1)
a_chunk = len(array_a) // d
b_chunk = len(array_b) // d
array_ab = np.empty((4, a_chunk, b_chunk))
hist_data = np.zeros((len(bins) - 1,))
for j in xrange(d) :
for k in xrange(d) :
array_ab[[0, 1]] = a[:, a_chunk * j:a_chunk * (j + 1), None]
array_ab[[2, 3]] = b[:, None, b_chunk * k:b_chunk * (k + 1)]
newdist = func_vectorized(array_ab)
hist, _ = np.histogram(newdist, bins=bins)
hist_data += hist
return hist_data

首先,让我们检查它是否真的有效:

In [4]: h1 = IIII_vec(array_a, array_b, x_edges)

In [5]: h2 = IIII_vec_bis(array_a, array_b, x_edges, d=10)

In [6]: np.testing.assert_almost_equal(h1, h2)

现在是一些时间安排。 n = 100:

In [7]: %timeit IIII_vec(array_a, array_b, x_edges)
100 loops, best of 3: 2.02 ms per loop

In [8]: %timeit IIII_vec_bis(array_a, array_b, x_edges, d=10)
100 loops, best of 3: 12 ms per loop

但是当您开始不得不在内存中拥有越来越大的数组时,分块进行会开始得到返回。 n = 1000:

In [12]: %timeit IIII_vec(array_a, array_b, x_edges)
1 loops, best of 3: 223 ms per loop

In [13]: %timeit IIII_vec_bis(array_a, array_b, x_edges, d=10)
1 loops, best of 3: 208 ms per loop

使用 n = 10000 我无法再调用 IIII_vec 而不会出现 array is too big 错误,但 chunky 版本仍在运行:

In [18]: %timeit IIII_vec_bis(array_a, array_b, x_edges, d=10)
1 loops, best of 3: 21.8 s per loop

为了证明它可以完成,我用 n = 50000 运行了一次:

In [23]: %timeit -n1 -r1 IIII_vec_bis(array_a, array_b, x_edges, d=50)
1 loops, best of 1: 543 s per loop

这是一个很好的 9 分钟的数字运算,鉴于它已经计算了 25 亿次交互,这并不是那么糟糕。

关于python - 在 iter 上并行化循环,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15271293/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com