gpt4 book ai didi

java - Apache 弗林克 : Windowed ReduceFunction is never executed

转载 作者:行者123 更新时间:2023-12-01 20:20:42 25 4
gpt4 key购买 nike

下面是代码片段,我在其中使用基于 Tumbling EventTime 的窗口

DataStream<OHLC> ohlcStream = stockStream.assignTimestampsAndWatermarks(new TimestampExtractor()).map(new mapStockToOhlc()).keyBy((KeySelector<OHLC, Long>) o -> o.getMinuteKey())
.timeWindow(Time.seconds(60))
.reduce(new myAggFunction());

不幸的是,它看起来从未执行过reduce函数。如果使用上面没有窗口的代码,reduce 函数可以正常工作。下面是 TimestampExtractor 的代码。 30秒的水印延迟只是作为测试值,但一分钟的滚动窗口是m

    public static class TimestampExtractor implements AssignerWithPeriodicWatermarks<StockTrade> {
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(System.currentTimeMillis() - 30000);
}

@Override
public long extractTimestamp(StockTrade stockTrade, long l) {
BigDecimal bd = new BigDecimal(stockTrade.getTime());
// bd contains miliseconds timestamp 1498658629.036
return bd.longValue();
}
}

bd.longValue() 返回秒时间戳 1498658629,因为我的窗口也是以秒为单位定义的。
当我使用 bd.longValue()/60(返回分钟时间戳)时,会调用 reduce 函数。我的输出文件包含每个归约操作的所有记录

{time=1498717692.000, minuteTime=24978628, n=1, open=2248.0}
{time=1498717692.000, minuteTime=24978628, n=2, open=2248.0}
...
{time=1498717692.000, minuteTime=24978628, n=8, open=2248.0}

那么,谁能向我解释一下,发生了什么?非常感谢。

最佳答案

通常,水印应与数据中的时间戳相关,而不应基于系统时钟。使用事件时间的好处之一是,可以使用同一个应用程序重新处理历史数据或处理当前数据,但如果您将时间戳与系统时钟进行比较,则这是不可能的,就像您在此处所做的那样。

水印可以被认为是所有时间戳小于水印的数据已经到达的声明。或者换句话说,任何时间戳小于当前水印的数据都将被视为迟到。我的猜测是,您没有看到任何结果,因为您的水印导致所有数据被视为延迟,并且窗口运算符(operator)正在删除所有这些延迟数据。

我建议您使用BoundedOutOfOrdernessTimestampExtractor反而。它的工作原理是跟踪数据流中迄今为止看到的最大时间戳,并从该最大时间戳(而不是系统时钟)中减去延迟。 source code ,如果你好奇的话。

关于java - Apache 弗林克 : Windowed ReduceFunction is never executed,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44818012/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com