python - 对称稀疏矩阵的高效切片

转载 作者:太空宇宙 更新时间:2023-11-03 11:17:39
我有一个稀疏对称矩阵列表 sigma 使得

len(sigma) = N


sigma[i].shape[0] == sigma[i].shape[1] = m  # Square
sigma[i][j,k] == sigma[i][k,j] # Symmetric

我有一个索引数组 P 这样

P.shape[0] = N
P.shape[1] = k

我的目标是使用 P[i,:] 给出的索引提取 sigma[i]k x k 密集子矩阵。这可以按如下方式完成

sub_matrices = np.empty([N,k,k])
for i in range(N):
sub_matrices[i,:,:] = sigma[i][np.ix_(P[i,:], P[i,:])].todense()

但是请注意,虽然 k 很小,但 N(和 m)非常大。如果稀疏对称矩阵以 CSR 格式存储,这将花费很长时间。我觉得必须有更好的解决方案。例如,是否存在适合需要在两个维度上切片的对称矩阵的稀疏格式?

我正在使用 Python,但愿意接受任何我可以使用 Cython 接口(interface)的 C 库建议。


请注意,我目前的 Cython 方法如下:

cimport cython
import numpy as np
cimport numpy as np

@cython.boundscheck(False) # turn off bounds-checking for entire function
cpdef sparse_slice_fast_cy(sigma,
long[:,:] P,
double[:,:,:] sub_matrices):
sigma: A list (N,) of sparse sp.csr_matrix (m x m)
P: A 2D array of integers (N, k)
sub_matrices: A 3D array of doubles (N, k, k) containing the slicing
# Create variables for keeping code tidy
cdef long N = P.shape[0]
cdef long k = P.shape[1]

cdef long i
cdef long j
cdef long index_pointer
cdef long sparse_row_pointer

# Create objects for holding sparse matrix data
cdef double[:] data
cdef long[:] indices
cdef long[:] indptr

# Object for the ordered P
cdef long[:] perm

# Make sure sub_matrices is all 0
sub_matrices[:] = 0

for i in range(N):
# Sort the P
perm = np.argsort(P[i,:])

# Get the sparse matrix values
data = sigma[i].data
indices = sigma[i].indices.astype(long)
indptr = sigma[i].indptr.astype(long)

for j in range(k):
# Loop over row P[i, perm[j]] in sigma searching for values
# in P[i, :] vector i.e. compare
# sigma[P[i, perm[j], :]
# against
# P[i,:]

# To do this we need our sparse row vector with columns
# indices[indptr[P[i, perm[j]]], indptr[P[i, perm[j]]+1]]
# and data/values
# data[indptr[P[i, perm[j]]], indptr[P[i, perm[j]]+1]]
# which comes from the csr matrix format.
# We also need our sorted indexing vector
# P[i, perm[:]]

# We begin by pointing at the top of both
# our vectors and gradually move down them. In the event of
# an equality we add the data to sub_matrices[i,:,:] and
# increment the INDEXING VECTOR pointer, not the sparse
# row vector pointer, as there can be multiple values that
# are the same in the indexing vector but not the sparse row
# column vector (only 1 column can appear in 1 row!).
index_pointer = 0
sparse_row_pointer = indptr[P[i, perm[j]]]

while ((index_pointer < k) and (sparse_row_pointer < indptr[P[i, perm[j]] + 1])):
if indices[sparse_row_pointer] == P[i, perm[index_pointer]]:
# We can add data to sub_matrices
sub_matrices[i, perm[j], perm[index_pointer]] = \

# Only increment the index pointer
index_pointer += 1
elif indices[sparse_row_pointer] > P[i, perm[index_pointer]]:
# Need to increment index pointer
index_pointer += 1
# Need to increment sparse row pointer
sparse_row_pointer += 1

我相信 np.argsort 经常在相对较小的向量上调用时可能效率低下并且想换成 C 实现。我也没有利用可能在 N 稀疏矩阵上加速的并行处理。不幸的是,由于外部循环中存在 Python 强制转换,我不知道如何使用 prange

另一点需要注意的是,Cython 方法似乎使用了大量内存,但我不知道它被分配到哪里。



cimport cython
import numpy as np
cimport numpy as np

@cython.boundscheck(False) # turn off bounds-checking for entire function
cpdef sparse_slice_fast_cy(sigma,
np.ndarray[np.int32_t, ndim=2] P,
np.float64_t[:,:,:] sub_matrices,
int symmetric):
sigma: A list (N,) of sparse sp.csr_matrix (m x m)
P: A 2D array of integers (N, k)
sub_matrices: A 3D array of doubles (N, k, k) containing the slicing
symmetric: 1 if the sigma matrices are symmetric
# Create variables for keeping code tidy
cdef np.int32_t N = P.shape[0]
cdef np.int32_t k = P.shape[1]

cdef np.int32_t i
cdef np.int32_t j
cdef np.int32_t index_pointer
cdef np.int32_t sparse_row_pointer

# Create objects for holding sparse matrix data
cdef np.float64_t[:] data
cdef np.int32_t[:] indices

