gpt4 book ai didi

Python 在并行处理大文件时不释放 RAM

转载 作者:太空狗 更新时间:2023-10-30 02:57:04 24 4
gpt4 key购买 nike

我有一个 25Gb 的纯文本文件,大约有 1000 万行,每行几百个单词。每行都需要单独处理,我试图将 block 拆分为十几个工作人员以并行处理。目前一次加载一百万行(由于某种原因,这占用了 ~10Gb 内存,即使它在磁盘上只有 ~3Gb 未压缩,)将它平均分成 12 种方式,然后使用 multiprocessing.Pool 将它映射到 12 个工作人员。

问题是,当我的 12 个工作人员中的每一个完成处理分配给他们的数据时,他们的 RAM 没有被释放,只会在下一个百万行迭代中再增加 ~10Gb。

我试过“删除”以前的数据,将以前的数据重置为空分配,删除后使用 eval()、gc.collect() 创建可迭代变量名,并将 IO 完全分离到它自己的功能,都没有运气和完全相同的问题。运行调试显示 python 解释器仅识别预期的数据,并且无法访问上一次迭代的数据,那么为什么 RAM 实际上没有被释放?

下面的代码是我尝试分离所有环境的最新迭代,不是最有效的,但“BigFileOnDisk”在 SSD 上,因此与实际处理数据相比,每次重新读取文件可以忽略不计。以前在分配函数中有“读取”功能,在工作人员完成后删除所有数据,结果相同。

def allocation():
fileCompleted = False
currentLine = 0
while not fileCompleted:
lineData, currentLine, fileCompleted = read(numLines=1000000, startLine=currentLine)
list_of_values(function_object=worker, inputs=lineData, workers=12)


def read(numLines, startLine=0):
currentLine = 0
lines = []
with open(BigFileOnDisk, 'r') as fid:
for line in fid:
if currentLine >= startLine:
lines.append(line)
if currentLine - startLine >= numLines:
return lines, counter, False
currentLine += 1
# or if we've hit the end of the file
return lines, counter, True


def worker(lines):
outputPath = *root* + str(datetime.datetime.now().time())
processedData = {}

for line in lines:
# process data

del lines
with open(outputPath, 'a') as fid:
for item in processedData:
fid.write(str(item) + ', ' + str(processedData[item]) + '\n')


def list_of_values(function_object, inputs, workers = 10):
inputs_split = []
subsection_start = 0
for n in range(workers):
start = int(subsection_start)
end = int(subsection_start + len(inputs) / workers)
subsection_start = end

inputs_split.append( inputs[start:end] )

p = Pool(workers)
p.map(function_object, inputs_split)

最佳答案

您没有加入子进程。在 list_of_values 完成后,由 Pool 创建的进程仍然存在(有点像僵尸,但父进程仍然存在)。他们仍然持有他们所有的值(value)观。你在 main 中看不到他们的数据,因为它在另一个进程中(出于同样的原因 gc.collect 不工作)。

要释放工作人员分配的内存,您需要手动加入 Pool 或使用 with

def list_of_values(function_object, inputs, workers = 10):
inputs_split = []
subsection_start = 0
for n in range(workers):
start = int(subsection_start)
end = int(subsection_start + len(inputs) / workers)
subsection_start = end

inputs_split.append( inputs[start:end] )

with Pool(workers) as p:
p.map(function_object, inputs_split)

关于Python 在并行处理大文件时不释放 RAM,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38470123/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com