gpt4 book ai didi

python - 使用 pyspark 并行化 scipy csr 稀疏矩阵以进行大矩阵乘法

转载 作者:行者123 更新时间:2023-12-01 07:52:44 25 4
gpt4 key购买 nike

我正在计算两个大向量集(具有相同特征)之间的余弦相似度。每组向量都表示为 scipy CSR 稀疏矩阵 A 和 B。我想计算 A x B^T,它不会是稀疏的。但是,我只需要跟踪超过某个阈值的值,例如0.8。我正在尝试使用普通 RDD 在 Pyspark 中实现此功能,其想法是使用为 scipy CSR 矩阵实现的快速向量运算。

A 和 B 的行已标准化,因此要计算余弦相似度,我只需找到 A 中的每一行与 B 中的每一行的点积。A 的尺寸为 5,000,000 x 5,000。B 的尺寸为 2,000,000 x 5,000。

假设 A 和 B 太大,无法作为广播变量放入我的工作节点上的内存中。我应该如何以最佳方式并行化 A 和 B?

编辑在发布我的解决方案后,我一直在探索其他可能更清晰、更优化的方法,特别是为 Spark MLlib IndexedRowMatrix 对象实现的 columnSimilarities() 函数。 (Which pyspark abstraction is appropriate for my large matrix multiplication?)

最佳答案

我能够在这个框架中实现一个解决方案。
欢迎深入了解为什么这个解决方案很慢——是自定义序列化吗?

def csr_mult_helper(pair):
threshold=0.8
A_row = pair[0][0] # keep track of the row offset
B_col = pair[1][0] # offset for B (this will be a column index, after the transpose op)
A = sparse.csr_matrix(pair[0][1], pair[0][2]) # non-zero entires, size data
B = sparse.csr_matrix(pair[1][1], pair[1][2])

C = A * B.T # scipy sparse mat mul

for row_idx, row in enumerate(C): # I think it would be better to use a filter Transformation instead
col_indices = row.indices # but I had trouble with the row and column index book keeping
col_values = row.data

for col_idx, val in zip(col_indices, col_values):
if val > threshold:
yield (A_row + row_idx, B_col + col_idx, val) # source vector, target vector, cosine score

def parallelize_sparse_csr(M, rows_per_chunk=1):
[rows, cols] = M.shape
i_row = 0
submatrices = []
while i_row < rows:
current_chunk_size = min(rows_per_chunk, rows - i_row)
submat = M[i_row:(i_row + current_chunk_size)]
submatrices.append( (i_row, # offset
(submat.data, submat.indices, submat.indptr), # sparse matrix data
(current_chunk_size, cols)) ) # sparse matrix shape
i_row += current_chunk_size
return sc.parallelize(submatrices)

########## generate test data ###########
K,L,M,N = 5,2000,3,2000 # matrix dimensions (toy example)
A_ = sparse.rand(K,L, density=0.1, format='csr')
B_ = sparse.rand(M,N, density=0.1, format='csr')
print("benchmark: {} \n".format((A_ * B_.T).todense())) # benchmark solution for comparison

########## parallelize, multiply, and filter #########
t_start = time.time()
A = parallelize_sparse_csr(A_, rows_per_chunk=10)
B = parallelize_sparse_csr(B_, rows_per_chunk=10) # number of elements per partition, from B
# warning: this code breaks if the B_ matrix rows_per_chunk parameter != 1
# although I don't understand why yet

print("custom pyspark solution: ")
result = A.cartesian(B).flatMap(csr_mult_helper).collect()
print(results)

print("\n {} s elapsed".format(time.time() - t_start))

关于python - 使用 pyspark 并行化 scipy csr 稀疏矩阵以进行大矩阵乘法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56118579/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com