json - How to query a dataframe in Spark SQL where one StringType field holds a JSON value

I am trying to use SQL on a Spark dataframe, but one of the dataframe's fields holds a string with a JSON-like structure.

I saved the dataframe to a temp table: TestTable

When I run desc:

col_name      data_type
requestId     string
name          string
features      string

But the features value is JSON:
{"places":11,"movies":2,"totalPlacesVisited":0,"totalSpent":13,"SpentMap":{"Movie":2,"Park Visit":11},"benefits":{"freeTime":13}}

I just want to query TestTable for rows where totalSpent > 10. Can someone tell me how to do that?
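If the column stays a plain string, one option is Spark SQL's built-in `get_json_object`, which extracts a value from a JSON string by path. A minimal sketch against the `TestTable` registration above (assumes Spark 1.6+, where `get_json_object` is built in; earlier versions need a HiveContext):

```scala
// get_json_object returns the matched value as a string,
// so cast it before comparing numerically.
val result = sqlContext.sql(
  """SELECT requestId, name,
    |       get_json_object(features, '$.totalSpent') AS totalSpent
    |FROM TestTable
    |WHERE CAST(get_json_object(features, '$.totalSpent') AS INT) > 10""".stripMargin)
result.show()
```

With the sample record (totalSpent = 13) this row passes the `> 10` filter.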

My JSON file looks like this:
{
  "requestId": 232323,
  "name": "ravi",
  "features": "{"places":11,"movies":2,"totalPlacesVisited":0,"totalSpent":13,"SpentMap":{"Movie":2,"Park Visit":11},"benefits":{"freeTime":13}}"
}

features is a string, and I only need totalSpent. I tried:
val features = StructType(Array(
  StructField("totalSpent", LongType, true),
  StructField("movies", LongType, true)
))

val schema = StructType(Array(
  StructField("requestId", StringType, true),
  StructField("name", StringType, true),
  StructField("features", features, true)
))

val records = sqlContext.read.schema(schema).json(filePath)

since every request has one JSON features string. But this gives me an error.

When I try
val records = sqlContext.jsonFile(filePath)

records.printSchema

it shows me:
root
|-- requestId: string (nullable = true)
|-- features: string (nullable = true)
|-- name: string (nullable = true)
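As an aside, on Spark 2.1+ the built-in `from_json` converts such a string column into a struct without any UDF, so the nested schema the question defines can be applied after reading. A sketch (assumes a 2.x `SparkSession` rather than the 1.x `sqlContext` API shown here):

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val featuresSchema = StructType(Array(
  StructField("totalSpent", LongType, true),
  StructField("movies", LongType, true)))

// Replace the string column with a parsed struct;
// rows whose JSON does not parse come through as null.
val withStruct = records.withColumn("features", from_json($"features", featuresSchema))
withStruct.filter($"features.totalSpent" > 10).show()
```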

Can I use parallelize inside a StructField when building the schema? I first tried:

val customer = StructField("features", StringType, true)
val events = sc.parallelize(customer :: Nil)

val schema = StructType(Array(
  StructField("requestId", StringType, true),
  StructField("name", StructType(events, true), true),
  StructField("features", features, true)
))

That errored too. I also tried:
import net.liftweb.json.parse

case class KV(k: String, v: Int)

val parseJson = udf((s: String) => {
  implicit val formats = net.liftweb.json.DefaultFormats
  parse(s).extract[KV]
})

val parsed = records.withColumn("parsedJSON", parseJson($"features"))
parsed.show

This gives me:
<console>:78: error: object liftweb is not a member of package net
import net.liftweb.json.parse

I also tried:
val parseJson = udf((s: String) => {
  sqlContext.read.json(s)
})

val parsed = records.withColumn("parsedJSON", parseJson($"features"))
parsed.show

But that errored again.

Then I tried:
import org.json4s._ 
import org.json4s.jackson.JsonMethods._

val parseJson = udf((s: String) => {
  parse(s)
})

val parsed = records.withColumn("parsedJSON", parseJson($"features"))
parsed.show

But it gave me:
java.lang.UnsupportedOperationException: Schema for type org.json4s.JValue is not supported
at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:153)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)
at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:64)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)

This gave me the correct schema (based on the answer given by zero323):
val extractFeatures = udf((features: String) => Try {
  implicit val formats = DefaultFormats
  parse(features).extract[Features]
}.toOption)

val parsed = records.withColumn("features", extractFeatures($"features"))

parsed.printSchema

But when I query:
val value = parsed.filter($"requestId" === "232323" ).select($"features.totalSpent")
value.show gives null.

Best Answer

When you return data from a UDF it has to be representable as SQL types, and a JSON AST is not. One approach is to create a case class similar to this one:

case class Features(
  places: Integer,
  movies: Integer,
  totalPlacesVisited: Integer,
  totalSpent: Integer,
  SpentMap: Map[String, Integer],
  benefits: Map[String, Integer]
)

and use it to extract objects:
val df = Seq((
  232323, "ravi",
  """{"places":11,"movies":2,"totalPlacesVisited":0,"totalSpent":13,"SpentMap":{"Movie":2,"Park Visit":11},"benefits":{"freeTime":13}}"""
)).toDF("requestId", "name", "features")

val extractFeatures = udf((features: String) =>
  parse(features).extract[Features])

val parsed = df.withColumn("features", extractFeatures($"features"))
parsed.show(false)

// +---------+----+-----------------------------------------------------------------+
// |requestId|name|features |
// +---------+----+-----------------------------------------------------------------+
// |232323 |ravi|[11,2,0,13,Map(Movie -> 2, Park Visit -> 11),Map(freeTime -> 13)]|
// +---------+----+-----------------------------------------------------------------+

parsed.printSchema

// root
// |-- requestId: integer (nullable = false)
// |-- name: string (nullable = true)
// |-- features: struct (nullable = true)
// | |-- places: integer (nullable = true)
// | |-- movies: integer (nullable = true)
// | |-- totalPlacesVisited: integer (nullable = true)
// | |-- totalSpent: integer (nullable = true)
// | |-- SpentMap: map (nullable = true)
// | | |-- key: string
// | | |-- value: integer (valueContainsNull = true)
// | |-- benefits: map (nullable = true)
// | | |-- key: string
// | | |-- value: integer (valueContainsNull = true)

Depending on the other records and the intended usage, you should adjust the representation and add the relevant error-handling logic.
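For example, wrapping the extraction in `Try` (as the question's last attempt already does) turns malformed `features` strings into nulls instead of failed tasks, and the original question's filter then becomes a plain column expression. A sketch, assuming the `Features` case class above:

```scala
import scala.util.Try
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.apache.spark.sql.functions.udf

// Returning Option[Features] maps to a nullable struct column,
// so bad JSON becomes null rather than throwing inside the UDF.
val extractFeaturesSafe = udf((features: String) => Try {
  implicit val formats = DefaultFormats
  parse(features).extract[Features]
}.toOption)

val safe = df.withColumn("features", extractFeaturesSafe($"features"))

// The original question: rows where totalSpent > 10.
safe.filter($"features.totalSpent" > 10).show()
```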

You can also use the DSL to access individual fields as strings:
val getMovieSpent = udf((s: String) =>
  compact(render(parse(s) \\ "SpentMap" \\ "Movie")))

df.withColumn("movie_spent", getMovieSpent($"features").cast("bigint")).show
// +---------+----+--------------------+-----------+
// |requestId|name| features|movie_spent|
// +---------+----+--------------------+-----------+
// | 232323|ravi|{"places":11,"mov...| 2|
// +---------+----+--------------------+-----------+

For alternative approaches, see How to query JSON data column using Spark DataFrames?
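One of the built-ins covered there is `json_tuple`, which extracts several top-level fields from a JSON string in a single pass (a sketch; available from Spark 1.6):

```scala
import org.apache.spark.sql.functions.json_tuple

// json_tuple produces one string column per requested field;
// alias them, then cast before the numeric comparison.
df.select($"requestId",
    json_tuple($"features", "totalSpent", "movies").as(Seq("totalSpent", "movies")))
  .filter($"totalSpent".cast("long") > 10)
  .show()
```

Unlike the case-class UDF, this only reaches top-level keys, so nested values such as `SpentMap.Movie` still need `get_json_object` or a parsed struct.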

This question about querying a dataframe in Spark SQL where one StringType field holds a JSON value is based on a similar question on Stack Overflow: https://stackoverflow.com/questions/38418079/
