- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我花了几个小时在不同的尝试上并行化我的数字运算代码,但是当我这样做时它只会变得更慢。不幸的是,当我尝试将其简化为下面的示例时,问题就消失了,而且我真的不想在这里发布整个程序。所以问题是:在这种类型的程序中我应该避免哪些陷阱?
(注:Unutbu 回答后的跟进在底部。)
以下是情况:
BigData
有很多内部数据。在这个例子中有一个列表 ff
插值函数;在实际程序中,还有更多,例如ffA[k]
, ffB[k]
, ffC[k]
. do_chunk()
. do_single()
将在 5 秒内运行并且 do_multi()
将在 55 秒内运行。 xi
来拆分工作。和 yi
数组成连续块并遍历所有 k
每个块中的值。那效果好一点。现在,无论我使用 1、2、3 还是 4 个线程,总执行时间都没有区别。但是,当然,我希望看到实际的加速! def do_chunk(array1, array2, array3)
并对该数组进行仅 numpy 的计算。在那里,有一个显着的速度提升。 #!/usr/bin/python2.7
import numpy as np, time, sys
from multiprocessing import Pool
from scipy.interpolate import RectBivariateSpline
_tm=0
def stopwatch(msg=''):
tm = time.time()
global _tm
if _tm==0: _tm = tm; return
print("%s: %.2f seconds" % (msg, tm-_tm))
_tm = tm
class BigData:
def __init__(self, n):
z = np.random.uniform(size=n*n*n).reshape((n,n,n))
self.ff = []
for i in range(n):
f = RectBivariateSpline(np.arange(n), np.arange(n), z[i], kx=1, ky=1)
self.ff.append(f)
self.n = n
def do_chunk(self, k, xi, yi):
s = np.sum(np.exp(self.ff[k].ev(xi, yi)))
sys.stderr.write(".")
return s
def do_multi(self, numproc, xi, yi):
procs = []
pool = Pool(numproc)
stopwatch('Pool setup')
for k in range(self.n):
p = pool.apply_async( _do_chunk_wrapper, (self, k, xi, yi))
procs.append(p)
stopwatch('Jobs queued (%d processes)' % numproc)
sum = 0.0
for k in range(self.n):
# Edit/bugfix: replaced p.get by procs[k].get
sum += np.sum(procs[k].get(timeout=30)) # timeout allows ctrl-C interrupt
if k == 0: stopwatch("\nFirst get() done")
stopwatch('Jobs done')
pool.close()
pool.join()
return sum
def do_single(self, xi, yi):
sum = 0.0
for k in range(self.n):
sum += self.do_chunk(k, xi, yi)
stopwatch('\nAll in single process')
return sum
def _do_chunk_wrapper(bd, k, xi, yi): # must be outside class for apply_async to chunk
return bd.do_chunk(k, xi, yi)
if __name__ == "__main__":
stopwatch()
n = 50
bd = BigData(n)
m = 1000*1000
xi, yi = np.random.uniform(0, n, size=m*2).reshape((2,m))
stopwatch('Initialized')
bd.do_multi(2, xi, yi)
bd.do_multi(3, xi, yi)
bd.do_single(xi, yi)
Initialized: 0.06 seconds
Pool setup: 0.01 seconds
Jobs queued (2 processes): 0.03 seconds
..
First get() done: 0.34 seconds
................................................Jobs done: 7.89 seconds
Pool setup: 0.05 seconds
Jobs queued (3 processes): 0.03 seconds
..
First get() done: 0.50 seconds
................................................Jobs done: 6.19 seconds
..................................................
All in single process: 11.41 seconds
self
被腌制成一个 37 到 140 MB 的对象,需要传递给工作进程。更糟糕的是,Python 酸洗非常慢;酸洗本身需要几秒钟,这发生在传递给工作进程的每一块工作中。除了酸洗和传递大数据对象之外,
apply_async
的开销在 Linux 中非常小;对于一个小函数(添加几个整数参数),每
apply_async
只需要 0.2 毫秒/
get
对。因此,将工作分成非常小的块本身并不是问题。因此,我将所有大数组参数作为索引传输到全局变量。出于 CPU 缓存优化的目的,我保持块大小较小。
dict
中。 ;设置工作池后,条目会立即在父进程中删除。只有
dict
的 key 被传送到工作进程。酸洗/IPC 的唯一大数据是 worker 创建的新数据。
#!/usr/bin/python2.7
import numpy as np, sys
from multiprocessing import Pool
_mproc_data = {} # global storage for objects during multiprocessing.
class BigData:
def __init__(self, size):
self.blah = np.random.uniform(0, 1, size=size)
def do_chunk(self, k, xi, yi):
# do the work and return an array of the same shape as xi, yi
zi = k*np.ones_like(xi)
return zi
def do_all_work(self, xi, yi, num_proc):
global _mproc_data
mp_key = str(id(self))
_mproc_data['bd'+mp_key] = self # BigData
_mproc_data['xi'+mp_key] = xi
_mproc_data['yi'+mp_key] = yi
pool = Pool(processes=num_proc)
# processes have now inherited the global variabele; clean up in the parent process
for v in ['bd', 'xi', 'yi']:
del _mproc_data[v+mp_key]
# setup indices for the worker processes (placeholder)
n_chunks = 45
n = len(xi)
chunk_len = n//n_chunks
i1list = np.arange(0,n,chunk_len)
i2list = i1list + chunk_len
i2list[-1] = n
klist = range(n_chunks) # placeholder
procs = []
for i in range(n_chunks):
p = pool.apply_async( _do_chunk_wrapper, (mp_key, i1list[i], i2list[i], klist[i]) )
sys.stderr.write(".")
procs.append(p)
sys.stderr.write("\n")
# allocate space for combined results
zi = np.zeros_like(xi)
# get data from workers and finish
for i, p in enumerate(procs):
zi[i1list[i]:i2list[i]] = p.get(timeout=30) # timeout allows ctrl-C handling
pool.close()
pool.join()
return zi
def _do_chunk_wrapper(key, i1, i2, k):
"""All arguments are small objects."""
global _mproc_data
bd = _mproc_data['bd'+key]
xi = _mproc_data['xi'+key][i1:i2]
yi = _mproc_data['yi'+key][i1:i2]
return bd.do_chunk(k, xi, yi)
if __name__ == "__main__":
xi, yi = np.linspace(1, 100, 100001), np.linspace(1, 100, 100001)
bd = BigData(int(1e7))
bd.do_all_work(xi, yi, 4)
xi
、
yi
、
zi
数组切片的总字节数) )。这些数字以“每秒百万个结果值”为单位,但这对于比较来说并不重要。 “1 个进程”的行是对
do_chunk
的直接调用使用完整的输入数据,没有任何子流程。
#Proc 125K 250K 500K 1000K unlimited
1 0.82
2 4.28 1.96 1.3 1.31
3 2.69 1.06 1.06 1.07
4 2.17 1.27 1.23 1.28
BigData
的几MB内部数据。对象。因此,我们从中学到的是,进行这种速度测试很有用。对于这个程序,2 个进程最快,其次是 4 个,3 个进程最慢。
最佳答案
尽量减少进程间通信。
在 multiprocessing
模块所有(单机)进程间通信通过队列完成。通过队列的对象
腌制。所以尝试通过队列发送更少和/或更小的对象。
self
,BigData
的实例,通过队列。它比较大,随着self
中数据量的增加而变大。成长:In [6]: import pickle
In [14]: len(pickle.dumps(BigData(50)))
Out[14]: 1052187
pool.apply_async( _do_chunk_wrapper, (self, k, xi, yi))
被称为,self
在主进程中pickled,在worker进程中unpickled。该len(pickle.dumps(BigData(N)))
的尺寸长出 N
增加。 After fork(), parent and child are in an equivalent state. It would be stupid to copy the entire memory of the parent to another place in the RAM. That's [where] the copy-on-write principle [comes] in. As long as the child does not change its memory state, it actually accesses the parent's memory. Only upon modification, the corresponding bits and pieces are copied into the memory space of the child.
BigData
的实例。通过队列bd = BigData(n)
, (正如您已经在做的那样)并在工作进程中引用其值(例如 _do_chunk_wrapper
)。它基本上相当于删除 self
从调用 pool.apply_async
:p = pool.apply_async(_do_chunk_wrapper, (k_start, k_end, xi, yi))
bd
作为全局,并对 do_chunk_wrapper
进行必要的随附更改的通话签名。 func
, 至 pool.apply_async
.pool.apply_async
然后通过队列传递参数和返回值的开销成为整个时间的重要部分。相反,如果您减少拨打 pool.apply_async
的电话并给每个 func
在返回结果之前要做的工作越多,那么进程间通信就只占总时间的一小部分。_do_chunk_wrapper
接受 k_start
和 k_end
参数,以便每次调用 pool.apply_async
将计算 k
的许多值的总和在返回结果之前。 import math
import numpy as np
import time
import sys
import multiprocessing as mp
import scipy.interpolate as interpolate
_tm=0
def stopwatch(msg=''):
tm = time.time()
global _tm
if _tm==0: _tm = tm; return
print("%s: %.2f seconds" % (msg, tm-_tm))
_tm = tm
class BigData:
def __init__(self, n):
z = np.random.uniform(size=n*n*n).reshape((n,n,n))
self.ff = []
for i in range(n):
f = interpolate.RectBivariateSpline(
np.arange(n), np.arange(n), z[i], kx=1, ky=1)
self.ff.append(f)
self.n = n
def do_chunk(self, k, xi, yi):
n = self.n
s = np.sum(np.exp(self.ff[k].ev(xi, yi)))
sys.stderr.write(".")
return s
def do_chunk_of_chunks(self, k_start, k_end, xi, yi):
s = sum(np.sum(np.exp(self.ff[k].ev(xi, yi)))
for k in range(k_start, k_end))
sys.stderr.write(".")
return s
def do_multi(self, numproc, xi, yi):
procs = []
pool = mp.Pool(numproc)
stopwatch('\nPool setup')
ks = list(map(int, np.linspace(0, self.n, numproc+1)))
for i in range(len(ks)-1):
k_start, k_end = ks[i:i+2]
p = pool.apply_async(_do_chunk_wrapper, (k_start, k_end, xi, yi))
procs.append(p)
stopwatch('Jobs queued (%d processes)' % numproc)
total = 0.0
for k, p in enumerate(procs):
total += np.sum(p.get(timeout=30)) # timeout allows ctrl-C interrupt
if k == 0: stopwatch("\nFirst get() done")
print(total)
stopwatch('Jobs done')
pool.close()
pool.join()
return total
def do_single(self, xi, yi):
total = 0.0
for k in range(self.n):
total += self.do_chunk(k, xi, yi)
stopwatch('\nAll in single process')
return total
def _do_chunk_wrapper(k_start, k_end, xi, yi):
return bd.do_chunk_of_chunks(k_start, k_end, xi, yi)
if __name__ == "__main__":
stopwatch()
n = 50
bd = BigData(n)
m = 1000*1000
xi, yi = np.random.uniform(0, n, size=m*2).reshape((2,m))
stopwatch('Initialized')
bd.do_multi(2, xi, yi)
bd.do_multi(3, xi, yi)
bd.do_single(xi, yi)
Initialized: 0.15 seconds
Pool setup: 0.06 seconds
Jobs queued (2 processes): 0.00 seconds
First get() done: 6.56 seconds
83963796.0404
Jobs done: 0.55 seconds
..
Pool setup: 0.08 seconds
Jobs queued (3 processes): 0.00 seconds
First get() done: 5.19 seconds
83963796.0404
Jobs done: 1.57 seconds
...
All in single process: 12.13 seconds
Initialized: 0.10 seconds
Pool setup: 0.03 seconds
Jobs queued (2 processes): 0.00 seconds
First get() done: 10.47 seconds
Jobs done: 0.00 seconds
..................................................
Pool setup: 0.12 seconds
Jobs queued (3 processes): 0.00 seconds
First get() done: 9.21 seconds
Jobs done: 0.00 seconds
..................................................
All in single process: 12.12 seconds
关于python - 使用 numpy/scipy 最小化 Python multiprocessing.Pool 中的开销,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37068981/
我正在尝试使用多处理和队列实现生产者-消费者场景;主进程是生产者,两个子进程使用队列中的数据。这在没有任何异常 发生的情况下有效,但问题是我希望能够在工作人员死亡时重新启动他们(kill -9 wor
我试图在一个管理进程下启动一个数据队列服务器(这样它以后可以变成一个服务),虽然数据队列服务器功能在主进程中工作正常,但它在一个进程中不起作用使用 multiprocessing.Process 创建
我的多处理需求非常简单:我从事机器学习工作,有时我需要评估多个数据集中的一个算法,或者一个数据集中的多个算法,等等。我只需要运行一个带有一些参数的函数并获取一个数字。 我不需要 RPC、共享数据,什么
创建进程池或简单地遍历一个进程以创建更多进程之间有任何区别(以任何方式)吗? 这有什么区别?: pool = multiprocessing.Pool(5) pool.apply_async(work
multiprocessing.BoundedSemaphore(3) 与 multiprocessing.Sempahore(3) 有何不同? 我希望 multiprocessing.Bounded
我尝试通过 multiprocessing 包中的 Queue 对 Pipe 的速度进行基准测试。我认为 Pipe 会更快,因为 Queue 在内部使用 Pipe。 奇怪的是,Pipe 在发送大型 n
我有这样一个简单的任务: def worker(queue): while True: try: _ = queue.get_nowait()
我正在尝试编写一个与 multiprocessing.Pool 同时应用函数的应用程序。我希望这个函数成为一个实例方法(所以我可以在不同的子类中以不同的方式定义它)。这似乎是不可能的;正如我在其他地方
在 python 2 中,multiprocessing.dummy.Pool 和 multiprocessing.pool.ThreadPool 之间有什么区别吗?源代码似乎暗示它们是相同的。 最佳
我正在开发一个用于财务目的的模型。我将整个 S&P500 组件放在一个文件夹中,存储了尽可能多的 .hdf 文件。每个 .hdf 文件都有自己的多索引(年-周-分)。 顺序代码示例(非并行化): im
到目前为止,我是这样做的: rets=set(pool.map_async(my_callback, args.hosts).get(60*4)) 如果超时,我会得到一个异常: File "/usr
参见下面的示例和执行结果: #!/usr/bin/env python3.4 from multiprocessing import Pool import time import os def in
我的任务是监听 UDP 数据报,对其进行解码(数据报具有二进制信息),将解码后的信息放入字典中,将字典转储为 json 字符串,然后将 json 字符串发送到远程服务器(ActiveMQ)。 解码和发
我在 macOS 上工作,最近被 Python 3.8 多处理中“fork”到“spawn”的变化所困扰(参见 doc )。下面显示了一个简化的工作示例,其中使用“fork”成功但使用“spawn”失
multiprocessing.Queue 的文档指出从项目入队到其腌制表示刷新到底层管道之间存在一点延迟。显然,您可以将一个项目直接放入管道中(它没有说明其他情况,并且暗示情况就是如此)。 为什么管
我运行了一些测试代码来检查在 Linux 中使用 Pool 和 Process 的性能。我正在使用 Python 2.7。 multiprocessing.Pool 的源代码似乎显示它正在使用 mul
我在 Windows Standard Embedded 7 上运行 python 3.4.3。我有一个继承 multiprocessing.Process 的类。 在类的 run 方法中,我为进程对
我知道multiprocessing.Process类似于 threading.Thread当我子类 multiprocessing.Process 时要创建一个进程,我发现我不必调用 __init_
我有教科书声明说在多处理器系统中不建议禁用中断,并且会花费太多时间。但我不明白这一点,谁能告诉我多处理器系统禁用中断的过程?谢谢 最佳答案 在 x86(和其他架构,AFAIK)上,启用/禁用中断是基于
我正在执行下面的代码并且它工作正常,但它不会产生不同的进程,而是有时所有都在同一个进程中运行,有时 2 个在一个进程中运行。我正在使用 4 cpu 机器。这段代码有什么问题? def f(values
我是一名优秀的程序员,十分优秀!