gpt4 book ai didi

Python外部合并排序运行缓慢

转载 作者:太空宇宙 更新时间:2023-11-03 20:05:45 25 4
gpt4 key购买 nike

我是一名计算机科学学生(因此是编程新手),我正在尝试在 Python 中实现外部合并排序算法,例如 this one 。要排序的数据是一个大型 CSV 文件,大约有 9,000,000 行,如下所示:

1743-11-01,6.068,1.736,StadtA,Denmark,57.05N,10.33E
1744-06-01,5.787,3.623,StadtB,Belgien,47.05N,10.33E

到目前为止,我有这个函数将 csv 文件分成多个预先排序的 block :

    def splitFiles(self, largeFileName, smallFileSize):
largeFileHandler = csv.reader(open(largeFileName), delimiter=',')
tempBuffer = []
size = 0
for row in largeFileHandler:
tempBuffer.append(row)
size += 1
if (size % smallFileSize == 0):
tempBuffer.sort(key=lambda x: (x[5], -float(x[1]), x[3]))
tempFile = tempfile.NamedTemporaryFile(dir=self.cwd + '/temp', delete=False, mode="wb+")
writer = csv.writer(tempFile)
writer.writerows(tempBuffer)
tempFile.seek(0)
sortedTempFileHandlerList.append(tempFile)
tempBuffer = []

这意味着 block 按第五个元素降序排序。如果该值相等,则按第一个元素递增排序,如果也相等,则按第三个元素排序。

在那之前一切正常。但是将这些 block 合并成一个(最终)大文件需要很长时间。

    def merge(self):
stack_tops = []
sink = csv.writer(open("outputfile", "w+"))
for f in sortedTempFileHandlerList:
stack_tops.append(next(csv.reader(f)))

while stack_tops:
c = min(stack_tops, key=lambda x: (x[5], -float(x[1]), x[3]))
sink.writerow(c)
i = stack_tops.index(c)
try:
t = next(csv.reader(sortedTempFileHandlerList[i]))
stack_tops[i] = t
except StopIteration:
del stack_tops[i]
self.sortedTempFileHandlerList[i].close()
del self.sortedTempFileHandlerList[i]

我的问题是,如何才能使第二部分运行得更快?提前致谢!

最佳答案

如果您有旋转的硬盘驱动器,则问题的原因可能是过多的磁盘寻道。如果您并行读取数十个文件,则磁盘驱动器会放弃,每次读取都会持续到磁盘旋转为止。转速为 6000 rpm 时,完整旋转时间为 0-0.01 秒,平均为 0.005 秒。当您的文件之间有许多兆字节的数据时,每秒读取次数总计为 200 次。

解决方案是一次只合并几个文件。

我过去个人编写此代码的方式是保留一堆对象,其中包括{file: ___, size: ___}。然后你的逻辑如下所示:

while more to read:
read a chunk
write a sorted chunk
add file info to stack
while n < len(stack) and sum of sizes of top n-1 elements exceeds the nth:
pop top n objects off of stack
merge them
push merged element onto stack

merge stack to get answer

您必须使用它来确定要合并多少个。但我成功地使用这种策略来处理具有数十亿行的数据集,并且表现良好。我想我将 n 设置为 4。

(在我的现实生活示例中,情况很复杂,由于磁盘限制,我必须压缩所有中间文件。我还在合并和聚合结果时进行操作。我更愿意在数据库中执行此操作,但数据库没有空间了。那是一个有趣的项目...)

<小时/>

更多细节。

首先,由于这是 Python,我建议您将 splitFiles 制作为迭代器。就像这个伪代码:

 def splitFiles (inputFile):
open inputFile
create in memory data
while more to read:
add to data
if data is full:
write sorted temp file
yield (fileSize, tempFile)
if data:
write sorted temp file
yield (fileSize, tempFile)

那些yield使其成为一个迭代器。现在你的主要逻辑是这样的:

stack = []
for chunk in splitFile(dataFile):
stack.append(chunk)
while 3 < len(stack) and stack[-4][0] < sum((stack[-i][0] for i in [1, 2, 3])):
chunks = []
for _ in range(4):
chunks.append(stack.pop())
stack.append(mergeChunks(chunks))

# Now the final merge
while 1 < len(stack):
chunks = []
for _ in range(4):
if len(stack):
chunks.append(stack.pop())
stack.append(mergeChunks(chunks))
return stack[0][1]

在您提供 8 个 3MB block 的示例中,您的堆栈将像这样进行:

[]
[(3_000_000, tempFile1)]
[(3_000_000, tempFile1), (3_000_000, tempFile2)]
[(3_000_000, tempFile1), (3_000_000, tempFile2), (3_000_000, tempFile3)]
[(3_000_000, tempFile1), (3_000_000, tempFile2), (3_000_000, tempFile3), (3_000_000, tempFile4)]
# First merge!
[(12_000_000_000, tempFile5)]
[(12_000_000_000, tempFile5), (3_000_000, tempFile6)]
[(12_000_000_000, tempFile5), (3_000_000, tempFile6), (3_000_000, tempFile7)]
[(12_000_000_000, tempFile5), (3_000_000, tempFile6), (3_000_000, tempFile7), (3_000_000, tempFile8)]
[(12_000_000_000, tempFile5), (3_000_000, tempFile6), (3_000_000, tempFile7), (3_000_000, tempFile8), (3_000_000, tempFile9)]
# Second merge!
[(12_000_000_000, tempFile5), (12_000_000_000, tempFile10)]
# End of input, final merge
[(24_000_000_000, tempFile11)]

最后一个文件就是你的结果。

关于Python外部合并排序运行缓慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58980146/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com