azure - How to read parquet files from Azure Blob into a Pandas DataFrame concurrently with asyncio?

Reposted. Author: 行者123. Updated: 2023-12-03 03:22:42

Following up on the question "How to read parquet files from Azure Blobs into Pandas DataFrame?", I would like to add concurrency by downloading multiple files "in parallel" with asyncio.

I don't know how to use Python 3.11's TaskGroup feature to start my tasks and wait for them to finish. How can I retrieve the list of downloaded streams?

My code so far:

import logging
import asyncio
import pandas as pd
from azure.storage.blob.aio import ContainerClient
from io import BytesIO
from itertools import product

class BlobStorageAsync:
    def __init__(self, connection_string, container_name, logging_enable):
        self.connection_string = connection_string
        self.container_name = container_name
        container_client = ContainerClient.from_connection_string(
            conn_str=connection_string,
            container_name=container_name,
            # This client will log detailed information about its HTTP sessions, at DEBUG level
            logging_enable=logging_enable
        )
        self.container_client = container_client

    async def list_blobs_in_container_async(self, name_starts_with):
        blobs_list = []
        async for blob in self.container_client.list_blobs(name_starts_with=name_starts_with):
            blobs_list.append(blob)
        return blobs_list

    async def download_blob_async(self, blob_name):
        blob_client = self.container_client.get_blob_client(blob=blob_name)
        async with blob_client:
            stream = await blob_client.download_blob()
            data = await stream.readall() # data returned as bytes-like object
        # return data as bytes (in-memory binary stream)
        return BytesIO(data)

    async def download_blobs_async(self, blobs_list):
        tasks = []
        for blob_name in blobs_list:
            task = asyncio.create_task(self.download_blob_async(blob_name))
            tasks.append(task)
        await asyncio.gather(*tasks)

Now I'm stuck, because I cannot create and run the tasks and wait for the results; that is, this does not work:

async def main():
    blobs_list = ...
    connection_string = ...
    container_name = ...
    BSA = BlobStorageAsync(connection_string, container_name, logging_enable)
    result = asyncio.run(BSA.download_blobs_async(blobs_list))
    # process the result: read the first stream and print it, for instance
    df = pd.read_parquet(result[0])
    print(df)

if __name__ == '__main__':
    try:
        main()
    except Exception as ex:
        print(ex)
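
A note on why the snippet above likely fails, as far as can be told from the code shown. First, `download_blobs_async` awaits `asyncio.gather(...)` but does not return its value, so the caller receives `None`. Second, `asyncio.run()` is invoked inside `async def main()`, and `asyncio.run()` raises `RuntimeError` when an event loop is already running. Third, `main()` is called without `asyncio.run(...)`, which only creates a coroutine object and never executes it. The minimal sketch below reproduces the first two points without Azure; `work`, `gather_without_return`, `gather_with_return` and `nested_run_fails` are hypothetical names used for illustration only.

```python
import asyncio


async def work(n):
    # Stand-in for a download; yields to the event loop once.
    await asyncio.sleep(0)
    return n * 2


async def gather_without_return(items):
    tasks = [asyncio.create_task(work(i)) for i in items]
    await asyncio.gather(*tasks)  # results are computed, then discarded


async def gather_with_return(items):
    tasks = [asyncio.create_task(work(i)) for i in items]
    return await asyncio.gather(*tasks)  # results come back in input order


async def nested_run_fails():
    coro = work(1)
    try:
        asyncio.run(coro)  # not allowed: an event loop is already running here
    except RuntimeError:
        coro.close()  # avoid a "never awaited" warning for the unused coroutine
        return True
    return False


if __name__ == "__main__":
    print(asyncio.run(gather_without_return([1, 2])))  # None
    print(asyncio.run(gather_with_return([1, 2])))     # [2, 4]
    print(asyncio.run(nested_run_fails()))             # True
```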

Best answer

How to read parquet files from Azure Blobs into Pandas DataFrame concurrently with asyncio?

You can use the following code to read parquet files from Azure Blob storage into Pandas DataFrames concurrently with asyncio.

Code:

import asyncio
import pandas as pd
from azure.storage.blob.aio import ContainerClient
from io import BytesIO


class BlobStorageAsync:
    def __init__(self, connection_string, container_name):
        self.connection_string = connection_string
        self.container_name = container_name
        container_client = ContainerClient.from_connection_string(
            conn_str=connection_string,
            container_name=container_name,
        )
        self.container_client = container_client

    async def list_blobs_in_container_async(self, name_starts_with):
        blobs_list = []
        async for blob in self.container_client.list_blobs(name_starts_with=name_starts_with):
            blobs_list.append(blob)
        return blobs_list

    async def download_blob_async(self, blob_name):
        blob_client = self.container_client.get_blob_client(blob=blob_name)
        async with blob_client:
            stream = await blob_client.download_blob()
            data = await stream.readall()  # data returned as bytes-like object
        # return data as bytes (in-memory binary stream)
        return BytesIO(data)

    async def download_blobs_async(self, blobs_list):
        tasks = []
        for blob_name in blobs_list:
            task = asyncio.create_task(self.download_blob_async(blob_name))
            tasks.append(task)
        # gather() returns the results in the same order as the tasks
        results = await asyncio.gather(*tasks)
        # return the list of downloaded streams
        return results

async def main():
    blobs_list = ["pqt_file4", "pqt_file5"]
    connection_string = ""  # storage account connection string goes here
    container_name = "test1"
    BSA = BlobStorageAsync(connection_string, container_name)
    try:
        results = await BSA.download_blobs_async(blobs_list)
        for stream in results:
            df = pd.read_parquet(stream, engine="pyarrow")
            print(df)
    finally:
        # close the client even if a download or parse step fails
        await BSA.container_client.close()

if __name__ == '__main__':
    try:
        # asyncio.run() is called once, from synchronous code
        asyncio.run(main())
    except Exception as ex:
        print(ex)

Output:

   id       name
0   1       Kala
1   2  Arulmozhi
2   6   Rajaraja
   id       name
0   1     Aditha
1   2  Arulmozhi
2   3   Kundavai
3   6   Rajaraja

For "azure - How to read parquet files from Azure Blob into a Pandas DataFrame concurrently with asyncio?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/76581623/
