
json - Parsing JSON in spark-streaming


I'm fairly new to Spark. I'm trying to receive a DStream of JSON-structured messages from a Kafka topic, and I want to parse the content of each JSON document. The JSON I receive looks like this:

{"type":"position","ident":"IBE32JZ","air_ground":"A","alt":"34000","clock":"1409733420","id":"IBE32JZ-1409715361-ed-0002:0","gs":"446","heading":"71","lat":"44.50987","lon":"2.98972","reg":"ECJRE","squawk":"1004","updateType":"A","altChange":" "}

For now I'm only trying to extract the ident field, and I'm using the lift-json library to parse the data. My program looks like this:

import net.liftweb.json._
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ScalaExample {
  val kafkaHost = "localhost"
  val kafkaPort = 9092
  val zookeeperHost = "localhost"
  val zookeeperPort = 2181

  implicit val formats = DefaultFormats
  case class PlaneInfo(ident: String)

  def parser(json: String): String = {
    val parsedJson = parse(json)
    val m = parsedJson.extract[PlaneInfo]
    m.ident
  }

  def main(args: Array[String]) {
    val zkQuorum = "localhost:2181"
    val group = "myGroup"
    val topic = Map("flightStatus" -> 1)
    val sparkContext = new SparkContext("local[4]", "KafkaConsumer")
    val ssc = new StreamingContext(sparkContext, Seconds(10))

    // (key, value) pairs from Kafka; the JSON payload is the value
    val json = KafkaUtils.createStream(ssc, zkQuorum, group, topic)

    val id = json.map(_._2).map(parser)

    id.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

But it throws the following exception:

java.lang.NoClassDefFoundError: scala/reflect/ClassManifest
at net.liftweb.json.JsonAST$JValue.extract(JsonAST.scala:300)
at aero.catec.stratio.ScalaExample$.parser(ScalaExample.scala:33)
at aero.catec.stratio.ScalaExample$$anonfun$2.apply(ScalaExample.scala:48)
at aero.catec.stratio.ScalaExample$$anonfun$2.apply(ScalaExample.scala:48)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$28.apply(RDD.scala:1003)
at org.apache.spark.rdd.RDD$$anonfun$28.apply(RDD.scala:1003)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
at org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:575)
at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:560)
Caused by: java.lang.ClassNotFoundException: scala.reflect.ClassManifest
at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

The thing is, the same code works perfectly when I run it without Spark (reading from a file). The problem starts when I put it inside a Spark program. Also, if I change the parser function to the following:

def parser(json: String): JValue = {
  val parsedJson = parse(json)
  parsedJson \\ "ident"
}

it works as well. But as soon as I try to extract the actual String, I get the same error.
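For reference, one reflection-free alternative is to pattern-match on the JValue itself instead of calling extract[...]. Below is a minimal sketch on top of lift-json; the helper name identOf is illustrative, not part of the original program:

import net.liftweb.json._

// Pull "ident" out of the parsed JSON by matching on the JValue,
// avoiding the reflection-based extract[...] altogether.
def identOf(json: String): Option[String] =
  parse(json) \ "ident" match {
    case JString(s) => Some(s) // field present and a string
    case _          => None    // field missing or not a string
  }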

Thanks for your help. I hope I've explained myself clearly.

Best Answer

This happens because you are missing the scala-reflect dependency needed to serialize/deserialize the record. Try adding the scala-reflect jar that matches your Spark build's Scala version.

Hint: "org.scala-lang" % "scala-reflect" % Version.scala
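As a sketch, in an sbt build this could look like the following; the version numbers are placeholders from the Spark 1.x / Scala 2.10 era, and the important part is that scala-reflect tracks scalaVersion:

// build.sbt -- version numbers are placeholders; what matters is that
// scala-reflect matches the Scala version Spark was compiled against.
scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"       % "1.1.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.1.0",
  "net.liftweb"      %% "lift-json"             % "2.6",
  "org.scala-lang"   %  "scala-reflect"         % scalaVersion.value
)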

On json - parsing JSON in spark-streaming, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/25643872/
