gpt4 book ai didi

scala - Apache 弗林克 : Count window with timeout

转载 作者:行者123 更新时间:2023-12-02 04:04:42 25 4
gpt4 key购买 nike

这是一个简单的代码示例来说明我的问题:

case class Record( key: String, value: Int )

object Job extends App
{
val env = StreamExecutionEnvironment.getExecutionEnvironment
val data = env.fromElements( Record("01",1), Record("02",2), Record("03",3), Record("04",4), Record("05",5) )
val step1 = data.filter( record => record.value % 3 != 0 ) // introduces some data loss
val step2 = data.map( r => Record( r.key, r.value * 2 ) )
val step3 = data.map( r => Record( r.key, r.value * 3 ) )
val merged = step1.union( step2, step3 )
val keyed = merged.keyBy(0)
val windowed = keyed.countWindow( 3 )
val summed = windowed.sum( 1 )
summed.print()
env.execute("test")
}

这会产生以下结果:

Record(01,6)
Record(02,12)
Record(04,24)
Record(05,30)

正如预期的那样,不会为键“03”生成任何结果,因为计数窗口需要 3 个元素,而流中只存在两个元素。

我想要的是某种带有超时的计数窗口,以便在一定的超时后,如果未达到计数窗口预期的元素数量,则会使用现有元素生成部分结果。

通过这种行为,在我的示例中,达到超时时将生成 Record(03,15)。

最佳答案

您还可以使用自定义窗口Trigger来执行此操作,该窗口在达到计数或超时到期时触发 - 有效地混合内置CountTriggerEventTimeTrigger

关于scala - Apache 弗林克 : Count window with timeout,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49783676/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com