- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我正在尝试使用 SSL 进行 Kafka-Spark 集成。我已经在启用 SSL 的情况下测试了 Kafka,它在示例消费者和生产者中工作得很好。
此外,我已经尝试了 Spark - Kafka 的集成,当在 spark-job 中没有 SSL 时,它也可以正常工作.
现在,当我在 spark-job 中启用 SSL 时,出现异常并且集成不起作用。
我为在 spark-job 中启用 SSL 所做的唯一更改是在我的工作中包含以下代码行:
sparkConf.set("security.protocol", "SSL");
sparkConf.set("ssl.truststore.location", "PATH/truststore.jks");
sparkConf.set("ssl.truststore.password", "passwrd");
sparkConf.set("ssl.keystore.location", "PATH/keystore.jks");
sparkConf.set("ssl.keystore.password", "kstore");
sparkConf.set("ssl.key.password", "keypass");
而这个sparkConf是在创建streaming context时传递的。
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));
当我运行作业时,我得到的错误如下:
17/05/24 18:16:39 WARN ConsumerFetcherManager$LeaderFinderThread: [test-consumer-group_bmj-cluster-1495664195784-5f49cbd0-leader-finder-thread], Failed to find leader for Set([bell,0])
java.lang.NullPointerException
at org.apache.kafka.common.utils.Utils.formatAddress(Utils.java:312)
at kafka.cluster.Broker.connectionString(Broker.scala:62)
at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:89)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
Kafka 版本 - 2.11-0.10.2.0
Spark 版本 - 2.1.0
Scala 版本 - 2.11.8
流媒体库
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka_2.10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.1.0</version>
</dependency>
关于克服这个问题有什么帮助吗?
最佳答案
通过深入挖掘,我能够找出我遇到的问题。
首先,为了启用SSL相关的SSL,kafka-params需要传入KafkaUtils.createDirectStream()方法和不是JavaStreamingContext 的sparkConf。
然后,给定的 SSL 参数:
"security.protocol", "SSL"
"ssl.truststore.location", "PATH/truststore.jks"
"ssl.truststore.password", "passwrd"
"ssl.keystore.location", "PATH/keystore.jks"
"ssl.keystore.password", "kstore"
"ssl.key.password", "keypass"
我使用的 spark-kafka-streaming 版本“0-8_2.11”不支持,因此我不得不将其更改为版本“0-10_2.11”。
这反过来对方法进行了完整的 API 更改:KafkaUtils.createDirectStream() 用于连接到 Kafka。
文档中给出了有关如何使用它的解释 here .
所以我连接到 Kafka 的最终代码片段如下所示:
final JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
javaStreamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topicsCollection, kafkaParams)
);
kafka-params 是一个包含所有 SSL 参数的映射。
谢谢
沙比尔
关于apache-spark - 无法使用 Kafka-Spark 集成找到 Set([topic,0]) 的领导者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44169502/
根据槌documentation ,可以逐步训练主题模型: "-output-model [FILENAME] This option specifies a file to write a seri
下面我创建了一个完整的可重现示例来计算给定 DataFrame 的主题模型。 import numpy as np import pandas as pd data = pd.DataFrame({
在启动 Kafka 时出现以下错误。 KeeperErrorCode = NoNode for /brokers/topics/test-topic/partitions 任何帮助将不胜感激。 最佳答
我是这个范例的新手,所以请容忍我的愚蠢。我开始阅读这个主题是因为我正在构建一个物联网系统,该系统将使用消息队列来允许设备相互通信。 我目前正在使用 Azure 的服务总线,但是我相信我的问题通常适用于
我有三个 Kafka 经纪人( kafka_2.11-0.10.0.0 )每个经纪人的安全配置如下, listeners=PLAINTEXT://xxxx:9093,SASL_PLAINTEXT://
我使用apache-rocketmq发送消息,但出现异常。我尝试了csdn上的很多解决方案,但不起作用。现在我不知道如何处理它。 这是一个 Linux 服务器,运行 Rocketmq 4.2.0、ja
我收集了一些文档,其中大部分都是关于同一主题的,其余的基本上都是随机主题。我希望将文档分类为它们是关于“多数主题”还是这些随机“少数主题”之一。如果我在这个只有 2 个主题的语料库上使用主题建模算法会
我正在使用 gensim 包中的 LDA 算法来查找给定文本中的主题。 我被问到生成的主题将包含每个主题的不同词,例如,如果主题 A 中包含“猴子”一词,那么其他主题的列表中不应包含“猴子”一词。 到
我想使用 htaccess 更改我网站的 URL URL currently: example.com/index.php?p=my-topic-title I want to be like : e
我正在学习 Spring Boot 并制作了一个演示,但是当我发布一个添加对象的请求时,它不起作用! 错误信息是: { "timestamp": 1516897619316, "sta
背景 我们公司有由 Zookeeper 管理的 Apache Kafka。我们的 Spring Boot 应用程序之一需要检查所有可用主题的列表,并列出哪些主题启用了日志压缩 (cleanup.pol
在我的工作场所,我偶然发现了以下需要我解决的问题。尽管不是绝对需要,但首选解决方案。 有一个包含一组故事的数据库,每个故事都有一组与之关联的主题。主题存储在单独的表中,格式为 (storyid, to
我有一个由主题名称、描述组成的 Pandas 数据框 我正在尝试将其转换为主题名称、单词(出现在描述列中)及其频率 我已经试过了,但它只给了我描述的单词和频率,而不是主题名称级别。 import nl
我尝试使用以下命令删除现有主题(我使用 kafka 管理控制台检查过); #./kafka-topics.sh --zookeeper zookeeper.xx.com:2181/chroot --d
试图在一台机器上运行 Kafka 生产者,在另一台机器上运行消费者。 设置以下属性: 广告.host.name 广告端口 但是在控制台消费者上收到以下错误: bin/kafka-console-con
我正在使用 Gmail API,并尝试使用 Python 3.9 设置推送通知。当我尝试在 Gmail 收件箱上调用 watch() 时,出现错误,即使我已遵循针对类似问题给出的所有建议。错误如下:
我希望构建一个主题 map 来对内容进行分类。 例如,主题“艺术”可能有“艺术史”、“绘画”、“雕塑”等子类别。 我已经抓取了一些在线资源,但遇到了一个与我希望如何使用层次结构有关的问题。 我有很多内
我想在我的 Java 中编译 mallet(而不是使用命令行),所以我将 jar 包含在我的项目中,并引用了来自以下示例的代码:http://mallet.cs.umass.edu/topics-de
我正在尝试根据父级中的记录内容从kafka中的一个主题(父级)写入另一个主题(子级)。如果我从父主题消费,则示例记录为 {"date":{"string":"2017-03-20"},"time":{
我有一个 Kafka 生产者类,运行良好。生产者填充了 Kafka 主题。其代码如下: public class kafka_test { private final static String TO
我是一名优秀的程序员,十分优秀!