gpt4 book ai didi

java - Apache Spark 在 updateStateByKey() 之后合并

转载 作者:行者123 更新时间:2023-11-29 04:45:05 24 4
gpt4 key购买 nike

我正在尝试合并两个流,其中一个应该是有状态的(比如不经常更新的静态数据):

SparkConf conf = new SparkConf().setAppName("Test Application").setMaster("local[*]");
JavaStreamingContext context = new JavaStreamingContext(conf, Durations.seconds(10));
context.checkpoint(".");
JavaDStream<String> dataStream = context.socketTextStream("localhost", 9998);
JavaDStream<String> refDataStream = context.socketTextStream("localhost", 9999);

JavaPairDStream<String, String> pairDataStream = dataStream.mapToPair(e -> {
String[] tmp = e.split(" ");
return new Tuple2<>(tmp[0], tmp[1]);
});

JavaPairDStream<String, String> pairRefDataStream = refDataStream.mapToPair(e -> {
String[] tmp = e.split(" ");
return new Tuple2<>(tmp[0], tmp[1]);
}).updateStateByKey((Function2<List<String>, Optional<String>, Optional<String>>) (strings, stringOptional) -> {
if (!strings.isEmpty()) {
return Optional.of(strings.get(0));
}
return Optional.absent();
});

pairDataStream.join(pairRefDataStream).print();


context.start();
context.awaitTermination();

当我将 1 aaa 写入第一个流并将 1 111 写入第二个流时,立即一切正常,我看到了合并结果。但是,当我在一分钟后将 1 bbb 写入第一个流时,我什么也看不到。

我是否正确理解了 updateStateByKey() 的作用?还是我错了?

最佳答案

updateStateByKey 完全按照您的要求进行操作。特别是如果当前窗口不包含任何数据(strings.isEmpty()),您指示它忘记(return Optional.absent();):

if (!strings.isEmpty()) {
return Optional.of(strings.get(0));
}
return Optional.absent();

虽然您可能想要返回以前的状态:

if (!strings.isEmpty()) {
return Optional.of(strings.get(0));
}
return stringOptional;

关于java - Apache Spark 在 updateStateByKey() 之后合并,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37508653/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com