gpt4 book ai didi

python - 如何正确实现apply_async进行数据处理?

转载 作者:太空宇宙 更新时间:2023-11-03 21:16:48 24 4
gpt4 key购买 nike

我刚开始使用并行处理进行数据分析。我有一个相当大的数组,我想对所述数组的每个索引应用一个函数。

这是我到目前为止的代码:

import numpy as np
import statsmodels.api as sm
from statsmodels.regression.quantile_regression import QuantReg
import multiprocessing
from functools import partial

def fit_model(data,q):
#data is a 1-D array holding precipitation values
years = np.arange(1895,2018,1)
res = QuantReg(exog=sm.add_constant(years),endog=data).fit(q=q)
pointEstimate = res.params[1] #output slope of quantile q
return pointEstimate

#precipAll is an array of shape (1405*621,123,12) (longitudes*latitudes,years,months)
#find all indices where there is data
nonNaN = np.where(~np.isnan(precipAll[:,0,0]))[0] #481631 indices
month = 4

#holder array for results
asyncResults = np.zeros((precipAll.shape[0])) * np.nan
def saveResult(result,pos):
asyncResults[pos] = result

if __name__ == '__main__':
pool = multiprocessing.Pool(processes=20) #my server has 24 CPUs
for i in nonNaN:
#use partial so I can also pass the index i so the result is
#stored in the expected position

new_callback_function = partial(saveResult, pos=i)
pool.apply_async(fit_model, args=(precipAll[i,:,month],0.9),callback=new_callback_function)

pool.close()
pool.join()

当我运行这个程序时,我在花费了比我根本没有使用多处理的时间更长的时间后停止了它。函数 fit_model 的时间约为 0.02 秒,那么与 apply_async 相关的突出是否会导致速度减慢?当我在处理完成后将这些数据绘制到 map 上时,我需要保持结果的顺序。非常感谢任何关于我需要改进的地方的想法!

最佳答案

如果您需要使用多处理模块,您可能希望将更多行一起批处理到您提供给工作池的每个任务中。但是,对于您正在做的事情,我建议尝试 Ray由于其 efficient handling of large numerical data .

import numpy as np
import statsmodels.api as sm
from statsmodels.regression.quantile_regression import QuantReg
import ray

@ray.remote
def fit_model(precip_all, i, month, q):
data = precip_all[i,:,month]
years = np.arange(1895, 2018, 1)
res = QuantReg(exog=sm.add_constant(years), endog=data).fit(q=q)
pointEstimate = res.params[1]
return pointEstimate

if __name__ == '__main__':
ray.init()

# Create an array and place it in shared memory so that the workers can
# access it (in a read-only fashion) without creating copies.
precip_all = np.zeros((100, 123, 12))
precip_all_id = ray.put(precip_all)

result_ids = []
for i in range(precip_all.shape[0]):
result_ids.append(fit_model.remote(precip_all_id, i, 4, 0.9))

results = np.array(ray.get(result_ids))

一些注释

上面的示例是开箱即用的,但请注意,我稍微简化了逻辑。特别是,我删除了对 NaN 的处理。

在我的具有 4 个物理核心的笔记本电脑上,这大约需要 4 秒。如果你使用 20 个核心并将数据放大 9000 倍,我预计需要大约 7200 秒,这是一个相当长的时间。加快速度的一种可能方法是使用更多机器或在每次调用 fit_model 时处理多行,以分摊一些开销。

上面的示例实际上将整个 precip_all 矩阵传递到每个任务中。这很好,因为每个 fit_model 任务只能读取共享内存中存储的矩阵副本,因此不需要创建自己的本地副本。对 ray.put(precip_all) 的调用预先将数组放置在共享内存中。

关于differences between Ray and Python multiprocessing 。请注意,我正在帮助开发 Ray。

关于python - 如何正确实现apply_async进行数据处理?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54634487/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com