gpt4 book ai didi

apache-kafka - CDH 5.8.3 中的 Spark Streaming Kafka 以 yarn-cluster 模式集成

转载 作者:行者123 更新时间:2023-12-02 03:08:01 25 4
gpt4 key购买 nike

我在运行从 Kafka 读取的 Spark Streaming 作业时遇到了一个奇怪的问题。我使用的是 CDH 5.8.3 发行版:Spark 版本是 1.6.0,Kafka 版本是 0.9.0。

我的代码很简单:

val kafkaParams = Map[String, String]("bootstrap.servers" -> brokersList, "auto.offset.reset" -> "smallest")
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(kafkaTopic))

如果我在 yarn-client 模式下运行它,我没有错误。如果我在 yarn-cluster 模式下运行程序,我会收到异常。我的启动命令是:

spark-submit --master yarn-cluster --files /etc/hbase/conf/hbase-site.xml --num-executors 5 --executor-memory 4G --jars (somejars for HBase interaction) --class mypackage.MyClass myJar.jar

但是我收到了这个错误:

java.lang.ClassCastException: kafka.cluster.Broker cannot be cast to kafka.cluster.BrokerEndPoint
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90)
at scala.Option.map(Option.scala:145)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:87)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:86)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:86)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:85)
at scala.util.Either$RightProjection.flatMap(Either.scala:523)
at org.apache.spark.streaming.kafka.KafkaCluster.findLeaders(KafkaCluster.scala:85)
at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:179)
at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:161)
at org.apache.spark.streaming.kafka.KafkaCluster.getEarliestLeaderOffsets(KafkaCluster.scala:155)
at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:213)
at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:211)
at scala.util.Either$RightProjection.flatMap(Either.scala:523)
at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
at myPackage.Ingestion$.createStreamingContext(Ingestion.scala:120)
at myPackage.Ingestion$$anonfun$1.apply(Ingestion.scala:55)
at myPackage.Ingestion$$anonfun$1.apply(Ingestion.scala:55)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:864)
at myPackage.Ingestion$.main(Ingestion.scala:55)
at myPackage.Ingestion.main(Ingestion.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)

网上冲浪我最终认为这是一个版本问题,但我无法弄清楚为什么会发生这种情况,因为在 yarn-client 和 yarn-cluster 模式下运行的 jar 是相同的。

你有什么想法吗?

谢谢,马可

最佳答案

看起来 Spark streaming 1.6 与 Kafka 0.8 兼容(参见 documentation)

我猜你使用的是 Kafka 客户端 0.9,它是从你的 jar 中以 client 模式获取的,但是当你切换到 cluster 模式时默认的 Kafka 客户端使用 (0.8.2.1)。

我说的对吗?如果是这样,您可以尝试从构建中删除 kafka 客户端依赖项并使用 spark-streaming-kafka 提供的默认客户端吗? (0.8 客户应与 0.9 经纪人合作)。

关于apache-kafka - CDH 5.8.3 中的 Spark Streaming Kafka 以 yarn-cluster 模式集成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41490956/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com