gpt4 book ai didi

apache-spark - 无法使用 Kafka-Spark 集成找到 Set([topic,0]) 的领导者

转载 作者:太空宇宙 更新时间:2023-11-03 14:39:48 25 4
gpt4 key购买 nike

我正在尝试使用 SSL 进行 Kafka-Spark 集成。我已经在启用 SSL 的情况下测试了 Kafka,它在示例消费者和生产者中工作得很好。

此外,我已经尝试了 Spark - Kafka 的集成,当在 spark-job 中没有 SSL 时,它也可以正常工作.

现在,当我在 spark-job 中启用 SSL 时,出现异常并且集成不起作用。

我为在 spark-job 中启用 SSL 所做的唯一更改是在我的工作中包含以下代码行:

    sparkConf.set("security.protocol", "SSL");
sparkConf.set("ssl.truststore.location", "PATH/truststore.jks");
sparkConf.set("ssl.truststore.password", "passwrd");
sparkConf.set("ssl.keystore.location", "PATH/keystore.jks");
sparkConf.set("ssl.keystore.password", "kstore");
sparkConf.set("ssl.key.password", "keypass");

而这个sparkConf是在创建streaming context时传递的。

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));

当我运行作业时,我得到的错误如下:

17/05/24 18:16:39 WARN ConsumerFetcherManager$LeaderFinderThread: [test-consumer-group_bmj-cluster-1495664195784-5f49cbd0-leader-finder-thread], Failed to find leader for Set([bell,0])
java.lang.NullPointerException
at org.apache.kafka.common.utils.Utils.formatAddress(Utils.java:312)
at kafka.cluster.Broker.connectionString(Broker.scala:62)
at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:89)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

Kafka 版本 - 2.11-0.10.2.0
Spark 版本 - 2.1.0
Scala 版本 - 2.11.8

流媒体库

  <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka_2.10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.1.0</version>
</dependency>

关于克服这个问题有什么帮助吗?

最佳答案

通过深入挖掘,我能够找出我遇到的问题。

首先,为了启用SSL相关的SSL,kafka-params需要传入KafkaUtils.createDirectStream()方法和不是JavaStreamingContextsparkConf

然后,给定的 SSL 参数:

"security.protocol", "SSL"
"ssl.truststore.location", "PATH/truststore.jks"
"ssl.truststore.password", "passwrd"
"ssl.keystore.location", "PATH/keystore.jks"
"ssl.keystore.password", "kstore"
"ssl.key.password", "keypass"

我使用的 spark-kafka-streaming 版本“0-8_2.11”不支持,因此我不得不将其更改为版本“0-10_2.11”。

这反过来对方法进行了完整的 API 更改:KafkaUtils.createDirectStream() 用于连接到 Kafka。

文档中给出了有关如何使用它的解释 here .

所以我连接到 Kafka 的最终代码片段如下所示:

final JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
javaStreamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topicsCollection, kafkaParams)
);

kafka-params 是一个包含所有 SSL 参数的映射。

谢谢
沙比尔

关于apache-spark - 无法使用 Kafka-Spark 集成找到 Set([topic,0]) 的领导者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44169502/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com