cdef np.int32_t[:] indptr

# Object for the ordered P
cdef np.int32_t[:,:] perm = np.argsort(P, axis=1).astype(np.int32)

# Make sure sub_matrices is all 0
sub_matrices[:] = 0

for i in range(N):
# Get the sparse matrix values
data = sigma[i].data
indices = sigma[i].indices
indptr = sigma[i].indptr

for j in range(k):
# Loop over row P[i, perm[j]] in sigma searching for values
# in P[i, :] vector i.e. compare
# sigma[P[i, perm[j], :]
# against
# P[i,:]

# To do this we need our sparse row vector with columns
# indices[indptr[P[i, perm[j]]], indptr[P[i, perm[j]]+1]]
# and data/values
# data[indptr[P[i, perm[j]]], indptr[P[i, perm[j]]+1]]
# which comes from the csr matrix format.
# We also need our sorted indexing vector
# P[i, perm[:]]

# We begin by pointing at the top of both
# our vectors and gradually move down them. In the event of
# an equality we add the data to sub_matrices[i,:,:] and
# increment the INDEXING VECTOR pointer, not the sparse
# row vector pointer, as there can be multiple values that
# are the same in the indexing vector but not the sparse row
# column vector (only 1 column can appear in 1 row!).

if symmetric:
index_pointer = j # Only search upper triangular
index_pointer = 0
sparse_row_pointer = indptr[P[i, perm[i, j]]]

while ((index_pointer < k) and (sparse_row_pointer < indptr[P[i, perm[i, j]] + 1])):
if indices[sparse_row_pointer] == P[i, perm[i, index_pointer]]:
# We can add data to sub_matrices
sub_matrices[i, perm[i, j], perm[i, index_pointer]] = \

if symmetric:
sub_matrices[i, perm[i, index_pointer], perm[i, j]] = \

# Only increment the index pointer
index_pointer += 1
elif indices[sparse_row_pointer] > P[i, perm[i, index_pointer]]:
# Need to increment index pointer
index_pointer += 1
# Need to increment sparse row pointer
sparse_row_pointer += 1



# See
cimport cython
import numpy as np
cimport numpy as np
from libc.stdlib cimport malloc, free
from cython.parallel import prange

@cython.boundscheck(False) # turn off bounds-checking for entire function
cpdef sparse_slice_fast_cy(sigma,
np.ndarray[np.int32_t, ndim=2] P,
np.float64_t[:,:,:] sub_matrices,
int symmetric):
sigma: A list (N,) of sparse sp.csr_matrix (m x m)
P: A 2D array of integers (N, k)
sub_matrices: A 3D array of doubles (N, k, k) containing the slicing
symmetric: 1 if the sigma matrices are symmetric
# Create variables for keeping code tidy
cdef np.int32_t N = P.shape[0]
cdef np.int32_t k = P.shape[1]

cdef np.int32_t i
cdef np.int32_t j
cdef np.int32_t index_pointer
cdef np.int32_t sparse_row_pointer

# Create objects for holding sparse matrix data
cdef np.float64_t[:] data_mem_view
cdef np.int32_t[:] indices_mem_view
cdef np.int32_t[:] indptr_mem_view

cdef np.float64_t **data = <np.float64_t **> malloc(N * sizeof(np.float64_t *))
cdef np.int32_t **indices = <np.int32_t **> malloc(N * sizeof(np.int32_t *))
cdef np.int32_t **indptr = <np.int32_t **> malloc(N * sizeof(np.int32_t *))

for i in range(N):
data_mem_view = sigma[i].data
data[i] = &(data_mem_view[0])

indices_mem_view = sigma[i].indices
indices[i] = &(indices_mem_view[0])

indptr_mem_view = sigma[i].indptr
indptr[i] = &(indptr_mem_view[0])

# Object for the ordered P
cdef np.int32_t[:,:] perm = np.argsort(P, axis=1).astype(np.int32)

# Make sure sub_matrices is all 0
sub_matrices[:] = 0

for i in prange(N, nogil=True):
for j in range(k):
# Loop over row P[i, perm[j]] in sigma searching for values
# in P[i, :] vector i.e. compare
# sigma[P[i, perm[j], :]
# against
# P[i,:]
# To do this we need our sparse row vector with columns
# indices[indptr[P[i, perm[j]]], indptr[P[i, perm[j]]+1]]
# and data/values
# data[indptr[P[i, perm[j]]], indptr[P[i, perm[j]]+1]]
# which comes from the csr matrix format.
# We also need our sorted indexing vector
# P[i, perm[:]]

# We begin by pointing at the top of both
# our vectors and gradually move down them. In the event of
# an equality we add the data to sub_matrices[i,:,:] and
# increment the INDEXING VECTOR pointer, not the sparse
# row vector pointer, as there can be multiple values that
# are the same in the indexing vector but not the sparse row
# column vector (only 1 column can appear in 1 row!).

if symmetric:
index_pointer = j # Only search upper triangular
index_pointer = 0
sparse_row_pointer = indptr[i][P[i, perm[i, j]]]

