gpt4 book ai didi

python - 为什么在使用 python 的多处理处理 for 循环中具有共享 numpy 数据的令人尴尬的并行问题时没有加速?

转载 作者:太空狗 更新时间:2023-10-29 17:08:02 27 4
gpt4 key购买 nike

我想加速一个与贝叶斯推理相关的令人尴尬的并行问题。目的是推断一组图像 x 的系数 u,给定矩阵 A,使得 X = A*U。X 具有维度 mxn、A mxp 和 U pxn。对于 X 的每一列,必须推断出系数 U 的最佳对应列。最后,此信息用于更新 A。我使用 m = 3000、p = 1500 和 n = 100。因此,由于它是一个线性模型,系数矩阵 u 的推论由 n 个独立的计算组成。因此,我尝试使用 Python 的多处理模块,但没有加速。

这是我做的:

没有并行化的主要结构是:

import numpy as np
from convex import Crwlasso_cd

S = np.empty((m, batch_size))

for t in xrange(start_iter, niter):

## Begin Warm Start ##
# Take 5 gradient steps w/ this batch using last coef. to warm start inf.
for ws in range(5):
# Initialize the coefficients
if ws:
theta = U
else:
theta = np.dot(A.T, X)

# Infer the Coefficients for the given data batch X of size mxn (n=batch_size)
# Crwlasso_cd is the function that does the inference per data sample
# It's basically a C-inline code
for k in range(batch_size):
U[:,k] = Crwlasso_cd(X[:, k].copy(), A, theta=theta[:,k].copy())

# Given the inferred coefficients, update and renormalize
# the basis functions A
dA1 = np.dot(X - np.dot(A, U), U.T) # Gaussian data likelihood
A += (eta / batch_size) * dA1
A = np.dot(A, np.diag(1/np.sqrt((A**2).sum(axis=0))))

多处理的实现:

我尝试实现多处理。我有一台可以使用的 8 核机器。

  1. 有 3 个 for 循环。唯一似乎“可并行化”的是第三个,其中推断出系数:
    • 生成一个Queue,并将迭代次数从0到batch_size-1放入Queue
    • 生成8个进程,让他们通过Queue工作
  2. 使用multiprocessing.Array共享数据U

因此,我将第三个循环替换为以下内容:

from multiprocessing import Process, Queue
import multiprocessing as mp
from Queue import Empty

num_cpu = mp.cpu_count()
work_queue = Queue()

# Generate the empty ndarray U and a multiprocessing.Array-Wrapper U_mp around U
# The class Wrap_mp is attached. Basically, U_mp.asarray() gives the corresponding
# ndarray
U = np.empty((p, batch_size))
U_mp = Wrap_mp(U)

...

# Within the for-loops:
for p in xrange(batch_size):
work_queue.put(p)

processes = [Process(target=infer_coefficients_mp, args=(work_queue,U_mp,A,X)) for p in range(num_cpu)]

for p in processes:
p.start()
print p.pid
for p in processes:
p.join()

这是 Wrap_mp 类:

class Wrap_mp(object):
""" Wrapper around multiprocessing.Array to share an array across
processes. Store the array as a multiprocessing.Array, but compute with it
as a numpy.ndarray
"""

def __init__(self, arr):
""" Initialize a shared array from a numpy array.

The data is copied.
"""
self.data = ndarray_to_shmem(arr)
self.dtype = arr.dtype
self.shape = arr.shape

def __array__(self):
""" Implement the array protocole.
"""
arr = shmem_as_ndarray(self.data, dtype=self.dtype)
arr.shape = self.shape
return arr

def asarray(self):
return self.__array__()

这里是函数 infer_coefficients_mp:

def infer_feature_coefficients_mp(work_queue,U_mp,A,X):

while True:
try:
index = work_queue.get(block=False)
x = X[:,index]
U = U_mp.asarray()
theta = np.dot(phit,x)

# Infer the coefficients of the column index
U[:,index] = Crwlasso_cd(x.copy(), A, theta=theta.copy())

except Empty:
break

现在的问题如下:

  1. 对于给定的数据维度,多处理版本并不比单线程版本快。
  2. 进程 ID 会随着每次迭代而增加。这是否意味着不断有新的进程产生?这不会产生巨大的开销吗?我怎样才能避免这种情况?是否有可能在整个 for 循环中创建 8 个不同的进程并仅使用数据更新它们?
  3. 我在进程之间共享系数 U 的方式是否会减慢计算速度?还有其他更好的方法吗?
  4. 进程池会更好吗?

我真的很感谢任何形式的帮助!一个月前我开始使用 Python,现在我很迷茫。

引擎

最佳答案

每次创建流程时,您都在创建一个新流程。如果您在 for 循环中执行此操作,那么是的,您每次通过循环都会启动新进程。听起来您想要做的是在循环外初始化队列和进程,然后在循环内填充队列。

我以前使用过 multiprocessing.Pool,它很有用,但与您已经使用 Queue 实现的功能相比,它提供的功能并不多。

关于python - 为什么在使用 python 的多处理处理 for 循环中具有共享 numpy 数据的令人尴尬的并行问题时没有加速?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4373131/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com