gpt4 book ai didi

python - 是否有避免内存深拷贝或减少多处理时间的好方法?

转载 作者:太空狗 更新时间:2023-10-30 00:50:38 25 4
gpt4 key购买 nike

我正在使用Python环境的Pandas模块制作基于内存的“大数据”实时计算模块。

所以响应时间是这个模块的质量,非常关键和重要。

为了处理大型数据集,我拆分数据并并行处理子拆分数据。

在存储子数据结果的部分,花费了很多时间(第21行)。

我认为内部内存深拷贝出现或者传递的子数据没有在内存中共享。

如果我用 C 或 C++ 编写模块,我将使用如下指针或引用。

"process=Process(target=addNewDerivedColumn, args=[resultList, &sub_dataframe])"

"process=Process(target=addNewDerivedColumn, args=[resultList, sub_dataframe])

def addNewDerivedColumn(resultList, split_sub_dataframe&):...."

是否有避免内存深拷贝或减少多处理时间的好方法?“不优雅”很好。我准备好让我的代码变脏了。我尝试了 weekref、RawValue、RawArray、Value、Pool,但都失败了。

该模块正在 MacOS 中开发,最终将在 Linux 或 Unix 中运行。

不考虑 Windows 操作系统。

代码来了。

真正的代码在我的办公室,但结构和逻辑与真正的相同。

1 #-*- coding: UTF-8 -*-' 
2 import pandas as pd
3 import numpy as np
4 from multiprocessing import *
5 import time
6
7
8 def addNewDerivedColumn(resultList, split_sub_dataframe):
9
10 split_sub_dataframe['new_column']= np.abs(split_sub_dataframe['column_01']+split_sub_dataframe['column_01']) / 2
11
12 print split_sub_dataframe.head()
13
14 '''
15 i think that the hole result of sub-dataframe is copied to resultList, not reference value
16 and in here time spend much
17 compare elapsed time of comment 21th line with the uncommented one
18 In MS Windows, signifiant difference of elapsed time doesn't show up
19 In Linux or Mac OS, the difference is big
20 '''
21 resultList.append(split_sub_dataframe)
22
23
24
25 if __name__ == "__main__":
26
27 # example data generation
28 # the record count of the real data is over 1 billion with about 10 columns.
29 dataframe = pd.DataFrame(np.random.randn(100000000, 4), columns=['column_01', 'column_02', 'column_03', 'column_04'])
30
31
32 print 'start...'
33 start_time = time.time()
34
35 # to launch 5 process in parallel, I split the dataframe to five sub-dataframes
36 split_dataframe_list = np.array_split(dataframe, 5)
37
38 # multiprocessing
39 manager = Manager()
40
41 # result list
42 resultList=manager.list()
43 processList=[]
44
45 for sub_dataframe in split_dataframe_list:
46 process=Process(target=addNewDerivedColumn, args=[resultList, sub_dataframe])
47 processList.append(process)
48
49 for proc in processList:
50 proc.start()
51 for proc in processList:
52 proc.join()
53
54
55 print 'elapsed time : ', np.round(time.time() - start_time,3)

最佳答案

如果将进程间通信保持在一个最低限度。因此,不是将子数据帧作为参数传递,而是传递指标值。子进程可以对公共(public) DataFrame 本身进行切片。

当一个子进程被派生时,它会得到一份定义在父进程的调用模块。因此,如果大型 DataFrame df生成多处理池之前在全局变量中定义,然后每个生成的子进程将有权访问 df

在 Windows 上,没有 fork() 的地方,一个新的 python 进程被启动并且导入调用模块。因此,在 Windows 上,生成的子进程必须从头开始重新生成 df,这可能需要时间和更多内存。

然而,在 Linux 上,您有写时复制。这意味着生成的subprocess 访问原始全局变量(调用模块的)而不复制它们。只有当子进程尝试修改全局时,Linux 才会然后在修改值之前制作一个单独的副本。

所以如果你避免修改你的全局变量,你可以享受性能提升子流程。我建议仅将子进程用于计算。返回计算值,让主进程整理结果修改原始数据框。

import pandas as pd
import numpy as np
import multiprocessing as mp
import time

def compute(start, end):
sub = df.iloc[start:end]
return start, end, np.abs(sub['column_01']+sub['column_01']) / 2

def collate(retval):
start, end, arr = retval
df.ix[start:end, 'new_column'] = arr

def window(seq, n=2):
"""
Returns a sliding window (of width n) over data from the sequence
s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ...
"""
for i in range(len(seq)-n+1):
yield tuple(seq[i:i+n])

if __name__ == "__main__":
result = []
# the record count of the real data is over 1 billion with about 10 columns.
N = 10**3
df = pd.DataFrame(np.random.randn(N, 4),
columns=['column_01', 'column_02', 'column_03', 'column_04'])

pool = mp.Pool()
df['new_column'] = np.empty(N, dtype='float')

start_time = time.time()
idx = np.linspace(0, N, 5+1).astype('int')
for start, end in window(idx, 2):
# print(start, end)
pool.apply_async(compute, args=[start, end], callback=collate)

pool.close()
pool.join()
print 'elapsed time : ', np.round(time.time() - start_time,3)
print(df.head())

关于python - 是否有避免内存深拷贝或减少多处理时间的好方法?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19615560/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com