
json - Converting JSON objects to an RDD


I don't know whether this question is a duplicate, but somehow none of the answers I have come across seem to work for me (maybe I'm doing something wrong).

I have a class defined like this:

case class myRec(
  time: String,
  client_title: String,
  made_on_behalf: Double,
  country: String,
  email_address: String,
  phone: String)

and a sample JSON file containing records/objects of the form

[{...}{...}{...}...] 

[{"time": "2015-05-01 02:25:47",
"client_title": "Mr.",
"made_on_behalf": 0,
"country": "Brussel",
"email_address": "15e29034@gmail.com"},
{"time": "2015-05-01 04:15:03",
"client_title": "Mr.",
"made_on_behalf": 0,
"country": "Bundesliga",
"email_address": "aae665d95c5d630@aol.com"},
{"time": "2015-05-01 06:29:18",
"client_title": "Mr.",
"made_on_behalf": 0,
"country": "Japan",
"email_address": "fef412c714ff@yahoo.com"}...]

My build.sbt has libraryDependencies += "com.owlike" % "genson-scala_2.11" % "1.3" with scalaVersion := "2.11.7",

and I have a Scala function defined like this:

//PS: Other imports already made
import com.owlike.genson.defaultGenson_

//PS: Spark context already defined
def prepData(infile: String): RDD[myRec] = {
  val input = sc.textFile(infile)
  // Read JSON data into my record case class
  input.mapPartitions(records =>
    records.map(record => fromJson[myRec](record))
  )
}

I am calling the function like this:

prepData("file://path/to/abc.json")

Is there a way to do this, or is there some other JSON library I can use to convert the file into an RDD?

I have also tried the following, but it didn't seem to work either:

Using ScalaObjectMapper

PS: I don't want to go through Spark SQL to process the JSON file.

Thanks!

Best Answer

Jyd, not using Spark SQL for JSON is an interesting choice, but it is very doable. There is an example of how to do this in the examples for the Learning Spark book (disclaimer: I am one of the co-authors, so a little biased). The examples are on GitHub at https://github.com/databricks/learning-spark , but here is the relevant code snippet:

import org.apache.spark.SparkContext
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

case class Person(name: String, lovesPandas: Boolean) // Note: must be a top-level class

object BasicParseJsonWithJackson {

  def main(args: Array[String]) {
    if (args.length < 3) {
      println("Usage: [sparkmaster] [inputfile] [outputfile]")
      System.exit(1)
    }
    val master = args(0)
    val inputFile = args(1)
    val outputFile = args(2)
    val sc = new SparkContext(master, "BasicParseJsonWithJackson", System.getenv("SPARK_HOME"))
    val input = sc.textFile(inputFile)

    // Parse it into a specific case class. We use mapPartitions because:
    // (a) ObjectMapper is not serializable, so we would either have to create a singleton object
    //     encapsulating ObjectMapper on the driver and send data back to the driver to go through
    //     that singleton, or let each node create its own ObjectMapper, which is expensive inside a map.
    // (b) To create an ObjectMapper on each node without it being too expensive, we create one per
    //     partition with mapPartitions. This solves both the serialization and the object-creation cost.
    val result = input.mapPartitions(records => {
      // mapper object created on each executor node
      val mapper = new ObjectMapper with ScalaObjectMapper
      mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
      mapper.registerModule(DefaultScalaModule)
      // We use flatMap to handle errors: we return an empty collection (None) if we
      // encounter an issue, and a one-element collection (Some(_)) if everything is OK.
      records.flatMap(record => {
        try {
          Some(mapper.readValue(record, classOf[Person]))
        } catch {
          case e: Exception => None
        }
      })
    }, true)
    result.filter(_.lovesPandas).mapPartitions(records => {
      val mapper = new ObjectMapper with ScalaObjectMapper
      mapper.registerModule(DefaultScalaModule)
      records.map(mapper.writeValueAsString(_))
    })
      .saveAsTextFile(outputFile)
  }
}

Note that this uses Jackson (specifically the "com.fasterxml.jackson.core" % "jackson-databind" % "2.3.3" and "com.fasterxml.jackson.module" % "jackson-module-scala_2.10" % "2.3.3" dependencies).
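For reference, a minimal build.sbt sketch pulling in those Jackson artifacts might look like the following; only the two Jackson lines come from the answer, while the scalaVersion and the Spark dependency and its version are illustrative assumptions.

// Sketch of a build.sbt for the snippet above. Only the two Jackson lines come from the
// answer; the scalaVersion and Spark dependency/version are assumptions for illustration.
scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.4.1" % "provided", // assumed Spark version
  "com.fasterxml.jackson.core" % "jackson-databind" % "2.3.3",
  "com.fasterxml.jackson.module" % "jackson-module-scala_2.10" % "2.3.3"
)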

I just noticed that your question has some sample input, and as @zero323 pointed out, line-by-line parsing isn't going to work. Instead, you would do:

    val input = sc.wholeTextFiles(inputFile).map(_._2)

    // Parse it into a specific case class. We use mapPartitions because:
    // (a) ObjectMapper is not serializable, so we would either have to create a singleton object
    //     encapsulating ObjectMapper on the driver and send data back to the driver to go through
    //     that singleton, or let each node create its own ObjectMapper, which is expensive inside a map.
    // (b) To create an ObjectMapper on each node without it being too expensive, we create one per
    //     partition with mapPartitions. This solves both the serialization and the object-creation cost.
    val result = input.mapPartitions(records => {
      // mapper object created on each executor node
      val mapper = new ObjectMapper with ScalaObjectMapper
      mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
      mapper.registerModule(DefaultScalaModule)
      // We use flatMap to handle errors: we return an empty list if we encounter an issue,
      // and the parsed list of records if everything is OK.
      records.flatMap(record => {
        try {
          // ScalaObjectMapper's readValue[T] keeps the element type, so each whole file
          // (a JSON array) is parsed into a List[Person]
          mapper.readValue[List[Person]](record)
        } catch {
          case e: Exception => List.empty[Person]
        }
      })
    })
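To get this into the same shape as the prepData function from the question, a minimal sketch could look like the following; it assumes the imports from the first snippet plus org.apache.spark.rdd.RDD, an existing SparkContext named sc, and the Person case class above, and the function name and path are just placeholders.

// Sketch only: wraps the whole-file Jackson parsing above in a prepData-style helper.
// Assumes the Jackson/Spark imports from the first snippet, plus:
//   import org.apache.spark.rdd.RDD
// and an existing SparkContext named sc.
def prepData(infile: String): RDD[Person] = {
  val input = sc.wholeTextFiles(infile).map(_._2) // one whole JSON file per record
  input.mapPartitions { records =>
    val mapper = new ObjectMapper with ScalaObjectMapper
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    mapper.registerModule(DefaultScalaModule)
    records.flatMap { record =>
      try mapper.readValue[List[Person]](record)
      catch { case e: Exception => List.empty[Person] }
    }
  }
}

// Usage (path is a placeholder):
// val recs: RDD[Person] = prepData("file:///path/to/abc.json")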

On json - converting JSON objects to an RDD, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/32383380/
