gpt4 book ai didi

java - 如何使用 Apache Flink 避免字数统计中出现重复的键元组

转载 作者:行者123 更新时间:2023-12-02 09:18:57 27 4
gpt4 key购买 nike

我正在尝试使用 Apache Flink 编写一个简单的字数统计程序,因为我正在学习它。

问题是我无法删除结果中重复的键元组。

输入:

a
aaa
ab
aaa
a
a

输出:

(a,1)
(a,2)
(a,3)
(aaa,1)
(aaa,2)
(ab, 1)

预期输出:

(a,3)
(aaa,2)
(ab, 1)

我的代码:

public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = env.readTextFile("data.in");

DataStream<Tuple2<String, Integer>> counts = text
.map(s -> Tuple2.of(s, 1))
.returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(0)
.sum(1);

counts.print();
env.execute();
}

最佳答案

Flink 的流 API 并不是为了产生您期望的结果而设计的。相反,流处理背后的想法是输入可能是无限的——换句话说,输入将永远持续到达。实际上,是的,输入可能终止,但话又说回来,也许不会。

由于 Flink 不期望流输入永远终止,因此不能期望它等到结束才产生结果。相反,Flink 的 DataStream API 是围绕产生连续结果的连续输入的理念来组织的。每个新的输入事件都可能产生更新的结果。

但是,有一种方法可以完成您想要的任务,同时仍然使用 DataStream API,但它有点复杂。

事实证明,当您将 Flink 与有界输入源(如文件)一起使用时,当到达该有界输入的末尾时,会通过作业图发送一个信号,指示已到达末尾。事实上,您可以等待这个信号,然后才产生结果。

我所说的这个信号实际上是一个水印,其值为MAX_WATERMARK。因此,您可以做的是让 ProcessFunction 在遥远的将来的某个时刻设置事件时间计时器。仅当出现此特殊水印时,此计时器才会触发。与此同时,这个 ProcessFunction 应该监视流,跟踪最新结果(对于每个键)——只有当这个计时器最终在收到这个极大的水印时触发时,它才会收集到输出。

或者您可以只使用 Flink 的 DataSet API,它是围绕批处理组织的。然后您就会得到您所期望的结果。

关于java - 如何使用 Apache Flink 避免字数统计中出现重复的键元组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58828218/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com