gpt4 book ai didi

JavaInputDStream 不工作

转载 作者:太空宇宙 更新时间:2023-11-04 10:25:24 25 4
gpt4 key购买 nike

我收到以下错误:

 Exception in thread "main" java.lang.AbstractMethodError
at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)
at org.apache.spark.streaming.kafka010.KafkaUtils$.initializeLogIfNecessary(KafkaUtils.scala:40)
at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
at org.apache.spark.streaming.kafka010.KafkaUtils$.log(KafkaUtils.scala:40)
at org.apache.spark.internal.Logging$class.logWarning(Logging.scala:66)
at org.apache.spark.streaming.kafka010.KafkaUtils$.logWarning(KafkaUtils.scala:40)
at org.apache.spark.streaming.kafka010.KafkaUtils$.fixKafkaParams(KafkaUtils.scala:157)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.<init>(DirectKafkaInputDStream.scala:65)
at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:126)
at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:149)
at org.apache.spark.streaming.kafka010.KafkaUtils.createDirectStream(KafkaUtils.scala)
at com.spark.kafka.JavaDirectKafkaWordCount.main(JavaDirectKafkaWordCount.java:50)
18/05/29 18:05:43 INFO SparkContext: Invoking stop() from shutdown hook
18/05/29 18:05:43 INFO SparkUI: Stopped Spark web UI at

代码片段:

我正在 eclipse 中编写一个简单的 kafka - Spark 流代码,以使用 Spark 流来使用来自 kafka 代理的消息。下面是代码

我正在尝试执行相应的代码,它使用 JavaInputDStream 运行良好,之后我收到错误。每个进口都很好。有人可以帮忙吗

 package com.spark.kafka;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import scala.Tuple2;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.Durations;

public final class JavaDirectKafkaWordCount {
private static final Pattern SPACE = Pattern.compile(" ");

public static void main(String[] args) throws Exception {

String brokers = "localhost:9092";
String topics = "sparktestone";

SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount").setMaster("local[*]");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", brokers);

JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(jssc,
LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(topicsSet, kafkaParams));

JavaDStream<String> lines = messages.map(ConsumerRecord::value);
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
.reduceByKey((i1, i2) -> i1 + i2);
wordCounts.print();

jssc.start();
jssc.awaitTermination();
}
}

在 Pom 中我添加了各自的依赖项。

 <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.3.0</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.10</artifactId>
<version>2.0.0</version>
</dependency>

最佳答案

我也有同样的问题,在 Spark 2.3 中该方法是抽象的。结论,使用Spark 2.2。

<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.0</version>
<scope>provided</scope>
</dependency>

关于JavaInputDStream 不工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50585061/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com