gpt4 book ai didi

apache-spark - 使用 Spark 从 DynamoDB JSON 字符串中提取嵌套的 Json 字段?

转载 作者:行者123 更新时间:2023-12-02 01:21:35 25 4
gpt4 key购买 nike

我正在从 Spark 读取一个 dynamodb 表,该表的一个字段中有一个 JSON 字符串,其他字段中有字符串。我能够读取 JSON 字段但不能读取嵌套的 JSON 字段。这不是 query Json Column using dataframes 的重复.这个问题确实解释了如何从 JSON 字符串中提取列而不是嵌套的 JSON 列。

import com.github.traviscrawford.spark.dynamodb._
val users = sqlContext.read.dynamodb("Dynamodb_table")

用户.show(1)

样本数据集

 |col1                                                        | ID | field2|field3|
-------------------------------------------------------------------------------------
|{"a":[{"b":"value1","x":23},{"b":value2,"x":52}],"c":"valC"}|A1 | X1 |Y1 |

我需要从 col1(JSON 结构)和 ID 字段中提取几个字段。我能够弄清楚如何解析 JSON 字段 (col1) 并按照说明从 col1 获取字段“c”here但无法提取嵌套字段。

我的代码:

val users = sqlContext.read.dynamodb("Dynamodb_table")
val data = users.selectExpr("get_json_object(col1, '$.c')","get_json_object(col1, '$.a')","ID")

data.show(1,false)
|a |c |ID|
---------------------------------------------------------
|[{"b":"value1","x":23},{"b":value2","x":52}...]|valC|A1|

现在,当我尝试在上述数据框上应用相同的 get_json_object 时,我得到了所有空值。

val nestedData = data.selectExpr("get_json_object(a, '$.b')","c","ID")
nestedData.show(false)

|get_json_object(a, '$.b')| c | ID|
------------------------------------
|null |valC|A1 |

我也试过爆炸,因为 col 'a' 有数组和结构。但这也不起作用,因为数据框“data”正在将 col/field“a”作为字符串而不是数组返回。有什么解决办法吗?

更新:我还尝试使用 JSON4s 和 net.liftweb.json.parse 进行解析。这也没有帮助

case class aInfo(b: String) 
case class col1(a: Option[aInfo]), c: String)

import net.liftweb.json.parse
val parseJson = udf((data: String) => {
implicit val formats = net.liftweb.json.DefaultFormats
parse(data).extract[Data]
})

val parsed = users.withColumn("parsedJSON", parseJson($"data"))
parsed.show(1)

当我使用这些解析器时,所有值都为 null。

我的预期结果:我试图从数据集中得到一个扁平化的结构

|b     |x |c   | ID|
--------------------
|value1|23|valC|A1 |
|value2|52|valC|A1 |

最佳答案

我相信所有需要的拼图都已经在这里了,所以让我们一步一步来。您的数据相当于:

val df = Seq((
"""{"a":[{"b":"value1"},{"b": "value2"}],"c":"valC"}""", "A1", "X1", "Y1"
)).toDF("col1", "ID", "field2", "field3")

Spark 提供了 json4s,它实现了与 Lift 相同的查询 API:

import org.json4s._
import org.json4s.jackson.JsonMethods._

我们可以使用例如 LINQ 风格的 API 来定义一个 UDF:

val getBs = udf((s: String) => for { 
JString(b) <- parse(s) \ "a" \ "b"
} yield b)

如果你想提取多个字段,你当然可以扩展它。例如,如果 JSON 字符串有多个字段

{"a":[{"b":"value1","d":1},{"b":"value2","d":2}],"c":"valC"}

你可以:

for  {
JObject(a) <- parse(s) \ "a"
JField("b", JString(b)) <- a
JField("d", JInt(d)) <- a
} yield (b, d)

这假定两个字段都存在,否则将不会匹配。要处理缺失的字段,您可能更喜欢 XPath-like表达式或提取器:

case class A(b: Option[String], d: Option[Int])

(parse(s) \ "a").extract(Seq[A])

像这样的 UDF 可以与 explode 一起使用来提取字段:

val withBs = df.withColumn("b", explode(getBs($"col1")))

结果:

+--------------------+---+------+------+------+
| col1| ID|field2|field3| b|
+--------------------+---+------+------+------+
|{"a":[{"b":"value...| A1| X1| Y1|value1|
|{"a":[{"b":"value...| A1| X1| Y1|value2|
+--------------------+---+------+------+------+

您尝试使用 Lift 是不正确的,因为您希望 aaInfo 的序列,但仅将其定义为 Option[aInfo]。它应该是 Option[Seq[aInfo]]:

case class col1(a: Option[Seq[aInfo]], c: String)

使用这样定义的类,解析应该可以正常工作。

如果您使用当前版本 (Spark 2.1.0),则 SPARK-17699 引入了一个 from_json 方法这需要一个架构:

import org.apache.spark.sql.types._

val bSchema = StructType(Seq(StructField("b", StringType, true)))
val aSchema = StructField("a", ArrayType(bSchema), true)
val cSchema = StructField("c", StringType, true)

val schema = StructType(Seq(aSchema, cSchema))

并且可以应用为:

import org.apache.spark.sql.functions.from_json

val parsed = df.withColumn("col1", from_json($"col1", schema))

之后您可以使用通常的符号选择字段:

parsed.select($"col1.a.b")

关于apache-spark - 使用 Spark 从 DynamoDB JSON 字符串中提取嵌套的 Json 字段?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39924514/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com