gpt4 book ai didi

json - Spark 数据帧 : reading json having duplicate column names but different datatypes

转载 作者:行者123 更新时间:2023-12-04 15:24:31 26 4
gpt4 key购买 nike

我有如下所示的 json 数据,其中版本字段是区分符 -

file_1 = {"version": 1, "stats": {"hits":20}}

file_2 = {"version": 2, "stats": [{"hour":1,"hits":10},{"hour":2,"hits":12}]}

在新格式中,stats 列现在是 Arraytype(StructType)

之前只需要 file_1 所以我用的是

spark.read.schema(schema_def_v1).json(path)

现在我需要读取这些类型的多个 json 文件。我不能在 schema_def 中将 stats 定义为字符串,因为这会影响 corruptrecord 功能(对于 stats 列),该功能检查所有字段的格式错误的 json 和模式合规性。

1 只读中所需的示例 df 输出 -

version | hour | hits
1 | null | 20
2 | 1 | 10
2 | 2 | 12

我尝试使用 mergeSchema 选项进行读取,但这使得统计字段成为字符串类型。

此外,我尝试通过过滤版本字段并应用 spark.read.schema(schema_def_v1).json(df_v1.toJSON) 来制作两个数据帧。这里的 stats 列也变成了 String 类型。

我在想,如果在阅读时,我可以根据数据类型将 df 列标题解析为 stats_v1stats_v2 可以解决问题。请帮助解决任何可能的问题。

最佳答案

UDF 用于检查字符串或数组,如果是字符串则将字符串转换为数组。

import org.apache.spark.sql.functions.udf
import org.json4s.{DefaultFormats, JObject}
import org.json4s.jackson.JsonMethods.parse
import org.json4s.jackson.Serialization.write
import scala.util.{Failure, Success, Try}

object Parse {
implicit val formats = DefaultFormats
def toArray(data:String) = {
val json_data = (parse(data))
if(json_data.isInstanceOf[JObject]) write(List(json_data)) else data
}
}

val toJsonArray = udf(Parse.toArray _)

scala> "ls -ltr /tmp/data".!
total 16
-rw-r--r-- 1 srinivas root 37 Jun 26 17:49 file_1.json
-rw-r--r-- 1 srinivas root 69 Jun 26 17:49 file_2.json
res4: Int = 0

scala> val df = spark.read.json("/tmp/data").select("stats","version")
df: org.apache.spark.sql.DataFrame = [stats: string, version: bigint]

scala> df.printSchema
root
|-- stats: string (nullable = true)
|-- version: long (nullable = true)

scala> df.show(false)
+-------+-------------------------------------------+
|version|stats |
+-------+-------------------------------------------+
|1 |{"hits":20} |
|2 |[{"hour":1,"hits":10},{"hour":2,"hits":12}]|
+-------+-------------------------------------------+

输出

scala> 

import org.apache.spark.sql.types._
val schema = ArrayType(MapType(StringType,IntegerType))

df
.withColumn("json_stats",explode(from_json(toJsonArray($"stats"),schema)))
.select(
$"version",
$"stats",
$"json_stats".getItem("hour").as("hour"),
$"json_stats".getItem("hits").as("hits")
).show(false)

+-------+-------------------------------------------+----+----+
|version|stats |hour|hits|
+-------+-------------------------------------------+----+----+
|1 |{"hits":20} |null|20 |
|2 |[{"hour":1,"hits":10},{"hour":2,"hits":12}]|1 |10 |
|2 |[{"hour":1,"hits":10},{"hour":2,"hits":12}]|2 |12 |
+-------+-------------------------------------------+----+----+

没有 UDF

scala> val schema = ArrayType(MapType(StringType,IntegerType))

scala> val expr = when(!$"stats".contains("[{"),concat(lit("["),$"stats",lit("]"))).otherwise($"stats")

df
.withColumn("stats",expr)
.withColumn("stats",explode(from_json($"stats",schema)))
.select(
$"version",
$"stats",
$"stats".getItem("hour").as("hour"),
$"stats".getItem("hits").as("hits")
)
.show(false)

+-------+-----------------------+----+----+
|version|stats |hour|hits|
+-------+-----------------------+----+----+
|1 |[hits -> 20] |null|20 |
|2 |[hour -> 1, hits -> 10]|1 |10 |
|2 |[hour -> 2, hits -> 12]|2 |12 |
+-------+-----------------------+----+----+

关于json - Spark 数据帧 : reading json having duplicate column names but different datatypes,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62559096/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com