- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试使用 Spark Streaming 编写一个简单的应用程序,以从 Kafka 读取数据,并持续计算从主题读取单词的次数。我在调用非常重要的 updateStateByKey 方法时遇到问题,看起来我遇到了泛型问题,但我不确定出了什么问题。
错误:
The method updateStateByKey(Function2<List<Integer>,Optional<S>,Optional<S>>)
in the type JavaPairDStream<String,Integer> is not applicable for the arguments
(Function2<List<Integer>,Optional<Integer>,Optional<Integer>>)
我的代码:
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import java.util.Arrays;
import scala.Tuple2;
import scala.collection.immutable.List;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import com.google.common.base.Optional;
public class SimpleSparkApp {
static String appName = "Streaming";
static String master = "local[*]";
static String zk = "localhost:2181";
static String consumerGroupId = "sparkStreaming";
static String[] topics = {"testTopic", };
static Integer numThreads = new Integer(1);
static final Pattern SPACE = Pattern.compile(" ");
static String checkpointDir = "/tmp";
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration(10000));
jsc.checkpoint(checkpointDir);
Map<String, Integer> topicMap = new HashMap<String, Integer>();
for (String topic: topics) {
topicMap.put(topic, numThreads);
}
JavaPairReceiverInputDStream<String, String> messages =
KafkaUtils.createStream(jsc, zk, consumerGroupId, topicMap);
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
return Arrays.asList(SPACE.split(x));
}
});
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
Function2<List<Integer>,Optional<Integer>,Optional<Integer>> UPDATE_FUNCTION =
new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
@Override
public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
Integer newSum = state.get();
scala.collection.Iterator<Integer> i = values.iterator();
while(i.hasNext()){
newSum += i.next();
}
runningCount.addAndGet(newSum);
System.out.print("Total number of words: " + String.valueOf(runningCount.get()));
return Optional.of(newSum);
}
};
//ERROR is here
JavaPairDStream<String, Integer> runningCounts =
wordCounts.updateStateByKey(UPDATE_FUNCTION);
runningCounts.print();
jsc.start();
jsc.awaitTermination();
}
}
我认为泛型和与 Scala 交互可能存在问题?当我进入 updateStateByKey 时,我看到一个适当的函数声明,所以我不确定这里缺少什么:
/**
* Return a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of each key.
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
* @param updateFunc State update function. If `this` function returns None, then
* corresponding state key-value pair will be eliminated.
* @tparam S State type
*/
def updateStateByKey[S](updateFunc: JFunction2[JList[V], Optional[S], Optional[S]])
: JavaPairDStream[K, S] = {
implicit val cm: ClassTag[S] = fakeClassTag
dstream.updateStateByKey(convertUpdateStateFunction(updateFunc))
}
最佳答案
问题已解决 - 结果是我导入了错误的 List 和 Iterator 类(我责怪 Eclipse):
注释掉:
//import scala.collection.immutable.List;
//import scala.collection.Iterator;
添加于:
import java.util.Iterator;
import java.util.List;
更新功能略有改变:
Function2<List<Integer>,Optional<Integer>,Optional<Integer>> UPDATE_FUNCTION =
new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
@Override
public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
Integer newSum = state.get();
Iterator<Integer> i = values.iterator();
while(i.hasNext()){
newSum += i.next();
}
runningCount.addAndGet(newSum);
System.out.print("Total number of words: " + String.valueOf(runningCount.get()));
return Optional.of(newSum);
}
};
关于java - Spark Streaming 中调用 updateStateByKey 出现异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37572413/
想知道为什么 StatefulNetworkWordCount.scala 示例调用臭名昭著的 updateStateByKey() 函数,该函数应该只将函数作为参数,而不是: val stateDs
我编写了一个与 updateStateByKey 一起使用的简单函数,以查看问题是否是因为我的 updateFunc。我认为这一定是由于其他原因。我在 --master local[4] 上运行它。
我正在运行一个 24X7 的 Spark 流并使用 updateStateByKey 函数来保存计算的历史数据,就像 NetworkWordCount Example 的情况一样.. 我试图流式传输一
如何通过 INPUT PostgreSQL 表的更改触发的 Spark 结构化流计算来更新 OUTPUT TABLE 的状态? 作为现实生活中的场景,USERS 表已被user_id = 0002 更
我正在使用 updateStateByKey()在我的 Spark Streaming 应用程序中维护状态的操作。输入数据来自 Kafka 主题。 我想了解 DStreams 是如何分区的? 分区如何
我正在尝试使用 Spark Streaming 编写一个简单的应用程序,以从 Kafka 读取数据,并持续计算从主题读取单词的次数。我在调用非常重要的 updateStateByKey 方法时遇到问题
我正在尝试合并两个流,其中一个应该是有状态的(比如不经常更新的静态数据): SparkConf conf = new SparkConf().setAppName("Test Application"
我正在尝试通过从 Kafka 读取的(假)apache Web 服务器日志运行有状态 Spark Streaming 计算。目标是“ session 化”类似于 this blog post 的网络流
我正在 24/7 全天候运行 Spark 流并使用 updateStateByKey是否可以 24/7 全天候运行 Spark Streaming?如果是,updateStateByKey 不会变大,
我在 Scala 中有这个通用方法 def updateStateByKey[S](updateFunc: JFunction2[JList[V], Optional[S], Optional[S]]
当我遇到 updateStateByKey() 函数时,我刚刚开始寻找使用 Spark Streaming 进行有状态计算的解决方案。 我试图解决的问题: 10,000 个传感器每分钟产生一个二进制值
我在 Spark Streaming 应用程序中使用 updateStateByKey 函数来持久化和更新每个键的状态。问题是我想知道 “ key ”在更新函数里面。 input.updateStat
我是一名优秀的程序员,十分优秀!