gpt4 book ai didi

python - 分块处理大文件 : inconsistent seek with readline

转载 作者:太空狗 更新时间:2023-10-29 21:48:06 25 4
gpt4 key购买 nike

我正在尝试使用 Python 以 block 的形式读取和处理一个大文件。我正在关注 this blog它提出了一种非常快速的方法来读取和处理分布在多个进程中的大块数据。我只是稍微更新了现有代码,即使用 stat(fin).st_size 而不是 os.path.getsize。在这个例子中,我也没有实现多处理,因为这个问题也体现在单个进程中。这使得调试更容易。

我在使用这段代码时遇到的问题是,它会返回断句。这是有道理的:指针不考虑行尾,只返回一些给定的字节大小。实际上,人们会假设您可以通过省略获取的一批行中的最后一项来解决这个问题,因为那很可能是虚线。不幸的是,这也不能可靠地工作。

from os import stat


def chunkify(pfin, buf_size=1024):
file_end = stat(pfin).st_size
with open(pfin, 'rb') as f:
chunk_end = f.tell()

while True:
chunk_start = chunk_end
f.seek(buf_size, 1)
f.readline()
chunk_end = f.tell()
yield chunk_start, chunk_end - chunk_start

if chunk_end > file_end:
break


def process_batch(pfin, chunk_start, chunk_size):
with open(pfin, 'r', encoding='utf-8') as f:
f.seek(chunk_start)
batch = f.read(chunk_size).splitlines()

# changing this to batch[:-1] will result in 26 lines total
return batch


if __name__ == '__main__':
fin = r'data/tiny.txt'
lines_n = 0
for start, size in chunkify(fin):
lines = process_batch(fin, start, size)
# Uncomment to see broken lines
# for line in lines:
# print(line)
# print('\n')
lines_n += len(lines)

print(lines_n)
# 29

上面的代码将打印 29 作为已处理行的总数。当您不返回批处理的最后一项时,天真地假设那是一条虚线,您将得到 26。实际行数为27行,测试数据如下。

She returned bearing mixed lessons from a society where the tools of democracy still worked.
If you think you can sense a "but" approaching, you are right.
Elsewhere, Germany take on Brazil and Argentina face Spain, possibly without Lionel Messi.
What sort of things do YOU remember best?'
Less than three weeks after taking over from Lotz at Wolfsburg.
The buildings include the Dr. John Micallef Memorial Library.
For women who do not have the genes, the risk drops to just 2% for ovarian cancer and 12% for breast cancer.
In one interview he claimed it was from the name of the Cornish language ("Kernewek").
8 Goldschmidt was out of office between 16 and 19 July 1970.
Last year a new law allowed police to shut any bar based on security concerns.
But, Frum explains: "Glenn Beck takes it into his head that this guy is bad news."
Carrying on the Romantic tradition of landscape painting.
This area has miles of undeveloped beach adjacent to the headlands.
The EAC was created in 2002 to help avoid a repeat of the disputed 2000 presidential election.
In May 1945, remnants of the German Army continue fight on in the Harz mountains, nicknamed "The Void" by American troops.
Dietler also said Abu El Haj was being opposed because she is of Palestinian descent.
The auction highlights AstraZeneca's current focus on boosting returns to shareholders as it heads into a wave of patent expiries on some of its biggest selling medicines including Nexium, for heartburn and stomach ulcers, and Seroquel for schizophrenia and bipolar disorder.
GAAP operating profit was $13.2 million and $7.1 million in the second quarter of 2008 and 2007, respectively.
Doc, Ira, and Rene are sent home as part of the seventh bond tour.
only I am sick of always hearing him called the Just.
Also there is Meghna River in the west of Brahmanbaria.
The explosives were the equivalent of more than three kilograms of dynamite - equal to 30 grenades," explained security advisor Markiyan Lubkivsky to reporters gathered for a news conference in Kyiv.
Her mother first took her daughter swimming at the age of three to help her with her cerebal palsy.
A U.S. aircraft carrier, the USS "Ticonderoga", was also stationed nearby.
Louis shocked fans when he unexpectedly confirmed he was expecting a child in summer 2015.
99, pp.
Sep 19: Eibar (h) WON 6-1

