gpt4 book ai didi

python - 使用spark SQL读取Parquet格式的不存在列

转载 作者:行者123 更新时间:2023-12-01 02:34:34 25 4
gpt4 key购买 nike

我有两个月的 Parquet 文件2017_01.parquet2017_08.parquet,这些架构是:

2017_01. Parquet :

root
|-- value: struct (nullable = true)
| |-- version: struct (nullable = true)
| | |-- major: integer (nullable = true)
| | |-- minor: integer (nullable = true)
| |-- guid: string (nullable = true)

2017_08. Parquet :

root
|-- value: struct (nullable = true)
| |-- version: struct (nullable = true)
| | |-- major: integer (nullable = true)
| | |-- minor: integer (nullable = true)
| | |-- vnum: integer (nullable = true)
| |-- guid: string (nullable = true)

和我的代码

SQL = """
SELECT value.version.major,
value.version.minor,
value.version.vnum
FROM OUT_TABLE
LIMIT 10"""

parquetFile = spark.read.parquet("/mydata/2017_08.parquet")
parquetFile.createOrReplaceTempView("OUT_TABLE")
out_osce = spark.sql(SQL)
out_osce.show()

当我加载 2017_08.parquet show 时:

+-----+-----+----+
|major|minor|vnum|
+-----+-----+----+
| 0001| 4610|1315|
| 0002| 4610|6206|
| 0003| 4610|6125|

但是如果我像这样加载 2017_01.parquetparquetFile = Spark.read.parquet("/mydata/2017_01.parquet")

SQL 显示错误:

pyspark.sql.utils.AnalysisException: u'No such struct field vnum in major, minor; line 4 pos 11'

我知道原因是 2017_01.parquet 没有 vnum 列,我有两种解决方案,一种是使用 mergeSchema 另一个是在读取 parquet 文件时使用 schema,但这些方法也有一个大问题。

第一个解决方案需要读取2017_08.parquet,如果我不需要08的数据就会有问题,如果运气不好vnum是一个选项列而08没有这个列它仍然错误

第二种方案是在读取时给出schema,如spark.read.schema(schema).parquet("/mydata/2017_01.parquet"),这种方式需要先写入schema,但是如果文件是非常复杂的嵌套表,用户可能无法写入架构,并且架构将更新。

我想问任何人有第三种解决方案,然后只读取2017_01.parquet并输出如下:

+-----+-----+----+
|major|minor|vnum|
+-----+-----+----+
| 0001| 4600|null|
| 0002| 4600|null|
| 0003| 4600|null|

最佳答案

我能够通过在创建选择时检查 DF 的列列表来解决类似的问题。就我而言,以下内容就足够了:

 parquetFile = spark.read.parquet("").withColumn("vnum", coalesce(if
parquetFile.columns.contains("vnum") $"vnum" else lit(null)))

在您的情况下,使用嵌套架构,您可以使用类似以下内容的内容:

// Define the full struct type schema to check if nested field exists.

val structToCheck = new StructField("value", new StructType().add("version",new StructType().add("major",StringType).add("minor",StringType).add("vnum",StringType)))

val SQL = """ SELECT value.version.major,
value.version.minor,""" +
if (parquetFile.schema.contains(structToCheck))
"value.version.vnum"
else
"'' as vnum" +
"FROM OUT_TABLE LIMIT 10"

您还可以进行一些更具体的搜索,获取 value.version 结构并检查其元素。

关于python - 使用spark SQL读取Parquet格式的不存在列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46397598/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com