gpt4 book ai didi

apache-spark - PySpark:使用具有 1000 个字段但具有可变列数的行的模式创建 RDD->DF->Parquet

转载 作者:可可西里 更新时间:2023-11-01 14:50:56 25 4
gpt4 key购买 nike

我正在尝试读取一个 ElasticSearch 索引,它有数百万个文档,每个文档都有可变数量的字段。我有一个模式,其中有 1000 个字段,每个字段都有自己的名称和类型。

现在,当我通过 ES-Hadoop 连接器创建一个 RDD 并稍后通过指定模式转换为一个 DataFrame 时,它没有说 -

Input row doesn't have expected number of values required by the schema

我有几个问题。1. 是否有可能有一个 RDD/DF 的行包含可变数量的字段?如果不是,除了为每列中缺失的字段添加空值外,还有什么替代方法?

  1. 我看到默认情况下 Spark 将所有内容转换为 StringType,因为我使用 sc.newAPIHadoopRDD() 调用。我如何根据模式中的字段名称将它们类型转换为正确的类型?某种映射?

  2. 我想用 Parquet 格式编写此文件,并将架构添加到文件中。与具有 1000 个字段的模式相比,那些缺失的字段会发生什么情况。

最佳答案

  1. 您不能拥有可变数量的列,但您可以使用数组或 map 等集合类型的一列,这在 Python 中相当于字典。这允许您在列中存储可变长度数据。否则是的,您需要为架构中的每一列都有一个值。您通常会用空值填充缺失值。

  2. 如果您已经有一个数据框,并且您有一个函数 get_column_type 从列名称中获取类型名称,您可以像这样重铸整个数据框:

    import pyspark.sql.functions as F
    select_expressions = [ F.col(column_name).cast(get_column_type(column_name)) for column_name in column_list]
    recasted_df = df.select(*select_expressions)
  3. parquet 文件将包含数据框中的任何列。如果您想要文件中的 1000 个字段,它们必须在数据框中,因此您必须用空值或其他一些值填充缺失值。

现在,如果你把所有这些点放在一起,你可能想做这样的事情:

  • 将每个弹性文档读入一行,其中包含 id 字段和类型为 MapType 的 doc 字段。
  • 分解 文档字段,所以现在您有 3 列:idkeyvalue,其中一列每个文档中每个键 的行。此时,您可以写入 parquet 文件并完成该过程。

如果您想要具有完整架构的数据框,则必须执行以下额外步骤:

  • 对结果进行透视,为 每个 id 生成一行,并为文档中的每个键生成一列及其对应的值:pivoted_df = df.groupBy('id')。 pivot('key').agg(F.first('value')
  • 此数据框包含数据中存在的所有字段。如果您知道完整的架构,则可以为缺少的那些添加虚拟列:df = df.withColumn('new_column', lit(None).cast(StringType())
  • 最后用第 2 点中的代码重铸列,并删除列 id。您可以将其写入 parquet,它将包含您的大架构中的所有列。

关于apache-spark - PySpark:使用具有 1000 个字段但具有可变列数的行的模式创建 RDD->DF->Parquet,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55111370/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com