如果打印出创建的行,您会发现确实出现了断句。我觉得这很奇怪。 难道 f.readline() 不应该确保读取文件直到下一行吗? 在下面的输出中,空行将两个批处理分开。这意味着您不能检查一行与批处理中的下一行,如果它是子字符串则将其删除 - 断句属于另一个批处理而不是完整句子。

...
This area has miles of undeveloped beach adjacent to the headlands.
The EAC was created in 2002 to help avoid a repeat of the disputed 2000 presidential election.
In May 1945, r


In May 1945, remnants of the German Army continue fight on in the Harz mountains, nicknamed "The Void" by American troops.
...

有没有办法去掉这些断句,而不用去掉太多?

您可以下载更大的测试文件(100,000 行)here .


经过大量挖掘,似乎实际上是一些无法访问的缓冲区导致了搜索的不一致行为,如所讨论的herehere .我尝试了建议的解决方案,将 iter(f.readline, '')seek 一起使用,但这仍然给我不一致的结果。我已经更新我的代码以在每批 1500 行之后返回文件指针,但实际上返回的批处理会重叠。

from os import stat
from functools import partial


def chunkify(pfin, max_lines=1500):
file_end = stat(pfin).st_size
with open(pfin, 'r', encoding='utf-8') as f:
chunk_end = f.tell()

for idx, l in enumerate(iter(f.readline, '')):
if idx % max_lines == 0:
chunk_start = chunk_end
chunk_end = f.tell()
# yield start position, size, and is_last
yield chunk_start, chunk_end - chunk_start

chunk_start = chunk_end
yield chunk_start, file_end


def process_batch(pfin, chunk_start, chunk_size):
with open(pfin, 'r', encoding='utf-8') as f:
f.seek(chunk_start)
chunk = f.read(chunk_size).splitlines()

batch = list(filter(None, chunk))

return batch


if __name__ == '__main__':
fin = r'data/100000-ep+gutenberg+news+wiki.txt'

process_func = partial(process_batch, fin)
lines_n = 0

prev_last = ''
for start, size in chunkify(fin):
lines = process_func(start, size)

if not lines:
continue

# print first and last ten sentences of batch
for line in lines[:10]:
print(line)
print('...')
for line in lines[-10:]:
print(line)
print('\n')

lines_n += len(lines)

print(lines_n)

重叠批处理的示例如下。最后一批的前两句半是从前一批的最后一句复制而来的。我不知道如何解释或解决这个问题。

...
The EC ordered the SFA to conduct probes by June 30 and to have them confirmed by a certifying authority or it would deduct a part of the funding or the entire sum from upcoming EU subsidy payments.
Dinner for two, with wine, 250 lari.
It lies a few kilometres north of the slightly higher Weissmies and also close to the slightly lower Fletschhorn on the north.
For the rest we reached agreement and it was never by chance.
Chicago Blackhawks defeat Columbus Blue Jackets for 50th win
The only drawback in a personality that large is that no one els


For the rest we reached agreement and it was never by chance.
Chicago Blackhawks defeat Columbus Blue Jackets for 50th win
The only drawback in a personality that large is that no one else, whatever their insights or artistic pedigree, is quite as interesting.
Sajid Nadiadwala's reboot version of his cult classic "Judwaa", once again directed by David Dhawan titled "Judwaa 2" broke the dry spell running at the box office in 2017.
They warned that there will be a breaking point, although it is not clear what that would be.
...

除此之外,我还尝试从原始代码中删除 readline,并跟踪剩余的、不完整的 block 。然后将不完整的 block 传递给下一个 block 并添加到其前面。我现在遇到的问题是,因为文本是按字节 block 读取的,所以一个 block 可能会在没有完全完成一个字符的字节的情况下结束。这将导致解码错误。

