
scala - Exception in thread "main" java.lang.NoClassDefFoundError: com/fasterxml/jackson/databind/Module in IntelliJ while running Kafka App?


I have been developing an application that collects real-time streaming Bitcoin trade data via a WebSocket API and analyzes it on the fly. I created a Kafka producer to collect data from a specific topic. I am using IntelliJ, Scala, and Kafka to develop the application.

At the moment I am working on the Kafka producer part.

My source files live under src/main/scala/coinyser.

Here is the first file (src/main/scala/coinyser/StreamingProducer.scala):

package coinyser

import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.TimeZone

import cats.effect.IO
import com.fasterxml.jackson.databind._
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.pusher.client.Client
import com.pusher.client.channel.SubscriptionEventListener
import com.typesafe.scalalogging.StrictLogging

object StreamingProducer extends StrictLogging {

  def subscribe(pusher: Client)(onTradeReceived: String => Unit): IO[Unit] = {
    for {
      _ <- IO(pusher.connect())
      channel <- IO(pusher.subscribe("live_trades"))
      _ <- IO(channel.bind("trade", new SubscriptionEventListener() {
        override def onEvent(channel: String, event: String, data: String): Unit = {
          logger.info(s"Received event: $event with data: $data")
          onTradeReceived(data)
        }
      }))
    } yield ()
  }

  val mapper: ObjectMapper = {
    val m = new ObjectMapper().registerModule(DefaultScalaModule)
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    sdf.setTimeZone(TimeZone.getTimeZone("UTC"))
    m.setDateFormat(sdf)
  }

  def deserializeWebsocketTransaction(s: String): WebsocketTransaction =
    mapper.readValue(s, classOf[WebsocketTransaction])

  def convertWsTransaction(wsTx: WebsocketTransaction): Transaction =
    Transaction(
      timestamp = new Timestamp(wsTx.timestamp.toLong * 1000),
      tid = wsTx.id,
      price = wsTx.price,
      sell = wsTx.`type` == 1,
      amount = wsTx.amount)

  def serializeTransaction(tx: Transaction): String =
    mapper.writeValueAsString(tx)
}

Here is the second file (src/main/scala/coinyser/StreamingProducerApp.scala):
package coinyser

import cats.effect.{ExitCode, IO, IOApp}
import com.pusher.client.Pusher
import StreamingProducer._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import scala.collection.JavaConversions._

object StreamingProducerApp extends IOApp {
  val topic = "transactions"

  val pusher = new Pusher("de504dc5763aeef9ff52")

  val props = Map(
    "bootstrap.servers" -> "localhost:9092",
    "key.serializer" -> "org.apache.kafka.common.serialization.IntegerSerializer",
    "value.serializer" -> "org.apache.kafka.common.serialization.StringSerializer")

  def run(args: List[String]): IO[ExitCode] = {
    val kafkaProducer = new KafkaProducer[Int, String](props)

    subscribe(pusher) { wsTx =>
      val tx = convertWsTransaction(deserializeWebsocketTransaction(wsTx))
      val jsonTx = serializeTransaction(tx)
      kafkaProducer.send(new ProducerRecord(topic, tx.tid, jsonTx))
    }.flatMap(_ => IO.never)
  }
}

When I try to run StreamingProducerApp.scala to stream the live data, I get this error:
log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.producer.ProducerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" java.lang.NoClassDefFoundError: com/fasterxml/jackson/databind/Module
at coinyser.StreamingProducerApp$.run(StreamingProducerApp.scala:24)
at cats.effect.IOApp$$anonfun$main$2.apply(IOApp.scala:68)
at cats.effect.IOApp$$anonfun$main$2.apply(IOApp.scala:68)
at cats.effect.internals.IOAppPlatform$$anonfun$mainFiber$2.apply(IOAppPlatform.scala:43)
at cats.effect.internals.IOAppPlatform$$anonfun$mainFiber$2.apply(IOAppPlatform.scala:42)
at cats.effect.internals.IORunLoop$.liftedTree3$1(IORunLoop.scala:217)
at cats.effect.internals.IORunLoop$.step(IORunLoop.scala:217)
at cats.effect.IO.unsafeRunTimed(IO.scala:317)
at cats.effect.IO.unsafeRunSync(IO.scala:251)
at cats.effect.internals.IOAppPlatform$.main(IOAppPlatform.scala:28)
at cats.effect.IOApp$class.main(IOApp.scala:68)
at coinyser.StreamingProducerApp$.main(StreamingProducerApp.scala:9)
at coinyser.StreamingProducerApp.main(StreamingProducerApp.scala)
Caused by: java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.Module
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 13 more

Process finished with exit code 1

The build.sbt file:
name := "bitcoin-analyser"

version := "0.1"

scalaVersion := "2.11.11"
val sparkVersion = "2.3.1"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % Provided,
  "org.apache.spark" %% "spark-core" % sparkVersion % Test classifier "tests",
  "org.apache.spark" %% "spark-core" % sparkVersion % Test classifier "test-sources",
  "org.apache.spark" %% "spark-sql" % sparkVersion % Provided,
  "org.apache.spark" %% "spark-sql" % sparkVersion % Test classifier "tests",
  "org.apache.spark" %% "spark-sql" % sparkVersion % Test classifier "test-sources",
  "org.apache.spark" %% "spark-catalyst" % sparkVersion % Test classifier "tests",
  "org.apache.spark" %% "spark-catalyst" % sparkVersion % Test classifier "test-sources",
  "com.typesafe.scala-logging" %% "scala-logging" % "3.9.0",
  "org.scalatest" %% "scalatest" % "3.0.4" % "test",
  "org.typelevel" %% "cats-core" % "1.1.0",
  "org.typelevel" %% "cats-effect" % "1.0.0-RC2",
  "org.apache.spark" %% "spark-streaming" % sparkVersion % Provided,
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion,
  "com.pusher" % "pusher-java-client" % "1.8.0",
  "org.apache.hadoop" % "hadoop-common" % "2.3.0",
  "org.apache.kafka" % "kafka-clients" % "1.1.1")

scalacOptions += "-Ypartial-unification"

// Avoids SI-3623
target := file("/tmp/sbt/bitcoin-analyser")

assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
test in assembly := {}

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}

mainClass in assembly := Some("coinyser.BatchProducerAppSpark")

Can anyone help me figure out why this error occurs and tell me how to fix it? Thanks in advance.

Best Answer

spark-sql-kafka-0-10 already pulls in kafka-clients, and both bring Jackson in transitively; declaring different versions of these libraries on the same classpath produces exactly this kind of exception. Note also that your Spark dependencies are marked Provided, so any Jackson artifacts that reach your compile classpath only through Spark (such as jackson-databind, which your code imports directly) are absent at runtime, which is what a NoClassDefFoundError for com/fasterxml/jackson/databind/Module indicates.
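One way out (a sketch, not a definitive fix) is to declare Jackson explicitly and pin every transitive Jackson artifact to a single version in build.sbt. The 2.6.7.x versions below are an assumption chosen to match what Spark 2.3.1 expects; adjust if your dependency tree says otherwise:

```scala
// build.sbt fragment (sbt 1.x syntax); Jackson 2.6.7.x assumed for Spark 2.3.1
libraryDependencies ++= Seq(
  "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7.1",
  "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.6.7.1"
)

// Force all transitive Jackson artifacts onto the same version
dependencyOverrides ++= Seq(
  "com.fasterxml.jackson.core" % "jackson-core" % "2.6.7",
  "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7.1",
  "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.6.7.1"
)
```

Running `sbt evicted` afterwards shows whether any conflicting Jackson versions remain.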

That said, it is not clear why you need Spark or Hadoop at all here: you only use the plain Kafka Producer API and never initialize any Spark objects.
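If the producer app is all you need, a trimmed-down dependency list along these lines (a sketch keeping only what the two source files above actually import; the jackson-module-scala version is an assumption, since it appears nowhere in your current build.sbt) sidesteps the conflict entirely:

```scala
// build.sbt fragment: only what StreamingProducer/StreamingProducerApp import
libraryDependencies ++= Seq(
  "com.typesafe.scala-logging" %% "scala-logging" % "3.9.0",
  "org.typelevel" %% "cats-core" % "1.1.0",
  "org.typelevel" %% "cats-effect" % "1.0.0-RC2",
  "com.pusher" % "pusher-java-client" % "1.8.0",
  "org.apache.kafka" % "kafka-clients" % "1.1.1",
  // Jackson was previously only reachable through the Provided Spark deps
  "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.9.7"
)
```

With no Spark artifacts on the classpath there is a single Jackson lineage, and jackson-module-scala pulls in jackson-databind and jackson-core at matching versions.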

For scala - Exception in thread "main" java.lang.NoClassDefFoundError: com/fasterxml/jackson/databind/Module in IntelliJ while running Kafka App?, see the similar question on Stack Overflow: https://stackoverflow.com/questions/59090222/
