gpt4 book ai didi

java - Spark Streaming 中状态函数的问题

转载 作者:行者123 更新时间:2023-12-01 09:10:00 31 4
gpt4 key购买 nike

我尝试使用 Spark Streaming,并希望拥有一个可以在处理每个批处理后更新的全局状态对象。据我发现,至少有两个选择:1. 使用mapWithState,Spark会在每批处理完成后自动更新状态2.使用updateStateByKey函数,我必须自己调用更新

对我来说,这两种情况都很好,但我两次都遇到相同的错误。这是我针对这两种情况及其错误的示例代码:

    Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> call(String word, Optional<Integer> one,
State<Integer> state) {
int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
Tuple2<String, Integer> output = new Tuple2<>(word, sum);
state.update(sum);
return output;
}
};


JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
wordsDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD));;
Tuple2<String, Long> output = new Tuple2<>(word, sum);
state.update(sum);
return new String("Test");
}
});

这个是取自Spark本身提供的例子。遗憾的是,我收到有关 StateSpec.function(...) 的以下错误:

The method function(Function3< KeyType,Optional< ValueType>,State< StateType>,MappedType>) in the type StateSpec is not applicable for the arguments (Function3< String,Optional< Integer>,State< Integer>,Tuple2< String,Integer>>)

使用:

JavaPairDStream<String, Long> runningCounts = processed.updateStateByKey(new Function2<List<Long>, Optional<Long>, Optional<Long>>() {
public Optional<Long> call(List<Long> values, Optional<Long> state) {
Long newSum = state.orElse((long)0);
for(long x : values){
newSum +=x;
}
return Optional.of(newSum);
});

导致类似的错误:

The method updateStateByKey(Function2< List< Long>,Optional< S>,Optional< S>>) in the type JavaPairDStream< String,Long> is not applicable for the arguments (new Function2< List,Optional< Long>,Optional< Long>>(){})

我的导入快照是:

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.api.java.function.Function4;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.api.java.function.VoidFunction2;

我希望有人能帮助我找出我的错误。

最佳答案

还有一点需要补充,如果您使用的是最新的 Spark 2.3.0 版本,请使用下面的包导入Optional来解决相同的问题。

Java代码:

import org.apache.spark.api.java.Optional;

关于java - Spark Streaming 中状态函数的问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40976502/

31 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com