
python - Streaming byte chunks to CSV rows in Python

Reposted. Author: 行者123. Updated: 2023-12-04 04:31:08

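I need to process a large remote CSV line by line without downloading it completely.

Below is the closest I have got. I iterate over byte chunks from Azure and have some code to handle truncated lines. However, this approach fails if a CSV value contains a newline, because I cannot tell a newline inside a value from a newline that ends a CSV row.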

# this does not work
def azure_iter_lines(logger_scope, client, file_path):
    # get a StorageStreamDownloader
    file_client = client.get_file_client(file_path)
    file_handle = file_client.download_file()

    truncated_line = ''
    for chunk in file_handle.chunks():
        # prepend the previous truncated line to the next chunk
        # (decoding per chunk also breaks if a multi-byte character is split)
        chunk_txt = truncated_line + chunk.decode("utf-8")
        lines = chunk_txt.split('\n')  # THIS CANNOT WORK AS VALUES CONTAIN NEWLINES
        # every piece except the last one is terminated by a newline
        for line in lines[:-1]:
            yield line
        truncated_line = lines[-1]

    # emit whatever is left after the last chunk
    if truncated_line:
        yield truncated_line
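To make the failure concrete, here is a small self-contained sketch comparing a naive split on '\n' with what the csv module returns for the same text. The sample data is made up for illustration; the second record has a quoted value that spans two lines.

```python
import csv
import io

# Made-up sample: the second record has a quoted value that spans two lines.
data = 'id;text\n1;"first line\nsecond line"\n2;plain\n'

# Naive splitting cuts the quoted value in half.
naive_lines = [line for line in data.split("\n") if line]

# The csv module tracks quoting, so the embedded newline stays inside the field.
rows = list(csv.reader(io.StringIO(data, newline=""), delimiter=";"))

print(len(naive_lines), "pieces from split")
print(len(rows), "rows from csv.reader")
print(rows[1])
```

The split produces more pieces than there are records, which is why the line-based generator above cannot be fixed by better truncation handling alone.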

Ideally I would use csv.DictReader(), but I have not been able to, because it downloads the file completely.

# this does not work
def azure_iter_lines(logger_scope, client, file_path):
    file_client = client.get_file_client(file_path)
    file_handle = file_client.download_file()
    buffer = io.BytesIO()
    file_handle.readinto(buffer)  # THIS DOWNLOADS THE FILE ENTIRELY
    csvreader = csv.DictReader(buffer, delimiter=";")
    return csvreader

Here is an update using some hints from @H.Leger:


import codecs
import csv

file_client = client.get_file_client(file_path)
file_handle = file_client.download_file()
# decode the byte chunks incrementally
stream = codecs.iterdecode(file_handle.chunks(), 'utf-8')
csvreader = csv.DictReader(stream, delimiter=";")
for row in csvreader:
    # => _csv.Error: new-line character seen in unquoted field - do you need to open the file in universal-newline mode?
    pass
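A likely explanation for this error: csv.DictReader expects an iterable that yields one line per item, while codecs.iterdecode yields one decoded string per chunk, and a chunk usually holds many lines. The sketch below reproduces that under this assumption. The helper name try_parse is hypothetical, and in-memory lists of bytes stand in for file_handle.chunks().

```python
import codecs
import csv


def try_parse(chunks):
    """Parse decoded chunks directly with DictReader; return rows or the csv.Error."""
    # iterdecode yields one str per chunk, not one str per line.
    stream = codecs.iterdecode(iter(chunks), "utf-8")
    try:
        return list(csv.DictReader(stream, delimiter=";"))
    except csv.Error as exc:
        return exc


# One chunk that contains several lines: the reader sees text after a line ending.
multi_line_chunk = try_parse([b"id;text\n1;a\n2;b\n"])

# Chunks that happen to align with lines parse fine, which hides the problem in small tests.
line_aligned = try_parse([b"id;text\n", b"1;a\n"])

print(type(multi_line_chunk).__name__)
print(line_aligned)
```

So the decoded chunks still need to go through something that re-splits them into lines, or a file-like text stream, before they reach the csv module.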


Edit: updated the solution to use io instead of codecs for faster parsing.

import io
import csv
import ctypes as ct

# bytes chunk iterator to python stream adapter

class ChunksAdapter:
    def __init__(self, chunks):
        self.chunks = chunks
        self.buf = b''
        self.closed = False

    def readable(self):
        return True

    def writable(self):
        return False

    def seekable(self):
        return False

    def close(self):
        self.closed = True

    def read(self, size):
        # refill from the next chunk only when the buffer is empty
        if not self.buf:
            self.buf = next(self.chunks, b'')
        res, self.buf = self.buf[:size], self.buf[size:]
        return res

# get the downloader object
file_client = client.get_file_client(file_path)
downloader = file_client.download_file()
# adapt the downloader iterator to a byte stream
file_object = ChunksAdapter(downloader.chunks())
# decode bytes stream to utf-8
text_stream = io.TextIOWrapper(file_object, encoding='utf-8', newline='')

# update csv field limit to handle large fields
csv.field_size_limit(int(ct.c_ulong(-1).value // 2))

csvreader = csv.DictReader(text_stream, delimiter=";", quotechar='"', quoting=csv.QUOTE_MINIMAL)
for row in csvreader:
    pass  # process each row here


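Disclaimer: I know little about the specifics of Azure. Ultimately, you will want to stream the individual chunks as well.

In Python, given a file object, you can set up a CSV stream like this: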

import codecs
import csv
codec = codecs.getreader('utf-8')
text_stream = codec(file_object)
csvreader = csv.DictReader(text_stream)

Now you can iterate over csvreader, and it will read from file_object in a streaming fashion.
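One way to check that reading really is incremental is to ask the underlying byte stream how far it has been consumed after the first row. This is only a sketch: an io.BytesIO with made-up data stands in for the remote file object, and the delimiter is the semicolon used in the question.

```python
import codecs
import csv
import io

# Made-up data: a header plus many short rows, held in memory for the demo.
payload = ("id;name\n" + "".join(f"{i};row{i}\n" for i in range(5000))).encode("utf-8")
file_object = io.BytesIO(payload)

text_stream = codecs.getreader("utf-8")(file_object)
csvreader = csv.DictReader(text_stream, delimiter=";")

first_row = next(csvreader)

# Bytes pulled from file_object so far, compared with the total size.
consumed = file_object.tell()
print(first_row, consumed, len(payload))
```

If the reader were loading everything up front, consumed would equal len(payload) after the first row.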

Edit: as @Martijn Pieters suggested, we can use TextIOWrapper instead of codecs for better performance:

text_stream = io.TextIOWrapper(file_object, encoding='utf-8', newline='')

See the note on the newline parameter in the csv module documentation.
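To illustrate why newline='' matters, the sketch below parses the same bytes twice, once with newline='' and once with the default newline=None. The data is made up and uses Windows line endings, including one inside a quoted value; read_rows is a hypothetical helper.

```python
import csv
import io

# Made-up CSV bytes with '\r\n' line endings, one of them inside a quoted value.
raw = b'id;text\r\n1;"first\r\nsecond"\r\n'


def read_rows(newline):
    """Parse `raw` through a TextIOWrapper configured with the given newline mode."""
    text_stream = io.TextIOWrapper(io.BytesIO(raw), encoding="utf-8", newline=newline)
    return list(csv.reader(text_stream, delimiter=";"))


rows_untranslated = read_rows("")   # line endings reach the csv module unchanged
rows_translated = read_rows(None)   # default mode rewrites '\r\n' as '\n'

print(rows_untranslated[1])
print(rows_translated[1])
```

With the default mode the value silently changes, so newline='' is the safer choice when field contents must round-trip exactly.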

However, Azure's StorageStreamDownloader does not provide Python's file object interface. It has a .chunks() generator (which I assume issues a separate HTTP request to retrieve each next chunk).

You can adapt .chunks() into a file object with a simple adapter:

class ChunksAdapter:
    def __init__(self, chunks):
        self.chunks = chunks
        self.buf = b''

    def read(self, size):
        # refill from the next chunk only when the buffer is empty
        if not self.buf:
            self.buf = next(self.chunks, b'')
        res, self.buf = self.buf[:size], self.buf[size:]
        return res


downloader = file_client.download_file()
file_object = ChunksAdapter(downloader.chunks())
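An alternative sketch, not part of the original answer: subclass io.RawIOBase and implement readinto, so that the standard io.BufferedReader and io.TextIOWrapper take care of buffering and incremental decoding. The names IterStream and iter_csv_rows are hypothetical, and a plain list of bytes stands in for downloader.chunks().

```python
import csv
import io


class IterStream(io.RawIOBase):
    """Hypothetical adapter exposing an iterator of bytes chunks as a raw stream."""

    def __init__(self, chunks):
        super().__init__()
        self._chunks = iter(chunks)
        self._leftover = b""

    def readable(self):
        return True

    def readinto(self, buffer):
        # Skip empty chunks, since returning 0 here would signal end of file.
        while not self._leftover:
            try:
                self._leftover = next(self._chunks)
            except StopIteration:
                return 0  # no more chunks: end of file
        n = min(len(buffer), len(self._leftover))
        buffer[:n] = self._leftover[:n]
        self._leftover = self._leftover[n:]
        return n


def iter_csv_rows(chunks, delimiter=";", encoding="utf-8"):
    """Yield dict rows lazily from an iterable of bytes chunks."""
    raw = IterStream(chunks)
    text = io.TextIOWrapper(io.BufferedReader(raw), encoding=encoding, newline="")
    yield from csv.DictReader(text, delimiter=delimiter)


# Made-up chunks that split a header, a multi-byte character, and a quoted newline.
demo_chunks = [b'id;te', b'xt\n1;"caf\xc3', b'\xa9\nau lait"\n2;pl', b'', b'ain\n']
demo_rows = list(iter_csv_rows(demo_chunks))
print(demo_rows)
```

Because decoding happens in TextIOWrapper, a UTF-8 character split across two chunks is handled, and the quoted newline stays inside its field.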

Be sure to configure the DictReader for the appropriate CSV dialect.
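As a minimal sketch of what configuring a dialect can look like, the snippet below registers one under the hypothetical name "semicolon" and passes it to DictReader. The settings mirror the semicolon-delimited, double-quoted data from the question, and the sample text is made up.

```python
import csv
import io

# Hypothetical dialect name; settings mirror the semicolon-delimited data in the question.
csv.register_dialect("semicolon", delimiter=";", quotechar='"', quoting=csv.QUOTE_MINIMAL)

# Made-up sample in which a quoted value contains the delimiter itself.
text_stream = io.StringIO('id;text\n1;"a;b"\n', newline="")
rows = list(csv.DictReader(text_stream, dialect="semicolon"))
print(rows)
```

Registering the dialect once keeps the delimiter and quoting rules in one place when several readers share them.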

Also set appropriate values for max_single_get_size and max_chunk_get_size on the blob client.

Source: a similar question found on Stack Overflow, "python - Streaming byte chunks to CSV rows in Python".
