
java - Exception when calling updateStateByKey in Spark Streaming


I am trying to write a simple application with Spark Streaming that reads from Kafka and keeps a running count of how many times each word has been read from the topic. I am running into a problem calling the essential updateStateByKey method; it looks like a generics issue, but I am not sure what is wrong.

The error:

The method updateStateByKey(Function2<List<Integer>,Optional<S>,Optional<S>>) 
in the type JavaPairDStream<String,Integer> is not applicable for the arguments
(Function2<List<Integer>,Optional<Integer>,Optional<Integer>>)

My code:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import java.util.Arrays;
import scala.Tuple2;
import scala.collection.immutable.List;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import com.google.common.base.Optional;


public class SimpleSparkApp {
    static String appName = "Streaming";
    static String master = "local[*]";
    static String zk = "localhost:2181";
    static String consumerGroupId = "sparkStreaming";
    static String[] topics = {"testTopic"};
    static Integer numThreads = 1;
    static final Pattern SPACE = Pattern.compile(" ");
    static String checkpointDir = "/tmp";
    // Running word total across batches, used by the update function below
    static final AtomicLong runningCount = new AtomicLong(0);

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
        JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration(10000));
        // updateStateByKey requires a checkpoint directory for its state
        jsc.checkpoint(checkpointDir);

        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        for (String topic : topics) {
            topicMap.put(topic, numThreads);
        }

        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jsc, zk, consumerGroupId, topicMap);

        // Kafka records arrive as (key, message) pairs; keep only the message
        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String x) {
                return Arrays.asList(SPACE.split(x));
            }
        });

        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<>(s, 1);
                    }
                }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                });

        Function2<List<Integer>, Optional<Integer>, Optional<Integer>> UPDATE_FUNCTION =
                new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
                    @Override
                    public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
                        Integer newSum = state.get();
                        scala.collection.Iterator<Integer> i = values.iterator();
                        while (i.hasNext()) {
                            newSum += i.next();
                        }
                        runningCount.addAndGet(newSum);
                        System.out.print("Total number of words: " + String.valueOf(runningCount.get()));
                        return Optional.of(newSum);
                    }
                };

        // ERROR is here
        JavaPairDStream<String, Integer> runningCounts =
                wordCounts.updateStateByKey(UPDATE_FUNCTION);

        runningCounts.print();
        jsc.start();
        jsc.awaitTermination();
    }
}

I think there may be a problem with generics and interop with Scala? When I step into updateStateByKey I see what looks like an appropriate function declaration, so I am not sure what is missing here:

  /**
   * Return a new "state" DStream where the state for each key is updated by applying
   * the given function on the previous state of the key and the new values of each key.
   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
   * @param updateFunc State update function. If `this` function returns None, then
   *                   corresponding state key-value pair will be eliminated.
   * @tparam S State type
   */
  def updateStateByKey[S](updateFunc: JFunction2[JList[V], Optional[S], Optional[S]])
    : JavaPairDStream[K, S] = {
    implicit val cm: ClassTag[S] = fakeClassTag
    dstream.updateStateByKey(convertUpdateStateFunction(updateFunc))
  }

Best Answer

Problem solved: it turned out I had imported the wrong List and Iterator classes (I blame Eclipse). With scala.collection.immutable.List in scope, the anonymous Function2 was typed against Scala's List, while updateStateByKey expects java.util.List, which is exactly why the method was "not applicable for the arguments".

Commented out:

//import scala.collection.immutable.List;
//import scala.collection.Iterator;

Added:

import java.util.Iterator;
import java.util.List;

The update function changed slightly:

Function2<List<Integer>, Optional<Integer>, Optional<Integer>> UPDATE_FUNCTION =
        new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
            @Override
            public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
                Integer newSum = state.get();
                Iterator<Integer> i = values.iterator();
                while (i.hasNext()) {
                    newSum += i.next();
                }
                runningCount.addAndGet(newSum);
                System.out.print("Total number of words: " + String.valueOf(runningCount.get()));
                return Optional.of(newSum);
            }
        };
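
Two caveats worth noting beyond the accepted fix (my notes, not part of the original answer). First, with Guava's Optional (the type imported above), state.get() throws IllegalStateException for any key that has no previous state, which is every key in the very first batch. Second, as the scaladoc quoted above says, returning None (Optional.absent() on the Java side) eliminates that key's state entry. A minimal sketch of a variant that handles both, under the same imports; the name SAFE_UPDATE_FUNCTION is illustrative:

Function2<List<Integer>, Optional<Integer>, Optional<Integer>> SAFE_UPDATE_FUNCTION =
        new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
            @Override
            public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
                if (values.isEmpty()) {
                    // Keep the existing state as-is; returning Optional.absent() here
                    // would instead eliminate this key's state entry entirely
                    return state;
                }
                // or(0) falls back to zero for keys with no prior state,
                // avoiding an IllegalStateException on the first batch
                Integer newSum = state.or(0);
                for (Integer value : values) {
                    newSum += value;
                }
                return Optional.of(newSum);
            }
        };

With or(0), a previously unseen key simply starts counting from zero instead of failing the batch.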

Regarding "java - Exception when calling updateStateByKey in Spark Streaming", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/37572413/
