gpt4 book ai didi

python - 读取和处理大文件时的多线程(内存太大)

转载 作者:太空宇宙 更新时间:2023-11-04 04:20:41 25 4
gpt4 key购买 nike

我有以下运行速度非常慢的代码。这是一个拆分大文件(80 gig)并将其放入树状文件夹结构以便快速查找的程序。我在代码中做了一些注释以帮助您理解它。

# Libraries
import os


# Variables
file="80_gig_file.txt"
outputdirectory="sorted"
depth=4 # This is the tree depth


# Preperations
os.makedirs(outputdirectory)

# Process each line in the file
def pipeline(line):
# Strip symbols from line
line_stripped=''.join(e for e in line if e.isalnum())
# Reverse the line
line_stripped_reversed=line_stripped[::-1]
file=outputdirectory
# Create path location in folderbased tree
for i in range(min((depth),len(line_stripped))):
file=os.path.join(file,line_stripped_reversed[i])
# Create folders if they don't exist
os.makedirs(os.path.dirname(file), exist_ok=True)
# Name the file, with "-file"
file=file+"-file"
# This is the operation that slows everything down.
# It opens, writes and closes a lot of small files.
# I cannot keep them open because currently half a million possibilities (and thus files) are worst case open (n=26^4).
f = open(file, "a")
f.write(line)
f.close()


# Read file line by line and by not loading it entirely in memory
# Here it is possible to work with a queue I think, but how to do it properly without loading too much in memory?
with open(file) as infile:
for line in infile:
pipeline(line)

有没有办法让多线程工作?因为我自己尝试了几个在网上找到的例子,它把所有东西都放在内存中,导致我的电脑多次死机。

最佳答案

首先,(IMO)最简单的解决方案

如果这些行看起来完全独立,只需将您的文件分成 N 个 block ,将文件名作为程序参数传递给 open 并运行当前脚本的多个实例,在多个命令行上手动启动它们。

优点:

  • 无需深入研究多进程、进程间通信等
  • 不需要过多改动代码

缺点:

  • 您需要对大文件进行预处理,将其分成多个 block (尽管这会比您当前的执行时间快得多,因为您不会遇到每行打开-关闭的情况)
  • 您需要自己启动进程,为每个进程传递适当的文件名

这将实现为:

预处理:

APPROX_CHUNK_SIZE = 1e9 #1GB per file, adjust as needed
with open('big_file.txt') as fp:
chunk_id = 0
next_chunk = fp.readlines(APPROX_CHUNK_SIZE)
while next_chunk:
with open('big_file_{}.txt'.format(chunk_id), 'w') as ofp:
ofp.writelines(next_chunk)
chunk_id += 1
next_chunk = fp.readlines(APPROX_CHUNK_SIZE)

来自readlines docs :

If the optional sizehint argument is present, instead of reading up to EOF, whole lines totalling approximately sizehint bytes (possibly after rounding up to an internal buffer size) are read.

这样做并不能确保所有 block 中的行数为偶数,但会使预处理速度更快,因为您是按 block 读取而不是逐行读取。根据需要调整 block 大小。另外,请注意,通过使用 readlines,我们可以确定 block 之间不会有断行,但由于该函数返回行列表,我们使用 writelines 来编写在我们的输出文件中(相当于循环遍历列表和 ofp.write(line))。为了完整起见,请注意您还可以连接内存中的所有字符串并仅调用一次 write (即执行 ofp.write(''.join(next_chunk)) ),这可能会给您带来一些(次要的)性能优势,但要付出(高得多)更高的 RAM 使用率。

主脚本:

您唯一需要的修改是在最顶部:

import sys
file=sys.argv[1]
... # rest of your script here

通过使用 argv,您可以将命令行参数传递给您的程序(在本例中为要打开的文件)。然后,只需按以下方式运行脚本:

python process_the_file.py big_file_0.txt

这将运行一个进程。打开多个终端并使用 big_file_N.txt 为每个终端运行相同的命令,它们将彼此独立。

注意:我使用 argv[1] 因为对于所有程序来说,argv 的第一个值(即 argv[0 ]) 始终是程序名称。


然后,多处理解决方案

虽然有效,但第一个解决方案并不十分优雅,尤其是如果您从一个大小为 80GB 的文件开始,您将拥有 80 个文件。

更简洁的解决方案是使用 python 的 multiprocessing 模块(重要:不是 threading!如果您不知道其中的区别,请查找“global interpreter lock”以及为什么 python 中的多线程无法按照您认为的方式工作)。

想法是让一个“生产者”进程打开大文件并不断地将其中的行放入队列中。然后,从队列中提取行并进行处理的“消费者”进程池。

优点:

  • 一个脚本搞定一切
  • 无需打开多个终端打字

缺点:

  • 复杂性
  • 使用进程间通信,这有一些开销

这将按如下方式实现:

# Libraries
import os
import multiprocessing

outputdirectory="sorted"
depth=4 # This is the tree depth

# Process each line in the file
def pipeline(line):
# Strip symbols from line
line_stripped=''.join(e for e in line if e.isalnum())
# Reverse the line
line_stripped_reversed=line_stripped[::-1]
file=outputdirectory
# Create path location in folderbased tree
for i in range(min((depth),len(line_stripped))):
file=os.path.join(file,line_stripped_reversed[i])
# Create folders if they don't exist
os.makedirs(os.path.dirname(file), exist_ok=True)
# Name the file, with "-file"
file=file+"-file"
# This is the operation that slows everything down.
# It opens, writes and closes a lot of small files.
# I cannot keep them open because currently half a million possibilities (and thus files) are worst case open (n=26^4).
f = open(file, "a")
f.write(line)
f.close()

if __name__ == '__main__':
# Variables
file="80_gig_file.txt"

# Preperations
os.makedirs(outputdirectory)
pool = multiprocessing.Pool() # by default, 1 process per CPU
LINES_PER_PROCESS = 1000 # adapt as needed. Higher is better, but consumes more RAM

with open(file) as infile:
next(pool.imap(pipeline, infile, LINES_PER_PROCESS))
pool.close()
pool.join()

if __name__ == '__main__' 行是将运行在每个进程上的代码与仅运行在“父进程”上的代码分开的障碍。每个进程都定义了 pipeline,但实际上只有父亲产生了一个工作池并应用了该函数。您会找到有关 multiprocessing.map here 的更多详细信息

编辑:

添加了关闭和加入池以防止主进程退出并杀死进程中的子进程。

关于python - 读取和处理大文件时的多线程(内存太大),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54532740/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com