gpt4 book ai didi

pandas - dask 读取 Parquet 并指定架构

转载 作者:行者123 更新时间:2023-12-03 23:39:19 27 4
gpt4 key购买 nike

在读取 Parquet 文件时,是否有相当于 spark 指定模式的能力?可能使用传递给 pyarrow 的 kwargs?
我在一个存储桶中有一堆 Parquet 文件,但有些字段的名称略有不一致。我可以在阅读这些案例后创建一个自定义延迟函数来处理这些案例,但我希望在通过全局连接打开它们时可以指定模式。也许不是,因为我猜想通过 globing 打开然后将尝试连接它们。由于字段名称不一致,这当前失败。
创建一个 Parquet 文件:

import dask.dataframe as dd

df = dd.demo.make_timeseries(
start="2000-01-01",
end="2000-01-03",
dtypes={"id": int, "z": int},
freq="1h",
partition_freq="24h",
)

df.to_parquet("df.parquet", engine="pyarrow", overwrite=True)
通过dask读入并在阅读后指定模式:
df = dd.read_parquet("df.parquet", engine="pyarrow")
df["z"] = df["z"].astype("float")
df = df.rename(columns={"z": "a"})
通过 spark 读取它并指定架构:
from pyspark.sql import SparkSession
import pyspark.sql.types as T
spark = SparkSession.builder.appName('App').getOrCreate()

schema = T.StructType(
[
T.StructField("id", T.IntegerType()),
T.StructField("a", T.FloatType()),
T.StructField("timestamp", T.TimestampType()),
]
)

df = spark.read.format("parquet").schema(schema).load("df.parquet")

最佳答案

其中一些选项是:

  • 加载后指定数据类型(需要一致的列名):

  • custom_dtypes = {"a": float, "id": int, "timestamp": pd.datetime}
    df = dd.read_parquet("df.parquet", engine="pyarrow").astype(custom_dtypes)

    This currently fails because of the inconsistent field names.


  • 如果文件中的列名不同,您可能需要使用自定义 delayed加载前:

  • @delayed
    def custom_load(path):
    df = pd.read_parquet(path)
    # some logic to ensure consistent columns
    # for example:
    if "z" in df.columns:
    df = df.rename(columns={"z": "a"}).astype(custom_dtypes)
    return df

    dask_df = dd.from_delayed([custom_load(path) for path in glob.glob("some_path/*parquet")])

    关于pandas - dask 读取 Parquet 并指定架构,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66897523/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com