gpt4 book ai didi

python - 在 Python 中并行处理大型 .csv 文件

转载 作者:IT老高 更新时间:2023-10-28 21:11:12 26 4
gpt4 key购买 nike

我正在使用 Python 脚本处理大型 CSV 文件(大约数 GB 和 10M 行)。

文件的行长不同,无法完全加载到内存中进行分析。

每一行都由我脚本中的一个函数单独处理。分析一个文件大约需要 20 分钟,看来磁盘访问速度不是问题,而是处理/函数调用问题。

代码看起来像这样(非常简单)。实际代码使用了 Class 结构,但这是类似的:

csvReader = csv.reader(open("file","r")
for row in csvReader:
handleRow(row, dataStructure)

鉴于计算需要共享数据结构,在 Python 中利用多核并行运行分析的最佳方法是什么?

一般来说,如何从 Python 中的 .csv 一次读取多行以传输到线程/进程?在行上使用 for 循环听起来效率不高。

谢谢!

最佳答案

这可能为时已晚,但无论如何我都会为 future 的用户发布。另一个海报提到使用多处理。我可以保证,并且可以更详细地介绍。我们每天使用 Python 处理数百 MB/数 GB 的文件。因此,这绝对取决于任务。我们处理的一些文件不是 CSV,因此解析可能相当复杂,并且比磁盘访问需要更长的时间。但是,无论文件类型如何,方法都是相同的。

您可以同时处理多个大文件。这是我们如何做的伪代码:

import os, multiprocessing as mp

# process file function
def processfile(filename, start=0, stop=0):
if start == 0 and stop == 0:
... process entire file...
else:
with open(file, 'r') as fh:
fh.seek(start)
lines = fh.readlines(stop - start)
... process these lines ...

return results

if __name__ == "__main__":

# get file size and set chuck size
filesize = os.path.getsize(filename)
split_size = 100*1024*1024

# determine if it needs to be split
if filesize > split_size:

# create pool, initialize chunk start location (cursor)
pool = mp.Pool(cpu_count)
cursor = 0
results = []
with open(file, 'r') as fh:

# for every chunk in the file...
for chunk in xrange(filesize // split_size):

# determine where the chunk ends, is it the last one?
if cursor + split_size > filesize:
end = filesize
else:
end = cursor + split_size

# seek to end of chunk and read next line to ensure you
# pass entire lines to the processfile function
fh.seek(end)
fh.readline()

# get current file location
end = fh.tell()

# add chunk to process pool, save reference to get results
proc = pool.apply_async(processfile, args=[filename, cursor, end])
results.append(proc)

# setup next chunk
cursor = end

# close and wait for pool to finish
pool.close()
pool.join()

# iterate through results
for proc in results:
processfile_result = proc.get()

else:
...process normally...

就像我说的,这只是伪代码。它应该让任何需要做类似事情的人开始。我面前没有代码,只是凭内存做的。

但我们在第一次运行时获得了超过 2 倍的速度提升,而无需对其进行微调。您可以根据您的设置微调池中的进程数量以及 block 的大小以获得更高的速度。如果您像我们一样有多个文件,请创建一个池以并行读取多个文件。请注意不要让过多的进程使盒子重载。

注意:您需要将其放在“if main” block 中,以确保不会创建无限进程。

关于python - 在 Python 中并行处理大型 .csv 文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8424771/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com