gpt4 book ai didi

apache-flink - Flink中如何对WindowedStream进行自定义操作?

转载 作者:行者123 更新时间:2023-12-03 09:16:46 24 4
gpt4 key购买 nike

我想在 Flink 中的 WindowedStream 上执行一些操作,比如平均操作。但可用的预定义操作非常有限,例如求和、最小值、最大值等。

val windowedStream = valueStream
.keyBy(0)
.timeWindow(Time.minutes(5))
.sum(2) //Change this to average?

假设我想求平均值,我该怎么做?

最佳答案

Flink 没有内置函数来计算 WindowStream 上的平均值。您必须为此实现自定义WindowFunction

最有效的方法是实现一个 ReduceFunction 来计算您想要平均的值的计数和总和,以及一个后续的 WindowFunction 来获取结果ReduceFunction 并计算平均值。使用 ReduceFunction 效率更高,因为 Flink 将其直接应用于传入值。因此,它会动态聚合值,而不是在窗口中收集它们。这显着减少了窗口的内存占用。

由于 ReduceFunction 的输出与其输入具有相同的类型,因此您需要在应用 ReduceFunction 之前添加一个计数字段。

像下面这样的东西应该可以解决问题:

val valueStream: DataStream[(String, Double)] = ???

val r: DataStream[(String, Double)] = valueStream
// append a 1L for counting
.map(x => (x._1, x._2, 1l))
// key and window stream
.keyBy(0).timeWindow(Time.minutes(5))
.apply(
// ReduceFunction (compute sum and count)
(x: (String, Double, Long), y: (String, Double, Long)) =>
(x._1, x._2 + y._2, x._3 + y._3),
// WindowFunction
(key, window: TimeWindow, input: Iterable[(String, Double, Long)], out: Collector[(String, Double)]) => {
// get first (and only) value
val x: (String, Double, Long) = input.toIterator.next
// compute average as sum / count
out.collect(x._1, x._2 / x._3)
}
)

关于apache-flink - Flink中如何对WindowedStream进行自定义操作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36713085/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com