gpt4 book ai didi

java - SlidingWindows 用于 Apache Beam 上的慢速数据(大间隔)

转载 作者:塔克拉玛干 更新时间:2023-11-03 05:23:55 24 4
gpt4 key购买 nike

我正在使用 Chicago Traffic Tracker数据集,每 15 分钟发布一次新数据。当有新数据可用时,它表示记录与“实时”相差 10-15 分钟(example,查找 _last_updt)。

例如,在 00:20,我得到时间戳为 00:10 的数据;在 00:35,我从 00:20 开始;在 00:50,我从 00:40 开始。因此,我可以“固定”获取新数据的时间间隔(每 15 分钟一次),尽管时间戳的时间间隔略有变化。

我正在尝试在 Dataflow (Apache Beam) 上使用这些数据,为此我正在玩滑动窗口。我的想法是收集和处理 4 个连续的数据点(4 x 15 分钟 = 60 分钟),理想情况下,一旦有新数据点可用,就更新我的总和/平均值计算。为此,我从代码开始:

PCollection<TrafficData> trafficData = input        
.apply("MapIntoSlidingWindows", Window.<TrafficData>into(
SlidingWindows.of(Duration.standardMinutes(60)) // (4x15)
.every(Duration.standardMinutes(15))) . // interval to get new data
.triggering(AfterWatermark
.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()))
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes());

不幸的是,看起来当我从我的输入中收到一个新的数据点时,我没有从我之后的 GroupByKey 中得到一个新的(更新的)结果。

我的 SlidingWindows 有问题吗?还是我遗漏了什么?

最佳答案

一个问题可能是水印超出了窗口的末尾,并丢弃了所有后面的元素。您可以在水印通过后尝试提供几分钟:

PCollection<TrafficData> trafficData = input        
.apply("MapIntoSlidingWindows", Window.<TrafficData>into(
SlidingWindows.of(Duration.standardMinutes(60)) // (4x15)
.every(Duration.standardMinutes(15))) . // interval to get new data
.triggering(AfterWatermark
.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
.withLateFirings(AfterProcessingTime.pastFirstElementInPane()))
.withAllowedLateness(Duration.standardMinutes(15))
.accumulatingFiredPanes());

如果这有帮助,请告诉我。

关于java - SlidingWindows 用于 Apache Beam 上的慢速数据(大间隔),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50578459/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com