gpt4 book ai didi

python - 如何在python中并行for循环?

转载 作者:太空狗 更新时间:2023-10-30 01:32:58 35 4
gpt4 key购买 nike

关闭。这个问题需要更加聚焦。它目前不接受答案。












想改善这个问题吗?更新问题,使其仅关注一个问题 editing this post .

5年前关闭。




Improve this question




更新 1.0 开始
似乎在调用下面的代码时:

for i, Wi in enumerate(W.T):
idx.append(i)
result.append(pool.apply_async(ALS_Y, (X, Wi, Q, lambda_, n_factors, i,)))
传递给函数 ALS_Y/ALS_X 的参数不是引用,而是被复制了一份。所以,当 X、Y 是大矩阵时(例如,就我而言,大约是 6000*40;而且它在一个 for 循环里,假设迭代次数是 50 000,所以……),就会超过内存限制。
然后我尝试使用全局参数,只是将索引作为参数传递给函数,
import multiprocessing
import time
import numpy as np

def func(idx):
global a
a[idx] += 1



if __name__ == "__main__":
a=range(10)
for j in xrange(2):
pool = multiprocessing.Pool(processes=8)
result = []
for i in xrange(10):
result.append(pool.apply_async(func, (i, )))
pool.close()
pool.join()
print a
print "Sub-process(es) done."
它输出:`
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Sub-process(es) done.
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Sub-process(es) done.
所以,这意味着子进程仍然复制了 `a`!
现在,我想知道有没有办法处理这个问题?欣赏!
更新 1.0 结束

下面是我在python中解决矩阵分解问题的代码。
W = XY。不过下面的代码效率不高,希望能转成并行版本,用GPU最好,CPU也好。我没有并行编程的经验,所以有人可以给我一些建议吗?
下面是使用 ALS 分解矩阵的代码(交替最小二乘法,详细信息 here)
for ii in range(n_iterations):
for u, Wu in enumerate(W):
X[u] = np.linalg.solve(np.dot(Y, np.dot(np.diag(Wu), Y.T)) + lambda_ * np.eye(n_factors),
np.dot(Y, np.dot(np.diag(Wu), Q[u].T))).T #X_inner loop

for i, Wi in enumerate(W.T):
Y[:,i] = np.linalg.solve(np.dot(X.T, np.dot(np.diag(Wi), X)) + lambda_ * np.eye(n_factors), #Y_inner loop
np.dot(X.T, np.dot(np.diag(Wi), Q[:, i])))#Y_inner loop
error = get_error(Q, X, Y, W)
weighted_errors.append(error)
print '{}th iteration is completed'.format(ii)
使用多处理库后,我的代码现在:
def ALS_X(Y, Wu, Q, lambda_, n_factors, u):
return np.linalg.solve(np.dot(Y, np.dot(np.diag(Wu), Y.T)) + lambda_ * np.eye(n_factors),
np.dot(Y, np.dot(np.diag(Wu), Q[u].T))).T

for ii in range(n_iterations):
pool = multiprocessing.Pool(processes=12)#create pool
result = []#store each row for X
idx = []#store the row number
for u, Wu in enumerate(W):
idx.append(u)
result.append(pool.apply_async(ALS_X, (Y, Wu, Q, lambda_, n_factors, u,)))
pool.close()
pool.join()
for u, vector in zip(idx, result):
X[u] = vector.get()#assign the result to X
######################################
pool = multiprocessing.Pool(processes=12)#for Y, much similar to X
result = []
idx = []
for i, Wi in enumerate(W.T):
idx.append(i)
result.append(pool.apply_async(ALS_Y, (X, Wi, Q, lambda_, n_factors, i,)))
pool.close()
pool.join()
for i, vector in zip(idx, result):
Y[:,i] = vector.get()
error = get_error(Q, X, Y, W)
weighted_errors.append(error)
print '{}th iteration is completed'.format(ii), 'error: ',error
但是有点惨,程序总是无声无息的崩溃...
下面是我的全部代码.. 一切都乱七八糟。只需忽略 load_data、get_error 和 vec2str,因为在这里我是随机生成矩阵的..
import pandas as pd
import numpy as np
import multiprocessing

def vec2str(vec):
res = ''
for dim in len(vec):
res += str(vec[dim]) + ','
return res

def load_data(heads, filename, sep,header=None):
data = pd.read_table(filename, sep=sep, header=header, names=heads)
rp = data.pivot_table(columns=['sid'],index=['uid'],values=['rating'])#not generally...
Q = rp.fillna(0)
Q = Q.values
W = Q >0.5
W[W == True] = 1
W[W == False] = 0
W = W.astype(np.float64, copy=False)
return Q, W, rp

def get_error(Q, X, Y, W):
return np.sum((W * (Q - np.dot(X, Y)))**2)

'''
X[u] = np.linalg.solve(np.dot(, np.dot(np.diag(), .T)) + * np.eye(),
np.dot(, np.dot(np.diag(), Q[u].T))).T
'''
def ALS_X(Y, Wu, Q, lambda_, n_factors, u):
return np.linalg.solve(np.dot(Y, np.dot(np.diag(Wu), Y.T)) + lambda_ * np.eye(n_factors),
np.dot(Y, np.dot(np.diag(Wu), Q[u].T))).T

'''
Y[:,i] = np.linalg.solve(np.dot(X.T, np.dot(np.diag(Wi), X)) + lambda_ * np.eye(n_factors),
np.dot(X.T, np.dot(np.diag(Wi), Q[:, i])))
'''

def ALS_Y(X, Wi, Q, lambda_, n_factors, i):
return np.linalg.solve(np.dot(X.T, np.dot(np.diag(Wi), X)) + lambda_ * np.eye(n_factors),
np.dot(X.T, np.dot(np.diag(Wi), Q[:, i])))



if __name__ == "__main__":

lambda_ = 0.1
n_factors = 40
filename = 'data_songID'
n_iterations = 20
#Q, W, rp = load_data(['uid', 'sid', 'rating'], filename, ',')
Q = np.random.rand(1000,1000)
m, n = Q.shape
W = np.eye(1000)
print 'Loading data finished, ', 'size: ', Q.shape
print 'Settings ', 'lambda = {}'.format(lambda_), 'n_factors = {}'.format(n_factors)
X = 5 * np.random.rand(m, n_factors)
Y = 5 * np.random.rand(n_factors, n)
errors = []
for ii in range(n_iterations):
X = np.linalg.solve(np.dot(Y, Y.T) + lambda_ * np.eye(n_factors),
np.dot(Y, Q.T)).T
Y = np.linalg.solve(np.dot(X.T, X) + lambda_ * np.eye(n_factors),
np.dot(X.T, Q))
if ii % 100 == 0:
print('{}th iteration is completed'.format(ii))
errors.append(get_error(Q, X, Y, W))
Q_hat = np.dot(X, Y)
print('Error of rated movies: {}'.format(get_error(Q, X, Y, W)))
print errors
#####ALS start....#####
print '*'*100
weighted_errors = []
for ii in range(n_iterations):
pool = multiprocessing.Pool(processes=12)
result = []
idx = []
for u, Wu in enumerate(W):
idx.append(u)
result.append(pool.apply_async(ALS_X, (Y, Wu, Q, lambda_, n_factors, u,)))
pool.close()
pool.join()
for u, vector in zip(idx, result):
X[u] = vector.get()
######################################
pool = multiprocessing.Pool(processes=12)
result = []
idx = []
for i, Wi in enumerate(W.T):
idx.append(i)
result.append(pool.apply_async(ALS_Y, (X, Wi, Q, lambda_, n_factors, i,)))
pool.close()
pool.join()
for i, vector in zip(idx, result):
Y[:,i] = vector.get()
error = get_error(Q, X, Y, W)
weighted_errors.append(error)
print '{}th iteration is completed'.format(ii), 'error: ',error

weighted_Q_hat = np.dot(X,Y)
print weighted_errors
X.tofile('X.bin')
Y.tofile('Y.bin')
latent_user_file = open('user_latent','w')
for idx in len(rp.axes[0]):
latent_user_file.write(str(rp.axes[0][idx]) + '\t' + vec2str(X[idx,:]) + '\n')

latent_mid_file = open('mid_latent', 'w')
for idx in len(rp.axes[1]):
latent_mid_file.write(str(rp.axes[1][idx]) + '\t' + vec2str(Y.T[idx,:]) + '\n')

最佳答案

去年我在写物理论文时也遇到了你这种在 Python 中实现“并行循环”的需求,并自己实现了一个。有很多模块可以做你想做的事,但我发现只有 pp 才能按我想要的方式对任意函数正常工作。

如果你想要看起来像这样的东西:

ResultList = Library_ParallelLoop.Main(
Function = ExampleFunction,
ListOfArgSets = ListOfArgSets,
Algorithm = 'pp',
PrintExtra = True
)

然后我将你指向我的 git hub 而不是在这篇文章中提供我的整个源代码,因为实际让它工作的实现是痛苦的很多行,并且涉及深度复制 python 函数,这显然是其他一些没有很好地预先构建的东西在 python 中。

Finding Primes Example:



https://github.com/douglasquincyadams/Main/blob/master/Test_ParallelLoop.py

Repo:



https://github.com/douglasquincyadams/Main

如果您在计算机的某个暗角下载我的存储库 - 那么您的工作片段应该是:
import Library_ParallelLoop

def do_the_thing_function(ii):
for u, Wu in enumerate(W):
X[u] = np.linalg.solve(np.dot(Y, np.dot(np.diag(Wu), Y.T)) + lambda_ * np.eye(n_factors),
np.dot(Y, np.dot(np.diag(Wu), Q[u].T))).T #X_inner loop

for i, Wi in enumerate(W.T):
Y[:,i] = np.linalg.solve(np.dot(X.T, np.dot(np.diag(Wi), X)) + lambda_ * np.eye(n_factors), #Y_inner loop
np.dot(X.T, np.dot(np.diag(Wi), Q[:, i])))#Y_inner loop
error = get_error(Q, X, Y, W)
weighted_errors.append(error)
print '{}th iteration is completed'.format(ii)
return #whatever your result is supposed to be... your code doesn't work on its own

ListOfArgSets = []
for ii in range(n_iterations):
ListOfArgSets.append( { "ii" : ii , } )

ResultList = Library_ParallelLoop.Main(
Function = do_the_thing_function,
ListOfArgSets = ListOfArgSets,
Algorithm = 'pp',
PrintExtra = True
)

如果你问我 - 一个非常像上面那个的并行循环应该已经很方便并且内置于语言中,但似乎总是以某种方式被塔中的巫师神秘地举例,当你尝试它时并不完全有效在你糟糕的笔记本电脑上。无论如何 - 希望这会有所帮助。

附加说明 - 我还建议,如果您想解决任意大规模并行化问题(除了简单的循环之外的任何问题),请使用 MPI,因为它具有各种花里胡哨的功能,可以让进程在运行中相互交谈. MPI 是科学界人士喜欢用于最大模拟的东西,因此设计用于处理非常大的作业(~10k+ 核)的更大尺寸的集群都支持 MPI,当然不太可能支持 pp 甚至多处理模块。如果您只想使用 PC 中的所有内核(或网络上的几台 PC),那么只需选择最简单的内核即可开始工作。

关于python - 如何在python中并行for循环?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37583531/

35 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com