while ((index_pointer < k) and
(sparse_row_pointer < indptr[i][P[i, perm[i, j]] + 1])):
if indices[i][sparse_row_pointer] == P[i, perm[i, index_pointer]]:
# We can add data to sub_matrices
sub_matrices[i, perm[i, j], perm[i, index_pointer]] = \

if symmetric:
sub_matrices[i, perm[i, index_pointer], perm[i, j]] = \

# Only increment the index pointer
index_pointer = index_pointer + 1
elif indices[i][sparse_row_pointer] > P[i, perm[i, index_pointer]]:
# Need to increment index pointer
index_pointer = index_pointer + 1
# Need to increment sparse row pointer
sparse_row_pointer = sparse_row_pointer + 1

# Free malloc'd data



cythonize -i sparse_slice.pyx

其中 sparse_slice.pyx 是文件名。然后你可以使用这个脚本:

import time
import numpy as np
import scipy as sp
import scipy.sparse
from sparse_slice import sparse_slice_fast_cy

k = 100
N = 20000
m = 10000
samples = 20

# Create sigma matrices
## The sampling of random sparse takes a while so just do a few and
## then populate with these.
now = time.time()
sigma_samples = []
for i in range(samples):
sigma_samples.append(sp.sparse.rand(m, m, density=0.001, format='csr'))
sigma_samples[-1] = sigma_samples[-1] + sigma_samples[-1].T # Symmetric

## Now make the sigma list from these.
sigma = []
for i in range(N):
j = np.random.randint(samples)
print('Time to make sigma: {}'.format(time.time() - now))

# Create indexer
now = time.time()
P = np.empty([N, k]).astype(int)
for i in range(N):
P[i, :] = np.random.choice(np.arange(m), k, replace=True)
print('Time to make P: {}'.format(time.time() - now))

# Create objects for holding the slices
sub_matrices_slow = np.empty([N, k, k])
sub_matrices_fast = np.empty([N, k, k])

# Run both slicings
## Slow
now = time.time()
for i in range(N):
sub_matrices_slow[i,:,:] = sigma[i][np.ix_(P[i,:], P[i,:])].todense()
print('Time to make sub_matrices_slow: {}'.format(time.time() - now))

## Fast
symmetric = 1
now = time.time()
sparse_slice_fast_cy(sigma, P.astype(np.int32), sub_matrices_fast, symmetric)
print('Time to make sub_matrices_fast: {}'.format(time.time() - now))

assert(np.all((sub_matrices_slow - sub_matrices_fast)**2 < 1e-6))



A) 在 i-loop 上一次对所有行进行排序:

# Object for the ordered P
cdef long[:,:] perm = np.argsort(P, axis=1)

也许您需要将 P 作为 np.ndarray[np.int64_t, ndim=2] P(或任何类型)传递以避免复制。您必须通过 perm[i,X] 而不是 perm[X] 访问数据。

B) 定义

cdef np.int32_t[:] indices
cdef np.int32_t[:] indptr


for i in range(N):
data = sigma[i].data
indices = sigma[i].indices
indptr = sigma[i].indptr

我认为因为 sigma[i]O(m) 元素,复制是你函数的瓶颈:你得到运行时间 O(N *(m+k^2)) 而不是 `O(N*k^2) - 最好避免它。


为了让 prangei 循环一起工作,您应该通过创建一种指向 dataindicesindptr 的第一个元素的指针数组,并在廉价的预处理步骤中填充它们。可以让它工作,但问题是并行化的 yield 有多少——很可能是这种情况,问题是内存限制的——必须看时间安排。


index_pointer = j #only upper triangle!
# We can add data to sub_matrices
#upper triangle sub-matrix:
sub_matrices[i, perm[j], perm[index_pointer]] = \
#lower triangle sub-matrix:
sub_matrices[i, perm[index_pointer], perm[j]] = \

我会从 B) 开始,看看结果如何...



 /usr/bin/time -f "peak_used_memory:%M(in Kb)" python

我使用 N=2000 运行我的测试并得到 (python3.6+cython0.27.1):

                             peak memory usage
only slow 245Mb
only fast 245Mb
slow+fast no check 402Mb
slow+fast+assert 576Mb

因此有 50Mb 的开销,两个函数使用了 200Mb,另外还有 176Mb 用于评估断言。对于 N 的其他值,我也可以看到相同的行为。

所以我想说 cython 没有大量的内存使用。


一种可能是不使用perm - 毕竟它也需要加载到缓存中。你可以做到,如果

  1. 您可以接受矩阵 sigma 中的任何行/列排列,而不仅仅是对 P 进行排序并使用它。
  2. 每行的元素很少,因此对每个元素进行线性搜索就可以了。
  3. 对每个元素进行二分查找

我想在最好的情况下你可以赢大约 20-30%。

有时 cython 生成的代码不容易针对 c 编译器进行优化,直接用 C 编写然后用 python 包装它通常会取得更好的结果。



cdef np.int64_t[:,:] perm = np.argsort(P, axis=1)


关于python - 对称稀疏矩阵的高效切片,我们在Stack Overflow上找到一个类似的问题:

25 4 0
