python - Changing the value of a variable passed to multiprocessing.Pool().map

I have a somewhat expensive function that transforms a large amount of data. Running it sequentially takes a long time, so I tried to parallelize it, but the results all come out wrong. I want to change the elements of a numpy array from the parallelized function.

I have read Python map function, passing by reference/value?, but the solution there does not carry over to the parallel version.

I have only been using Python for about a month, so I may be asking something silly.

Here is a simple example of what I am trying to do.

import numpy as np
import multiprocessing

globalData = np.array([1, 2, 3, 4, 5, 6, 7, 8])

def add(i):
    global globalData
    globalData[i] += 1


pool = multiprocessing.Pool(8)
pool.map(add, range(8))
pool.close()
pool.join()
print("Global data:", globalData)

I expected the output to be [2, 3, 4, 5, 6, 7, 8, 9], just as when I run

for i in range(8):
    add(i)

But instead I get

[1, 2, 3, 4, 5, 6, 7, 8]

Thanks for any help.

Edit: Here is my original problem, a less minimal working example.

sample_size = 100

pca_sample = np.random.rand(sample_size, sample_size)

def knl(x, y):
    # just as an example; summed so that a scalar is returned
    return np.sin(x + y).sum()

K_matrix = np.zeros((sample_size, sample_size))
for i in range(sample_size):
    for j in range(sample_size):
        K_matrix[i][j] = knl(pca_sample[i], pca_sample[j])

K_cent_matrix = np.zeros((sample_size, sample_size))

def K_centered(K_cent_matrix, i, j):
    term1 = K_matrix[i][j]
    term2 = 0.
    term3 = 0.
    term4 = 0.
    for k in range(sample_size):
        term2 += K_matrix[k][j]
    for k in range(sample_size):
        term3 += K_matrix[i][k]
    for k1 in range(sample_size):
        for k2 in range(sample_size):
            term4 += K_matrix[k1][k2]
    term1 /= sample_size
    term2 /= sample_size
    term3 /= (sample_size * sample_size)
    K_cent_matrix[i][j] = term1 - term2 - term3 + term4
    print(f"K_cent_matrix[{i:d}][{j:d}] = {K_cent_matrix[i][j]:f}")

pool = multiprocessing.Pool(8)
pool.starmap(K_centered, [(K_cent_matrix, i, j) for i, j in zip(range(sample_size), range(sample_size))])
pool.close()
pool.join()

Best Answer

The problem is that globalData is not in shared memory. When the array is processed in parallel, a copy is created for each process, and the original array remains unchanged. If you want to work on the same array in parallel, you have to deal with shared memory, which is doable but not trivial. See here and here.
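
As an illustration only (my sketch, not part of the original answer): one way to put the array into shared memory is a multiprocessing.Array wrapped as a NumPy view. This assumes a start method that forks the workers (e.g. the default on Linux), so the workers inherit the module-level array:

import multiprocessing
import numpy as np

# hypothetical sketch: one shared int64 buffer that all workers write into
sharedData = multiprocessing.Array('q', [1, 2, 3, 4, 5, 6, 7, 8], lock=False)

def add(i):
    sharedData[i] += 1  # the write lands in shared memory, visible to the parent

if __name__ == "__main__":
    with multiprocessing.Pool(8) as pool:
        pool.map(add, range(8))
    print(np.frombuffer(sharedData, dtype=np.int64))  # [2 3 4 5 6 7 8 9]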

From my own experience, I recommend returning copies of the results and "re-creating" the result array rather than changing it in place. Of course, if you are working with huge amounts of data this may not be possible. Otherwise, however, the gain in simplicity outweighs the (small) gain in efficiency. Applied to your problem, it could look like this:

import numpy as np
import multiprocessing

globalData = np.array([1, 2, 3, 4, 5, 6, 7, 8])

def add(i):
    return globalData[i] + 1

def exe():
    global globalData
    with multiprocessing.Pool(8) as pool:
        globalData = np.array(list(pool.map(add, range(8))))

    print("Global data:", globalData)

exe()

The result is

Global data: [2 3 4 5 6 7 8 9]

as desired.

The code runs considerably faster if you use the chunksize parameter, which speeds up the data communication between the processes.
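
For illustration (a hypothetical call, not taken from the answer's code), the chunk size is simply passed to map:

import multiprocessing

def add(i):
    return i + 1

if __name__ == "__main__":
    with multiprocessing.Pool(8) as pool:
        # send 1000 items per task instead of one, reducing inter-process traffic
        results = pool.map(add, range(100000), chunksize=1000)
    print(results[:5])  # [1, 2, 3, 4, 5]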

Note that the with statement saves you the work of joining and stopping the processes after the execution. However, this does not work in top-level code, which is why I put it into the function exe.
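
Related to that (my addition, not part of the original answer): on start methods that spawn a fresh interpreter for each worker, e.g. on Windows, the entry point should additionally be guarded so that child processes do not re-execute the pool setup when they import the module:

if __name__ == "__main__":
    exe()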

I have derived a helper class that makes it easier to deal with shared arrays, or with large arrays without "really" sharing them.

If you save the code given at the end of this answer as "concurrent_futures_ext.py" in your working directory, you can write your code as

import numpy as np
from concurrent_futures_ext import ProcessPoolExecutor

globalData = np.array([1, 2, 3, 4, 5, 6, 7, 8])

def add(globalData, i):
    globalData[i] += 1

def exe():
    global globalData
    shared_np_arrs = [globalData]  # list of global arrays
    with ProcessPoolExecutor(8, shared_np_arrs=shared_np_arrs) as pool:
        any(pool.map(add, range(8)))
        globalData = pool.get_shared_arrays()[0]  # retrieving the list of global arrays
    print("Global data:", globalData)

exe()

Only one copy of your data is needed to put the array into shared memory.

Regarding your non-minimal working example: huge optimizations are possible if you vectorize the code, i.e. use numpy functions instead of for loops. Going through all possible optimizations is beyond the scope of your question and my answer, but it would make your code faster by orders of magnitude (much(!) more than you can gain from parallelization).
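
As an illustration only (my sketch, not part of the original answer), the centering loops could be vectorized roughly as follows, assuming the intent is the standard kernel-centering formula; the divisors in the question's loop version differ slightly, so treat this as a sketch rather than a drop-in replacement:

import numpy as np

def center_kernel(K_matrix):
    # standard centering: K - column means - row means + overall mean,
    # broadcast over the whole matrix instead of looping over (i, j)
    col_means = K_matrix.mean(axis=0, keepdims=True)   # shape (1, n)
    row_means = K_matrix.mean(axis=1, keepdims=True)   # shape (n, 1)
    total_mean = K_matrix.mean()
    return K_matrix - col_means - row_means + total_mean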

Here is the code:

from concurrent.futures import ProcessPoolExecutor as conc_ProcessPoolExecutor
from concurrent.futures.process import _ExceptionWithTraceback, _get_chunks, _ResultItem
from functools import partial
import multiprocessing
import itertools
import os
import numpy as np
from multiprocessing import sharedctypes
CPU_COUNT = os.cpu_count()


def get_cpu_chunk_counts(task_length, chunk_number=5, min_chunk_size=1):
    cpu_count = max(min(CPU_COUNT,
                        task_length // (chunk_number*min_chunk_size)), 1)
    chunk_size = max(min_chunk_size, task_length // (cpu_count*chunk_number))
    return cpu_count, chunk_size

def _process_worker(call_queue, result_queue, const_args=[], shared_arrays=[]):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A multiprocessing.Queue of _CallItems that will be read and
            evaluated by the worker.
        result_queue: A multiprocessing.Queue of _ResultItems that will be written
            to by the worker.
        shutdown: A multiprocessing.Event that will be set as a signal to the
            worker that it should exit when call_queue is empty.
    """

    shared_arrays_np = [np.ctypeslib.as_array(arr).view(dtype).reshape(shape)
                        for arr, dtype, shape in shared_arrays]

    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            result_queue.put(os.getpid())
            return
        try:
            r = call_item.fn(*call_item.args, const_args=const_args,
                             shared_arrays=shared_arrays_np,
                             **call_item.kwargs)
        except BaseException as e:
            exc = _ExceptionWithTraceback(e, e.__traceback__)
            result_queue.put(_ResultItem(call_item.work_id, exception=exc))
        else:
            result_queue.put(_ResultItem(call_item.work_id,
                                         result=r))


def _process_chunk(fn, chunk, const_args, shared_arrays):
    """Processes a chunk of an iterable passed to map.

    Runs the function passed to map() on a chunk of the
    iterable passed to map.

    This function is run in a separate process.
    """
    return [fn(*const_args, *shared_arrays, *args) for args in chunk]


class ProcessPoolExecutor(conc_ProcessPoolExecutor):
    '''
    classdocs
    '''

    def __init__(self, max_workers=None, const_args=[], shared_np_arrs=[]):
        '''
        Constructor
        '''
        super().__init__(max_workers)
        self._const_args = const_args
        shared_arrays_ctype = []
        shared_arrays_np = []

        # TODO do not create copy of shared array, if it already has a suitable
        # data structure
        for arr in shared_np_arrs:
            dtype = arr.dtype
            arrShared = np.empty(arr.size*dtype.itemsize, np.int8)
            arrShared = np.ctypeslib.as_ctypes(arrShared)
            ctypes_arr = sharedctypes.RawArray(arrShared._type_, arrShared)
            shared_arrays_ctype.append((ctypes_arr, arr.dtype, arr.shape))
            view = np.ctypeslib.as_array(ctypes_arr).view(arr.dtype).reshape(
                arr.shape)
            view[:] = arr
            shared_arrays_np.append(view)
        self._shared_arrays_np = shared_arrays_np
        self._shared_arrays = shared_arrays_ctype

    def _adjust_process_count(self):
        for _ in range(len(self._processes), self._max_workers):
            p = multiprocessing.Process(
                target=_process_worker,
                args=(self._call_queue,
                      self._result_queue,
                      self._const_args,
                      self._shared_arrays))
            p.start()
            self._processes[p.pid] = p

    def map(self, fn, *iterables, timeout=None, chunksize=None,
            tasklength=None, chunknumber=5, min_chunksize=1):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: If greater than one, the iterables will be chopped into
                chunks of size chunksize and submitted to the process pool.
                If set to one, the items in the list will be sent one at a time.
            tasklength: length of the iterable. If provided, the cpu count
                and the chunksize will be adjusted appropriately, if they are not
                explicitly given.
        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        tmp_max_workers = self._max_workers
        if tasklength and tasklength > 0:
            cpu_count, chunksize_tmp = get_cpu_chunk_counts(tasklength,
                                                            chunknumber,
                                                            min_chunksize)
            if not chunksize:
                chunksize = chunksize_tmp
            self._max_workers = cpu_count

        if not chunksize:
            chunksize = 1

        if chunksize < 1:
            raise ValueError("chunksize must be >= 1.")

        results = super(conc_ProcessPoolExecutor, self).map(
            partial(_process_chunk, fn),
            _get_chunks(*iterables, chunksize=chunksize),
            timeout=timeout)

        self._max_workers = tmp_max_workers

        return itertools.chain.from_iterable(results)

    def get_shared_arrays(self):
        return self._shared_arrays_np

Regarding python - Changing the value of a variable passed to multiprocessing.Pool().map, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/57533533/
