gpt4 book ai didi

python - 使用 numpy/scipy 最小化 Python multiprocessing.Pool 中的开销

转载 作者:太空狗 更新时间:2023-10-29 21:30:56 25 4
gpt4 key购买 nike

我花了几个小时在不同的尝试上并行化我的数字运算代码,但是当我这样做时它只会变得更慢。不幸的是,当我尝试将其简化为下面的示例时,问题就消失了,而且我真的不想在这里发布整个程序。所以问题是:在这种类型的程序中我应该避免哪些陷阱?

(注:Unutbu 回答后的跟进在底部。)

以下是情况:

  • 这是关于定义类的模块 BigData有很多内部数据。在这个例子中有一个列表 ff插值函数;在实际程序中,还有更多,例如ffA[k] , ffB[k] , ffC[k] .
  • 计算将被归类为“令人尴尬的并行”:一次可以在较小的数据块上完成工作。在示例中,这是 do_chunk() .
  • 在我的实际程序中,示例中显示的方法会导致最差的性能:每个块大约 1 秒(在单个线程中完成时实际计算时间为 0.1 秒左右)。因此,对于 n=50,do_single()将在 5 秒内运行并且 do_multi()将在 55 秒内运行。
  • 我还尝试通过切片 xi 来拆分工作。和 yi数组成连续块并遍历所有 k每个块中的值。那效果好一点。现在,无论我使用 1、2、3 还是 4 个线程,总执行时间都没有区别。但是,当然,我希望看到实际的加速!
  • 这可能与:Multiprocessing.Pool makes Numpy matrix multiplication slower有关.然而,在程序的其他地方,我使用了一个多处理池来进行更加孤立的计算:一个函数(不绑定(bind)到类)看起来像 def do_chunk(array1, array2, array3)并对该数组进行仅 numpy 的计算。在那里,有一个显着的速度提升。
  • CPU 使用率与预期的并行进程数成比例(三个线程为 300% CPU 使用率)。

  • #!/usr/bin/python2.7

    import numpy as np, time, sys
    from multiprocessing import Pool
    from scipy.interpolate import RectBivariateSpline

    _tm=0
    def stopwatch(msg=''):
    tm = time.time()
    global _tm
    if _tm==0: _tm = tm; return
    print("%s: %.2f seconds" % (msg, tm-_tm))
    _tm = tm

    class BigData:
    def __init__(self, n):
    z = np.random.uniform(size=n*n*n).reshape((n,n,n))
    self.ff = []
    for i in range(n):
    f = RectBivariateSpline(np.arange(n), np.arange(n), z[i], kx=1, ky=1)
    self.ff.append(f)
    self.n = n

    def do_chunk(self, k, xi, yi):
    s = np.sum(np.exp(self.ff[k].ev(xi, yi)))
    sys.stderr.write(".")
    return s

    def do_multi(self, numproc, xi, yi):
    procs = []
    pool = Pool(numproc)
    stopwatch('Pool setup')
    for k in range(self.n):
    p = pool.apply_async( _do_chunk_wrapper, (self, k, xi, yi))
    procs.append(p)
    stopwatch('Jobs queued (%d processes)' % numproc)
    sum = 0.0
    for k in range(self.n):
    # Edit/bugfix: replaced p.get by procs[k].get
    sum += np.sum(procs[k].get(timeout=30)) # timeout allows ctrl-C interrupt
    if k == 0: stopwatch("\nFirst get() done")
    stopwatch('Jobs done')
    pool.close()
    pool.join()
    return sum

    def do_single(self, xi, yi):
    sum = 0.0
    for k in range(self.n):
    sum += self.do_chunk(k, xi, yi)
    stopwatch('\nAll in single process')
    return sum

    def _do_chunk_wrapper(bd, k, xi, yi): # must be outside class for apply_async to chunk
    return bd.do_chunk(k, xi, yi)

    if __name__ == "__main__":
    stopwatch()
    n = 50
    bd = BigData(n)
    m = 1000*1000
    xi, yi = np.random.uniform(0, n, size=m*2).reshape((2,m))
    stopwatch('Initialized')
    bd.do_multi(2, xi, yi)
    bd.do_multi(3, xi, yi)
    bd.do_single(xi, yi)

    输出:
    Initialized: 0.06 seconds
    Pool setup: 0.01 seconds
    Jobs queued (2 processes): 0.03 seconds
    ..
    First get() done: 0.34 seconds
    ................................................Jobs done: 7.89 seconds
    Pool setup: 0.05 seconds
    Jobs queued (3 processes): 0.03 seconds
    ..
    First get() done: 0.50 seconds
    ................................................Jobs done: 6.19 seconds
    ..................................................
    All in single process: 11.41 seconds

    时序在具有 2 个内核、4 个线程、运行 64 位 Linux 的 Intel Core i3-3227 CPU 上。对于实际程序,多进程版本(池机制,即使只使用一个内核)比单进程版本慢 10 倍。

    跟进

    Unutbu 的回答让我走上了正确的道路。在实际程序中, self被腌制成一个 37 到 140 MB 的对象,需要传递给工作进程。更糟糕的是,Python 酸洗非常慢;酸洗本身需要几秒钟,这发生在传递给工作进程的每一块工作中。除了酸洗和传递大数据对象之外, apply_async的开销在 Linux 中非常小;对于一个小函数(添加几个整数参数),每 apply_async 只需要 0.2 毫秒/ get对。因此,将工作分成非常小的块本身并不是问题。因此,我将所有大数组参数作为索引传输到全局变量。出于 CPU 缓存优化的目的,我保持块大小较小。

    全局变量存储在一个全局 dict 中。 ;设置工作池后,条目会立即在父进程中删除。只有 dict 的 key 被传送到工作进程。酸洗/IPC 的唯一大数据是 worker 创建的新数据。
    #!/usr/bin/python2.7

    import numpy as np, sys
    from multiprocessing import Pool

    _mproc_data = {} # global storage for objects during multiprocessing.

    class BigData:
    def __init__(self, size):
    self.blah = np.random.uniform(0, 1, size=size)

    def do_chunk(self, k, xi, yi):
    # do the work and return an array of the same shape as xi, yi
    zi = k*np.ones_like(xi)
    return zi

    def do_all_work(self, xi, yi, num_proc):
    global _mproc_data
    mp_key = str(id(self))
    _mproc_data['bd'+mp_key] = self # BigData
    _mproc_data['xi'+mp_key] = xi
    _mproc_data['yi'+mp_key] = yi
    pool = Pool(processes=num_proc)
    # processes have now inherited the global variabele; clean up in the parent process
    for v in ['bd', 'xi', 'yi']:
    del _mproc_data[v+mp_key]

    # setup indices for the worker processes (placeholder)
    n_chunks = 45
    n = len(xi)
    chunk_len = n//n_chunks
    i1list = np.arange(0,n,chunk_len)
    i2list = i1list + chunk_len
    i2list[-1] = n
    klist = range(n_chunks) # placeholder

    procs = []
    for i in range(n_chunks):
    p = pool.apply_async( _do_chunk_wrapper, (mp_key, i1list[i], i2list[i], klist[i]) )
    sys.stderr.write(".")
    procs.append(p)
    sys.stderr.write("\n")

    # allocate space for combined results
    zi = np.zeros_like(xi)

    # get data from workers and finish
    for i, p in enumerate(procs):
    zi[i1list[i]:i2list[i]] = p.get(timeout=30) # timeout allows ctrl-C handling

    pool.close()
    pool.join()

    return zi

    def _do_chunk_wrapper(key, i1, i2, k):
    """All arguments are small objects."""
    global _mproc_data
    bd = _mproc_data['bd'+key]
    xi = _mproc_data['xi'+key][i1:i2]
    yi = _mproc_data['yi'+key][i1:i2]
    return bd.do_chunk(k, xi, yi)


    if __name__ == "__main__":
    xi, yi = np.linspace(1, 100, 100001), np.linspace(1, 100, 100001)
    bd = BigData(int(1e7))
    bd.do_all_work(xi, yi, 4)

    这是速度测试的结果(同样,2 个内核,4 个线程),改变工作进程的数量和块中的内存量( xiyizi 数组切片的总字节数) )。这些数字以“每秒百万个结果值”为单位,但这对于比较来说并不重要。 “1 个进程”的行是对 do_chunk 的直接调用使用完整的输入数据,没有任何子流程。

    #Proc   125K    250K    500K   1000K   unlimited
    1 0.82
    2 4.28 1.96 1.3 1.31
    3 2.69 1.06 1.06 1.07
    4 2.17 1.27 1.23 1.28

    内存中数据大小的影响非常显着。 CPU 具有 3 MB 共享 L3 缓存,外加每个内核 256 KB L2 缓存。注意计算还需要访问 BigData的几MB内部数据。对象。因此,我们从中学到的是,进行这种速度测试很有用。对于这个程序,2 个进程最快,其次是 4 个,3 个进程最慢。

    最佳答案

    尽量减少进程间通信。
    multiprocessing模块所有(单机)进程间通信通过队列完成。通过队列的对象
    腌制。所以尝试通过队列发送更少和/或更小的对象。

  • 不送selfBigData的实例,通过队列。它比较大,随着self中数据量的增加而变大。成长:
    In [6]: import pickle
    In [14]: len(pickle.dumps(BigData(50)))
    Out[14]: 1052187


    时间 pool.apply_async( _do_chunk_wrapper, (self, k, xi, yi))被称为,self在主进程中pickled,在worker进程中unpickled。该len(pickle.dumps(BigData(N)))的尺寸长出 N增加。
  • 让数据从全局变量中读取。在 Linux 上,您可以利用 Copy-on-Write。如 Jan-Philip Gehrcke explains :

    After fork(), parent and child are in an equivalent state. It would be stupid to copy the entire memory of the parent to another place in the RAM. That's [where] the copy-on-write principle [comes] in. As long as the child does not change its memory state, it actually accesses the parent's memory. Only upon modification, the corresponding bits and pieces are copied into the memory space of the child.



    因此,您可以避免传递 BigData 的实例。通过队列
    通过简单地将实例定义为全局,bd = BigData(n) , (正如您已经在做的那样)并在工作进程中引用其值(例如 _do_chunk_wrapper )。它基本上相当于删除 self从调用 pool.apply_async :
    p = pool.apply_async(_do_chunk_wrapper, (k_start, k_end, xi, yi))

    并访问 bd作为全局,并对 do_chunk_wrapper 进行必要的随附更改的通话签名。
  • 尝试传递运行时间更长的函数,func , 至 pool.apply_async .
    如果您有许多快速完成的调用 pool.apply_async然后通过队列传递参数和返回值的开销成为整个时间的重要部分。相反,如果您减少拨打 pool.apply_async 的电话并给每个 func在返回结果之前要做的工作越多,那么进程间通信就只占总时间的一小部分。

    下面,我修改了_do_chunk_wrapper接受 k_startk_end参数,以便每次调用 pool.apply_async将计算 k 的许多值的总和在返回结果之前。

  • import math
    import numpy as np
    import time
    import sys
    import multiprocessing as mp
    import scipy.interpolate as interpolate

    _tm=0
    def stopwatch(msg=''):
    tm = time.time()
    global _tm
    if _tm==0: _tm = tm; return
    print("%s: %.2f seconds" % (msg, tm-_tm))
    _tm = tm

    class BigData:
    def __init__(self, n):
    z = np.random.uniform(size=n*n*n).reshape((n,n,n))
    self.ff = []
    for i in range(n):
    f = interpolate.RectBivariateSpline(
    np.arange(n), np.arange(n), z[i], kx=1, ky=1)
    self.ff.append(f)
    self.n = n

    def do_chunk(self, k, xi, yi):
    n = self.n
    s = np.sum(np.exp(self.ff[k].ev(xi, yi)))
    sys.stderr.write(".")
    return s

    def do_chunk_of_chunks(self, k_start, k_end, xi, yi):
    s = sum(np.sum(np.exp(self.ff[k].ev(xi, yi)))
    for k in range(k_start, k_end))
    sys.stderr.write(".")
    return s

    def do_multi(self, numproc, xi, yi):
    procs = []
    pool = mp.Pool(numproc)
    stopwatch('\nPool setup')
    ks = list(map(int, np.linspace(0, self.n, numproc+1)))
    for i in range(len(ks)-1):
    k_start, k_end = ks[i:i+2]
    p = pool.apply_async(_do_chunk_wrapper, (k_start, k_end, xi, yi))
    procs.append(p)
    stopwatch('Jobs queued (%d processes)' % numproc)
    total = 0.0
    for k, p in enumerate(procs):
    total += np.sum(p.get(timeout=30)) # timeout allows ctrl-C interrupt
    if k == 0: stopwatch("\nFirst get() done")
    print(total)
    stopwatch('Jobs done')
    pool.close()
    pool.join()
    return total

    def do_single(self, xi, yi):
    total = 0.0
    for k in range(self.n):
    total += self.do_chunk(k, xi, yi)
    stopwatch('\nAll in single process')
    return total

    def _do_chunk_wrapper(k_start, k_end, xi, yi):
    return bd.do_chunk_of_chunks(k_start, k_end, xi, yi)

    if __name__ == "__main__":
    stopwatch()
    n = 50
    bd = BigData(n)
    m = 1000*1000
    xi, yi = np.random.uniform(0, n, size=m*2).reshape((2,m))
    stopwatch('Initialized')
    bd.do_multi(2, xi, yi)
    bd.do_multi(3, xi, yi)
    bd.do_single(xi, yi)

    产量
    Initialized: 0.15 seconds

    Pool setup: 0.06 seconds
    Jobs queued (2 processes): 0.00 seconds

    First get() done: 6.56 seconds
    83963796.0404
    Jobs done: 0.55 seconds
    ..
    Pool setup: 0.08 seconds
    Jobs queued (3 processes): 0.00 seconds

    First get() done: 5.19 seconds
    83963796.0404
    Jobs done: 1.57 seconds
    ...
    All in single process: 12.13 seconds

    与原始代码相比:
    Initialized: 0.10 seconds
    Pool setup: 0.03 seconds
    Jobs queued (2 processes): 0.00 seconds

    First get() done: 10.47 seconds
    Jobs done: 0.00 seconds
    ..................................................
    Pool setup: 0.12 seconds
    Jobs queued (3 processes): 0.00 seconds

    First get() done: 9.21 seconds
    Jobs done: 0.00 seconds
    ..................................................
    All in single process: 12.12 seconds

    关于python - 使用 numpy/scipy 最小化 Python multiprocessing.Pool 中的开销,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37068981/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com