gpt4 book ai didi

Python "FileHandler"到 Azure Blob 存储 - 没有这样的事情吗

转载 作者:行者123 更新时间:2023-12-03 02:22:41 26 4
gpt4 key购买 nike

我遇到过这样的情况:blob 存储中有一个相当大的 SAS 文件 (300GB),需要在 Azure ML 服务工作区中进行处理。他们的主要任务是将其转换为一堆 parquet 文件。

当然,我可以将文件下载到 Azure ML 工作区的 FileShare 容器,然后使用 pandas read_sas() 定义适当的 block 大小来处理该文件:

    local_file_path = "./dld/mysas.sas7bat"
with open(local_file_path,"wb") as local_file:
downloader = blobclient.download_blob()
downloader.readinto(local_file)

reader = pd.read_sas(local_file_path, format='sas7bdat', chunksize=1000000)

count = 0
prefix="chunks/testchunk"
chunk: pd.DataFrame
for chunk in reader:
count += 1
name = prefix + str(count) + ".parquet"
chunk.to_parquet(name, engine = "pyarrow")

这当然有效。但是,我希望有一种更有效的方法。就像能够直接从 blob 存储创建“文件流”一样。无需先将其下载到已安装的 Azure 文件共享中。但我还没有发现任何东西。所以我为 blob 存储编写了自己的“流包装器”:

class BlobStorageFileHandler(RawIOBase):

def __init__(self, storageDownloader: StorageStreamDownloader):
self.downloader = storageDownloader
self.chunks = self.downloader.chunks()
self.current_pos = 0
self.current_chunk_len = 0
self.current_chunk = None
self._read_next_chunk()

def _read_next_chunk(self):
self.current_chunk: bytes = next(self.chunks, None)
if self.current_chunk is None:
self.current_chunk_len = 0
else:
self.current_chunk_len = len(self.current_chunk)

def seekable(self) -> bool:
return True

def seek(self, offset, whence=SEEK_SET):
print("seek: ", offset)

def readable(self) -> bool:
return True

def read(self, size=-1):
end_pos = self.current_pos + size

# if more bytes are requested that are left in the current read bytes-chunk
if end_pos > self.current_chunk_len:
# number of bytes that have to be read from the old chunk
rest_part = self.current_chunk_len - self.current_pos
# number of bytes that have to be read from the next chunk
new_part = end_pos - self.current_chunk_len

old_chunk_end: bytes = b''
if rest_part > 0:
old_chunk_end = self.current_chunk[self.current_pos: self.current_pos + rest_part]

self._read_next_chunk()

new_chunk_part: bytes
# if there was no further chunk left to be read
if self.current_chunk is None:
if rest_part > 0:
return old_chunk_end
return b''

if self.current_chunk_len > new_part:
new_chunk_part = self.current_chunk[0: new_part]
else:
new_chunk_part = self.current_chunk

self.current_pos = new_part
return old_chunk_end + new_chunk_part
else:
result = self.current_chunk[self.current_pos:self.current_pos + size]
self.current_pos += size
return result

使用此类,无需首先将数据下载到已安装的 FileShare(或任何本地目录,如果在本地使用):

    sas_file = BlobStorageFileHandler(blobclient.download_blob())
reader = pd.read_sas(sas_file, format='sas7bdat', chunksize=1000000)

这只是第一个解决方案,不是很复杂,但它按预期工作。

但是,我无法想象我是唯一有这种需求的人,因此,我想知道是否还有其他解决方案或现有的包装类,就像我上面展示的那样。

欢迎任何意见。

谢谢汉斯约格

最佳答案

您可以使用 download_blob 分块下载 blob方法也是如此。基本上,您需要指定 offset (起始位置)和 length (要读取的字节数)参数,该方法只会将这些字节返回给您。

在您当前的实现中,由于您没有指定这些参数,因此 SDK 将下载整个 blob。

关于Python "FileHandler"到 Azure Blob 存储 - 没有这样的事情吗,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68219950/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com