from os import stat


def chunkify(pfin, buf_size=1024):
file_end = stat(pfin).st_size
with open(pfin, 'rb') as f:
chunk_end = f.tell()

while True:
chunk_start = chunk_end
f.seek(buf_size, 1)
chunk_end = f.tell()
is_last = chunk_end >= file_end
# yield start position, size, and is_last
yield chunk_start, chunk_end - chunk_start, is_last

if is_last:
break


def process_batch(pfin, chunk_start, chunk_size, is_last, leftover):
with open(pfin, 'r', encoding='utf-8') as f:
f.seek(chunk_start)
chunk = f.read(chunk_size)

# Add previous leftover to current chunk
chunk = leftover + chunk
batch = chunk.splitlines()
batch = list(filter(None, batch))

# If this chunk is not the last one,
# pop the last item as that will be an incomplete sentence
# We return this leftover to use in the next chunk
if not is_last:
leftover = batch.pop(-1)

return batch, leftover


if __name__ == '__main__':
fin = r'ep+gutenberg+news+wiki.txt'

lines_n = 0
left = ''
for start, size, last in chunkify(fin):
lines, left = process_batch(fin, start, size, last, left)

if not lines:
continue

for line in lines:
print(line)
print('\n')

numberlines = len(lines)
lines_n += numberlines

print(lines_n)

运行上面的代码,将不可避免地导致UnicodeDecodeError

Traceback (most recent call last):
File "chunk_tester.py", line 46, in <module>
lines, left = process_batch(fin, start, size, last, left)
File "chunk_tester.py", line 24, in process_batch
chunk = f.read(chunk_size)
File "lib\codecs.py", line 322, in decode
(result, consumed) = self._buffer_decode(data, self.errors, final)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xa9 in position 0: invalid start byte

最佳答案

你们了!对最终代码进行相对简单的更改(以 bytes 而不是 str 的形式读取数据)就可以(几乎)完成所有工作。

主要问题是因为从二进制文件读取计数字节,但从文本文件读取计数文本,您第一次以字节计数,第二次以< em>字符,导致您对哪些数据已被读取的假设是错误的。这与内部隐藏缓冲区无关。

其他变化:

  • 代码需要在 b'\n' 上拆分,而不是使用 bytes.splitlines(),并且只删除 之后的空行相关检测代码。
  • 除非文件的大小发生变化(在这种情况下,您现有的代码无论如何都会中断),chunkify 可以被一个更简单、更快的循环替换,该循环在功能上是相同的,但没有必须保持文件打开。

这给出了最终代码:

from os import stat

def chunkify(pfin, buf_size=1024**2):
file_end = stat(pfin).st_size

i = -buf_size
for i in range(0, file_end - buf_size, buf_size):
yield i, buf_size, False

leftover = file_end % buf_size
if leftover == 0: # if the last section is buf_size in size
leftover = buf_size
yield i + buf_size, leftover, True

def process_batch(pfin, chunk_start, chunk_size, is_last, leftover):
with open(pfin, 'rb') as f:
f.seek(chunk_start)
chunk = f.read(chunk_size)

# Add previous leftover to current chunk
chunk = leftover + chunk
batch = chunk.split(b'\n')

# If this chunk is not the last one,
# pop the last item as that will be an incomplete sentence
# We return this leftover to use in the next chunk
if not is_last:
leftover = batch.pop(-1)

return [s.decode('utf-8') for s in filter(None, batch)], leftover


if __name__ == '__main__':
fin = r'ep+gutenberg+news+wiki.txt'

lines_n = 0
left = b''
for start, size, last in chunkify(fin):
lines, left = process_batch(fin, start, size, last, left)

if not lines:
continue

for line in lines:
print(line)
print('\n')

numberlines = len(lines)
lines_n += numberlines

print(lines_n)

关于python - 分块处理大文件 : inconsistent seek with readline,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55835323/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com