gpt4 book ai didi

json - NotSerializableException与Spark上的json4s

转载 作者:行者123 更新时间:2023-12-04 03:58:01 26 4
gpt4 key购买 nike

基本上,我必须使用Spark分析HDFS上的一些复杂的JSON。

我使用“for comprehensions”(预)过滤JSON的“extract”方法
json4s将其包装到case类中

这个很好用!

def foo(rdd: RDD[String]) = {

case class View(C: String,b: Option[Array[List[String]]], t: Time)
case class Time($numberLong: String)
implicit val formats = DefaultFormats

rdd.map { jsonString =>
val jsonObj = parse(jsonString)
val listsOfView = for {
JObject(value) <- jsonObj
JField(("v"), JObject(views)) <- value
normalized <- views.map(x => (x._2))
} yield normalized
}

到现在为止还挺好!

当我尝试将(预)过滤的JSON提取到我的
CaseClass我得到这个:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.json4s.DefaultFormats$



这是提取代码:
def foo(rdd: RDD[String]) = {

case class View(C: String,b: Option[Array[List[String]]], t: Time)
case class Time($numberLong: String)
implicit val formats = DefaultFormats

rdd.map { jsonString =>
val jsonObj = parse(jsonString)
val listsOfView = for {
JObject(value) <- jsonObj
JField(("v"), JObject(views)) <- value
normalized <- views.map(x => (x._2))
} yield normalized.extract[View]
}

我已经在scala ws上尝试了我的代码,它的工作原理是什么!我在使用hdfs和spark的事情上真的很新,所以我将不胜感激。

最佳答案

Spark对RDD转换的闭包进行序列化,然后将其“运送”给工作人员以进行分布式执行。
这就要求闭包中的所有代码(通常也包含对象中的所有代码)都应该可序列化。

看一下org.json4s.DefaultFormat$的含义(该特征的伴随对象):

object DefaultFormats extends DefaultFormats {
val losslessDate = new ThreadLocal(new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"))
val UTC = TimeZone.getTimeZone("UTC")

}

显然,该对象不可序列化,无法实现。 (ThreadLocal本质上是不可序列化的)

您似乎没有在代码上使用 Date类型,因此可以摆脱 implicit val formats = DefaultFormats或将DefaultFormats替换为可序列化的内容?

关于json - NotSerializableException与Spark上的json4s,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24786377/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com