
python - How to read partitioned parquet files from S3 using pyarrow in Python


I am looking for a way to read data from multiple partitioned directories in S3 using Python.

data_folder/serial_number=1/cur_date=20-12-2012/abcdsd0324324.snappy.parquet
data_folder/serial_number=2/cur_date=27-12-2012/asdsdfsd0324324.snappy.parquet

pyarrow's ParquetDataset module is able to read from partitions, so I tried the following code:

>>> import pandas as pd
>>> import pyarrow.parquet as pq
>>> import s3fs
>>> a = "s3://my_bucker/path/to/data_folder/"
>>> dataset = pq.ParquetDataset(a)

It threw the following error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 502, in __init__
    self.metadata_path) = _make_manifest(path_or_paths, self.fs)
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 601, in _make_manifest
    .format(path))
OSError: Passed non-file path: s3://my_bucker/path/to/data_folder/

Based on the pyarrow documentation, I then tried using s3fs as the file system, i.e.:

>>> dataset = pq.ParquetDataset(a,filesystem=s3fs)

This raises the following error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 502, in __init__
    self.metadata_path) = _make_manifest(path_or_paths, self.fs)
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 583, in _make_manifest
    if is_string(path_or_paths) and fs.isdir(path_or_paths):
AttributeError: module 's3fs' has no attribute 'isdir'
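The AttributeError above is raised because the s3fs module itself is being passed as the filesystem; ParquetDataset expects a filesystem object such as an s3fs.S3FileSystem instance. A minimal sketch of that usage with a recent pyarrow (essentially what the 2019 update in the answer below does), assuming the same bucket path:

>>> import s3fs
>>> import pyarrow.parquet as pq
>>> fs = s3fs.S3FileSystem()  # a filesystem instance, not the s3fs module
>>> dataset = pq.ParquetDataset("s3://my_bucker/path/to/data_folder/", filesystem=fs)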

I am restricted to an ECS cluster, so spark/pyspark is not an option.

Is there a way to easily read parquet files from such partitioned directories in S3 with Python? I feel that listing all the directories and then reading them is not good practice, as suggested in this link. I need to convert the data I read into a pandas dataframe for further processing, so I would prefer options related to fastparquet or pyarrow. I am open to other options in Python as well.

Best answer

I managed to get this working with the latest release of fastparquet & s3fs. Below is the code for the same:

import s3fs
import fastparquet as fp

s3 = s3fs.S3FileSystem()
fs = s3fs.core.S3FileSystem()

# mybucket/data_folder/serial_number=1/cur_date=20-12-2012/abcdsd0324324.snappy.parquet
s3_path = "mybucket/data_folder/*/*/*.parquet"
all_paths_from_s3 = fs.glob(path=s3_path)

myopen = s3.open
# use s3fs as the filesystem
fp_obj = fp.ParquetFile(all_paths_from_s3, open_with=myopen)
# convert to pandas dataframe
df = fp_obj.to_pandas()
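One caveat with the snippet above: without a root argument, fastparquet may not add the hive-style partition columns encoded in the directory names (serial_number, cur_date) to the resulting dataframe; the 2019 update further down passes root for exactly this reason. A minimal sketch, assuming the same bucket layout:

fs = s3fs.S3FileSystem()
root_dir_path = "mybucket/data_folder"
all_paths_from_s3 = fs.glob(f"{root_dir_path}/*/*/*.parquet")

# root tells fastparquet where the partitioning starts, so serial_number
# and cur_date should come back as columns of the dataframe
fp_obj = fp.ParquetFile(all_paths_from_s3, open_with=fs.open, root=root_dir_path)
df = fp_obj.to_pandas()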

Thanks to martin for pointing me in the right direction via our conversation.

NB: This would be slower than using pyarrow, based on the benchmark. I will update my answer once s3fs support is implemented in pyarrow via ARROW-1213.

I did a quick benchmark of individual iterations with pyarrow, and of the list of files sent as a glob to fastparquet. fastparquet with s3fs is faster than pyarrow with my hackish code, but I reckon pyarrow + s3fs will be faster once that support is implemented.

The code and benchmarks are below:

>>> def test_pq():
...     for current_file in list_parquet_files:
...         f = fs.open(current_file)
...         df = pq.read_table(f).to_pandas()
...         # following code is to extract the serial_number & cur_date values
...         # so that we can add them to the dataframe
...         # probably not the best way to split :)
...         elements_list = current_file.split('/')
...         for item in elements_list:
...             if item.find(date_partition) != -1:
...                 current_date = item.split('=')[1]
...             elif item.find(dma_partition) != -1:
...                 current_dma = item.split('=')[1]
...         df['serial_number'] = current_dma
...         df['cur_date'] = current_date
...         list_.append(df)
...     frame = pd.concat(list_)
...
>>> timeit.timeit('test_pq()', number=10, globals=globals())
12.078817503992468

>>> def test_fp():
...     fp_obj = fp.ParquetFile(all_paths_from_s3, open_with=myopen)
...     df = fp_obj.to_pandas()

>>> timeit.timeit('test_fp()', number=10, globals=globals())
2.961556333000317
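Since timeit with number=10 reports the total for ten calls, the per-iteration figures work out to roughly 1.21 s for the pyarrow loop versus 0.30 s for fastparquet:

>>> round(12.078817503992468 / 10, 3), round(2.961556333000317 / 10, 3)
(1.208, 0.296)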

Update 2019

After all the PRs, issues such as Arrow-2038 & Fast Parquet - PR#182 have been resolved.

Reading parquet files using Pyarrow

# pip install pyarrow
# pip install s3fs

>>> import s3fs
>>> import pyarrow.parquet as pq
>>> fs = s3fs.S3FileSystem()

>>> bucket = 'your-bucket-name'
>>> path = 'directory_name'  # if it's a directory, omit the trailing /
>>> bucket_uri = f's3://{bucket}/{path}'
>>> bucket_uri
's3://your-bucket-name/directory_name'

>>> dataset = pq.ParquetDataset(bucket_uri, filesystem=fs)
>>> table = dataset.read()
>>> df = table.to_pandas()
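If you only need some of the partitions, ParquetDataset also takes a filters argument that prunes hive-style partitions before any data is read. A short sketch (not part of the original answer), assuming the serial_number/cur_date partition columns from the question:

>>> dataset = pq.ParquetDataset(
...     bucket_uri,
...     filesystem=fs,
...     filters=[('serial_number', '=', '1')]  # only read the serial_number=1 partitions
... )
>>> df = dataset.read().to_pandas()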

Reading parquet files using Fast parquet

# pip install s3fs
# pip install fastparquet

>>> import s3fs
>>> import fastparquet as fp

>>> fs = s3fs.S3FileSystem()
>>> myopen = fs.open

>>> bucket = 'your-bucket-name'
>>> path = 'directory_name'
>>> root_dir_path = f'{bucket}/{path}'
# the first two wildcards represent the 1st and 2nd partition columns of your data, and so forth
>>> s3_path = f"{root_dir_path}/*/*/*.parquet"
>>> all_paths_from_s3 = fs.glob(path=s3_path)

>>> fp_obj = fp.ParquetFile(all_paths_from_s3, open_with=myopen, root=root_dir_path)
>>> df = fp_obj.to_pandas()
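As a small usage note (not part of the original answer), to_pandas also accepts a columns argument, so only the columns you actually need are materialised; 'some_column' below is just a placeholder for one of your own column names:

>>> df = fp_obj.to_pandas(columns=['some_column'])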

Quick benchmark

This may not be the best way to benchmark it; please see the blog post for a proper benchmark.

# pyarrow
>>> import timeit
>>> def test_pq():
...     dataset = pq.ParquetDataset(bucket_uri, filesystem=fs)
...     table = dataset.read()
...     df = table.to_pandas()
...
>>> timeit.timeit('test_pq()', number=10, globals=globals())
1.2677053569998407

# fastparquet
>>> def test_fp():
...     fp_obj = fp.ParquetFile(all_paths_from_s3, open_with=myopen, root=root_dir_path)
...     df = fp_obj.to_pandas()

>>> timeit.timeit('test_fp()', number=10, globals=globals())
2.931876824000028

Further reading on Pyarrow's speed

Reference:

For python - How to read partitioned parquet files from S3 using pyarrow in Python, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/45082832/
