
json - Spark Streaming Scala: combining JSON of different structures into one DataFrame

Reposted — Author: 行者123 · Updated: 2023-12-02 04:34:20

I am trying to process JSON strings coming from Kinesis. The JSON strings can take a couple of different forms. From Kinesis I create a DStream:

val kinesisStream = KinesisUtils.createStream(
ssc, appName, "Kinesis_Stream", "kinesis.ap-southeast-1.amazonaws.com",
"region", InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)

val lines = kinesisStream.map(x => new String(x))

lines.foreachRDD((rdd, time) => {

  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  import sqlContext.implicits.StringToColumn

  if (rdd.count() > 0) {
    // Process the JSON here.
    // Each JSON string will be in one of the two formats below.
  }
})

Each string in the RDD will hold one of these JSON payloads. A collection:

[
  {
    "data": {
      "ApplicationVersion": "1.0.3 (65)",
      "ProjectId": 30024,
      "TargetId": "4138",
      "Timestamp": 0
    },
    "host": "host1"
  },
  {
    "data": {
      "ApplicationVersion": "1.0.3 (65)",
      "ProjectId": 30025,
      "TargetId": "4139",
      "Timestamp": 0
    },
    "host": "host1"
  }
]

Some of the JSON strings are a single object, like this:

{
  "ApplicationVersion": "1.0.3 (65)",
  "ProjectId": 30026,
  "TargetId": "4140",
  "Timestamp": 0
}

If a string is of the first type, I want to extract the objects under the "data" key, combine them with JSON of the second type, and form a single RDD/DataFrame. How can I achieve this?

Ultimately I want my DataFrame to look like this:

+------------------+---------+--------+---------+
|ApplicationVersion|ProjectId|TargetId|Timestamp|
+------------------+---------+--------+---------+
| 1.0.3 (65)| 30024| 4138| 0|
| 1.0.3 (65)| 30025| 4139| 0|
| 1.0.3 (65)| 30026| 4140| 0|
+------------------+---------+--------+---------+

Apologies, I'm new to Scala and Spark. I've been looking through existing examples but unfortunately haven't found a solution.

Thanks very much.

Best Answer

This example uses json4s:

import org.json4s._
import org.json4s.jackson.JsonMethods._

implicit val format = DefaultFormats

case class jsonschema ( ApplicationVersion: String, ProjectId: String, TargetId: String, Timestamp:Int )

val string1 = """
[ {
"data" : {
"ApplicationVersion" : "1.0.3 (65)",
"ProjectId" : 30024,
"TargetId" : "4138",
"Timestamp" : 0
},
"host" : "host1"
}, {
"data" : {
"ApplicationVersion" : "1.0.3 (65)",
"ProjectId" : 30025,
"TargetId" : "4139",
"Timestamp" : 0
},
"host" : "host1"
} ]

"""

val string2 = """
[ {
"ApplicationVersion" : "1.0.3 (65)",
"ProjectId" : 30025,
"TargetId" : "4140",
"Timestamp" : 0
}, {
"ApplicationVersion" : "1.0.3 (65)",
"ProjectId" : 30025,
"TargetId" : "4141",
"Timestamp" : 0
} ]
"""

// `\ "data"` maps over the array, collecting each element's "data" object
val json1 = (parse(string1) \ "data").extract[List[jsonschema]]

val json2 = parse(string2).extract[List[jsonschema]]

// json1 and json2 are plain Lists (not RDDs), so combine them with ++
val combined = json1 ++ json2

val df = sqlContext.createDataFrame(combined)

df.show


+------------------+---------+--------+---------+
|ApplicationVersion|ProjectId|TargetId|Timestamp|
+------------------+---------+--------+---------+
| 1.0.3 (65)| 30024| 4138| 0|
| 1.0.3 (65)| 30025| 4139| 0|
| 1.0.3 (65)| 30025| 4140| 0|
| 1.0.3 (65)| 30025| 4141| 0|
+------------------+---------+--------+---------+
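In the streaming job itself, you don't know in advance which shape a given record has, so the two cases can be normalized with a small helper that dispatches on the parsed JSON shape before building the DataFrame. A minimal sketch (the `toRecords` helper and the `JsonSchema` name are illustrative, not part of the original answer):

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods._

implicit val formats: DefaultFormats.type = DefaultFormats

case class JsonSchema(ApplicationVersion: String, ProjectId: String, TargetId: String, Timestamp: Int)

// Hypothetical helper: accept either JSON shape and return flat records.
def toRecords(raw: String): List[JsonSchema] = parse(raw) match {
  // First shape: an array of { "data": {...}, "host": ... } wrappers;
  // `\ "data"` collects the inner objects across the whole array.
  case arr: JArray  => (arr \ "data").extract[List[JsonSchema]]
  // Second shape: a single bare object.
  case obj: JObject => List(obj.extract[JsonSchema])
  case _            => Nil
}
```

Inside the `foreachRDD` block from the question this could then be applied as `sqlContext.createDataFrame(rdd.flatMap(toRecords))`, since `flatMap` merges the records extracted from both shapes into one RDD of case-class rows.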

Regarding "json - Spark Streaming Scala: combining JSON of different structures into one DataFrame", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/45100772/
