- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我有一个有点昂贵的函数来转换大量数据。顺序运行会花费很多时间,所以我尝试并行化它,但结果都是错误的。我想使用并行函数更改 numpy 数组的元素。
我已阅读 Python map function, passing by reference/value? ,但该方案不适用于并行版本。
我刚刚接触 Python 大约一个月左右,所以也许我问了一些愚蠢的问题。
这是我正在尝试做的事情的一个简单示例。
import numpy as np
import multiprocessing
globalData = np.array([1, 2, 3, 4, 5, 6, 7, 8])
def add(i):
global globalData
globalData[i] += 1
pool = multiprocessing.Pool(8)
globalData = pool.map(add, range(8))
pool.close()
pool.join()
print("Global data:", globalData)
我预计输出为 [2, 3, 4, 5, 6, 7, 8, 9]
,就像我执行时一样
for i in range(8):
add(i)
但是我得到了
[1, 2, 3, 4, 5, 6, 7, 8]
感谢您的帮助。
编辑:这是我最初的问题,一个不那么简单的工作示例。
sample_size = 100
pca_sample = np.randon.rand(sample_size, sample_size)
def knl(x, y):
#Just as an example
return sin(x+y)
K_matrix = np.zeros((sample_size, sample_size))
for i in range(sample_size):
for j in range(sample_size):
K_matrix[i][j] = knl(pca_sample[i], pca_sample[j])
K_cent_matrix = np.zeros((sample_size, sample_size))
def K_centered(K_cent_matrix, i, j):
term1 = K_matrix[i][j]
term2 = 0.
term3 = 0.
term4 = 0.
for k in range(sample_size):
term2 += K_matrix[k][j]
for k in range(sample_size):
term3 += K_matrix[i][k]
for k1 in range(sample_size):
for k2 in range(sample_size):
term4 += K_matrix[k1][k2]
term1 /= sample_size
term2 /= sample_size
term3 /= (sample_size * sample_size)
K_cent_matrix[i][j] = term1 - term2 - term3 + term4
print(f"K_cent_matrix[{i:d}][{j:d}] = {K_cent_matrix[i][j]:f}")
pool = multiprocessing.Pool(8)
pool.starmap(K_centered, [(K_cent_matrix,i,j) for i, j in zip(range(sample_size), range(sample_size))])
pool.close()
pool.join() ```
最佳答案
问题是 globalData
不在共享内存中。当并行处理该数组时,将为每个进程创建一个副本,而原始数组保持不变。如果您想并行处理同一个数组,则必须处理共享内存,这是可行的,但也不是微不足道的。请参阅here和 here .
根据我自己的经验,我建议您返回结果的副本并“重新创建”结果数组,而不是就地更改它。当然,如果您正在处理大量数据,这可能是不可能的。然而,否则,简单性方面的 yield 将超过效率方面的(小) yield 。应用于您的问题,可能如下所示:
import numpy as np
import multiprocessing
globalData = np.array([1, 2, 3, 4, 5, 6, 7, 8])
def add(i):
return globalData[i] + 1
def exe():
global globalData
with multiprocessing.Pool(8) as pool:
globalData = np.array(list(pool.map(add, range(8))))
print("Global data:", globalData)
exe()
结果是
Global data: [2 3 4 5 6 7 8 9]
根据需要。
如果使用 chunksize
参数,代码运行速度会快得多。这将使进程之间的数据通信更快。
请注意,with
语句可以节省您在执行后将进程连接在一起并停止它们的工作。不过,这不适用于顶级代码,这就是为什么我将其放入方法 exe
中。
我派生了一个帮助器类,以便更轻松地处理共享数组或大型数组,而无需“真正”共享它们。
将我在答案末尾提供的代码保存为工作目录中的“concurrent_futures_ext.py”,您可以将代码编写为
import numpy as np
from concurrent_futures_ext import ProcessPoolExecutor
globalData = np.array([1, 2, 3, 4, 5, 6, 7, 8])
def add(globalData, i):
globalData[i] += 1
def exe():
global globalData
shared_np_arrs = [globalData] # list of global arrays
with ProcessPoolExecutor(8, shared_np_arrs=shared_np_arrs) as pool:
any(pool.map(add, range(8)))
globalData = pool.get_shared_arrays()[0] # retrieving the list of global arrays
print("Global data:", globalData)
exe()
只需一份数据副本即可将数组放入共享内存中。
关于您的非最小工作示例:如果您对代码进行矢量化,即使用 numpy 函数而不是 for 循环,则可以进行巨大的优化。完成所有可能的优化超出了您的问题和我的答案的范围,但将使您的代码速度提高几个数量级(比通过并行化实现的效果要好得多(!))。
代码如下:
from concurrent.futures import ProcessPoolExecutor as conc_ProcessPoolExecutor
from concurrent.futures.process import _ExceptionWithTraceback, _get_chunks, _ResultItem
from functools import partial
import multiprocessing
import itertools
import os
import numpy as np
from multiprocessing import sharedctypes
CPU_COUNT = os.cpu_count()
def get_cpu_chunk_counts(task_length, chunk_number=5, min_chunk_size=1):
cpu_count = max(min(CPU_COUNT,
task_length // (chunk_number*min_chunk_size)), 1)
chunk_size = max(min_chunk_size, task_length // (cpu_count*chunk_number))
return cpu_count, chunk_size
def _process_worker(call_queue, result_queue, const_args=[], shared_arrays=[]):
"""Evaluates calls from call_queue and places the results in result_queue.
This worker is run in a separate process.
Args:
call_queue: A multiprocessing.Queue of _CallItems that will be read and
evaluated by the worker.
result_queue: A multiprocessing.Queue of _ResultItems that will written
to by the worker.
shutdown: A multiprocessing.Event that will be set as a signal to the
worker that it should exit when call_queue is empty.
"""
shared_arrays_np = [np.ctypeslib.as_array(arr).view(dtype).reshape(shape)
for arr, dtype, shape in shared_arrays]
while True:
call_item = call_queue.get(block=True)
if call_item is None:
result_queue.put(os.getpid())
return
try:
r = call_item.fn(*call_item.args, const_args=const_args,
shared_arrays=shared_arrays_np,
**call_item.kwargs)
except BaseException as e:
exc = _ExceptionWithTraceback(e, e.__traceback__)
result_queue.put(_ResultItem(call_item.work_id, exception=exc))
else:
result_queue.put(_ResultItem(call_item.work_id,
result=r))
def _process_chunk(fn, chunk, const_args, shared_arrays):
""" Processes a chunk of an iterable passed to map.
Runs the function passed to map() on a chunk of the
iterable passed to map.
This function is run in a separate process.
"""
return [fn(*const_args, *shared_arrays, *args) for args in chunk]
class ProcessPoolExecutor(conc_ProcessPoolExecutor):
'''
classdocs
'''
def __init__(self, max_workers=None, const_args=[], shared_np_arrs=[]):
'''
Constructor
'''
super().__init__(max_workers)
self._const_args = const_args
shared_arrays_ctype = []
shared_arrays_np = []
# TODO do not create copy of shared array, if it already has a suitable
# data structure
for arr in shared_np_arrs:
dtype = arr.dtype
arrShared = np.empty(arr.size*dtype.itemsize, np.int8)
arrShared = np.ctypeslib.as_ctypes(arrShared)
ctypes_arr = sharedctypes.RawArray(arrShared._type_, arrShared)
shared_arrays_ctype.append((ctypes_arr, arr.dtype, arr.shape))
view = np.ctypeslib.as_array(ctypes_arr).view(arr.dtype).reshape(
arr.shape)
view[:] = arr
shared_arrays_np.append(view)
self._shared_arrays_np = shared_arrays_np
self._shared_arrays = shared_arrays_ctype
def _adjust_process_count(self):
for _ in range(len(self._processes), self._max_workers):
p = multiprocessing.Process(
target=_process_worker,
args=(self._call_queue,
self._result_queue,
self._const_args,
self._shared_arrays))
p.start()
self._processes[p.pid] = p
def map(self, fn, *iterables, timeout=None, chunksize=None,
tasklength=None, chunknumber=5, min_chunksize=1):
"""Returns an iterator equivalent to map(fn, iter).
Args:
fn: A callable that will take as many arguments as there are
passed iterables.
timeout: The maximum number of seconds to wait. If None, then there
is no limit on the wait time.
chunksize: If greater than one, the iterables will be chopped into
chunks of size chunksize and submitted to the process pool.
If set to one, the items in the list will be sent one at a time.
tasklength: length of the iterable. If provided, the cpu count
and the chunksize will be adjusted approprietly, if they are not
explicietely given.
Returns:
An iterator equivalent to: map(func, *iterables) but the calls may
be evaluated out-of-order.
Raises:
TimeoutError: If the entire result iterator could not be generated
before the given timeout.
Exception: If fn(*args) raises for any values.
"""
tmp_max_workers = self._max_workers
if tasklength and tasklength > 0:
cpu_count, chunksize_tmp = get_cpu_chunk_counts(tasklength,
chunknumber,
min_chunksize)
if not chunksize:
chunksize = chunksize_tmp
self._max_workers = cpu_count
if not chunksize:
chunksize = 1
if chunksize < 1:
raise ValueError("chunksize must be >= 1.")
results = super(conc_ProcessPoolExecutor, self).map(partial(_process_chunk, fn),
_get_chunks(*iterables, chunksize=chunksize),
timeout=timeout)
self._max_workers = tmp_max_workers
return itertools.chain.from_iterable(results)
def get_shared_arrays(self):
return self._shared_arrays_np
关于python - 使用 multiprocessing.Pool().map 更改传递给它的变量的值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57533533/
Github:https://github.com/jjvang/PassIntentDemo 我一直在关注有关按 Intent 传递对象的教程:https://www.javacodegeeks.c
我有一个 View ,其中包含自动生成的 text 类型的 input 框。当我单击“通过电子邮件发送结果”按钮时,代码会将您带到 CalculatedResults Controller 中的 Em
我有一个基本的docker镜像,我将以此为基础构建自己的镜像。我没有基础镜像的Dockerfile。 基本上,基本镜像使用两个--env arg,一个接受其许可证,一个选择在容器中激活哪个框架。我可以
假设我想计算 2^n 的总和,n 范围从 0 到 100。我可以编写以下内容: seq { 0 .. 100 } |> Seq.sumBy ((**) 2I) 但是,这与 (*) 或其他运算符/函数不
我有这个网址: http://www.example.com/get_url.php?ID=100&Link=http://www.test.com/page.php?l=1&m=7 当我打印 $_G
我想将 window.URL.createObjectURL(file) 创建的地址传递给 dancer.js 但我得到 GET blob:http%3A//localhost/b847c5cd-aa
我想知道如何将 typedef 传递给函数。例如: typedef int box[3][3]; box empty, *board[3][3]; 我如何将 board 传递给函数?我
我正在将一些代码从我的 Controller 移动到核心数据应用程序中的模型。 我编写了一个方法,该方法为我定期发出的特定获取请求返回 NSManagedObjectID。 + (NSManagedO
为什么我不能将类型化数组传递到采用 any[] 的函数/构造函数中? typedArray = new MyType[ ... ]; items = new ko.observableArray(ty
我是一名新的 Web 开发人员,正在学习 html5 和 javascript。 我有一个带有“选项卡”的网页,可以使网页的某些部分消失并重新出现。 链接如下: HOME 和 JavaScript 函
我试图将对函数的引用作为参数传递 很难解释 我会写一些伪代码示例 (calling function) function(hello()); function(pass) { if this =
我在尝试调用我正在创建的 C# 项目中的函数时遇到以下错误: System.Runtime.InteropServices.COMException: Operation is not allowed
使用 ksh。尝试重用当前脚本而不修改它,基本上可以归结为如下内容: `expr 5 $1 $2` 如何将乘法命令 (*) 作为参数 $1 传递? 我首先尝试使用“*”,甚至是\*,但没有用。我尝试
我一直在研究“Play for Java”这本书,这本书非常棒。我对 Java 还是很陌生,但我一直在关注这些示例,我有点卡在第 3 章上了。可以在此处找到代码:Play for Java on Gi
我知道 Javascript 中的对象是通过引用复制/传递的。但是函数呢? 当我跳到一些令人困惑的地方时,我正在尝试这段代码。这是代码片段: x = function() { console.log(
我希望能够像这样传递参数: fn(a>=b) or fn(a!=b) 我在 DjangoORM 和 SQLAlchemy 中看到了这种行为,但我不知道如何实现它。 最佳答案 ORM 使用 specia
在我的 Angular 项目中,我最近将 rxjs 升级到版本 6。现在,来自 npm 的模块(在 node_modules 文件夹内)由于一些破坏性更改而失败(旧的进口不再有效)。我为我的代码调整了
这个问题在这里已经有了答案: The issue of * in Command line argument (6 个答案) 关闭 3 年前。 我正在编写一个关于反向波兰表示法的 C 程序,它通过命
$(document).ready(function() { function GetDeals() { alert($(this).attr("id")); } $('.filter
下面是一个例子: 复制代码 代码如下: use strict; #这里是两个数组 my @i =('1','2','3'); my @j =('a','b','c'); &n
我是一名优秀的程序员,十分优秀!