gpt4 book ai didi

java - Flink TimeStamp,在流中添加计算字段

转载 作者:行者123 更新时间:2023-12-01 09:30:12 26 4
gpt4 key购买 nike

我有一个无序流,我需要对它们进行排序,并将字段值与下一帧中的相同字段求和。我的代码:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Message> messageswithTS = messages.assignTimestampsAndWatermarks(new TimeLagWaterMarkGenerator());
DataStream<Message> SumNumber = messageswithTS
.keyBy("deviceId")
.map(new Sumalo())

其中 Sumalo() 是进行加法的函数。提取时间戳的代码:

public class TimeLagWaterMarkGenerator implements AssignerWithPeriodicWatermarks<Message> {

private static final long serialVersionUID = 1L;
private long currentMaxTimestamp;

@Override
public long extractTimestamp(Message element, long previousElementTimestamp) {
long timestamp = element.getDate();
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
return timestamp;
}

@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp);
}
}

结果:

1   TRUE    0   21  1473861657491   6af7ecfb-5122-48b6-ada1-0ea39d1d4740
1 FALSE 3 3 1473861657496 c8b4617d-534b-4c5e-825c-a8c5556fcd87
1 TRUE 1 29 1473861657497 f5b72056-ec3d-4c97-b86d-73ed728757c3
1 FALSE 0 29 1473861657501 363d061d-ce02-4709-9683-b3bb233861f3
....

正确结果:

1   TRUE    0   0   1473861657491   6af7ecfb-5122-48b6-ada1-0ea39d1d4740
1 FALSE 3 3 1473861657496 c8b4617d-534b-4c5e-825c-a8c5556fcd87
1 TRUE 1 4 1473861657497 f5b72056-ec3d-4c97-b86d-73ed728757c3
1 FALSE 0 4 1473861657501 363d061d-ce02-4709-9683-b3bb233861f3
....

感谢任何帮助。

最佳答案

Flink 不会在事件时间上自动对事件时间流进行排序,并且也不会提供一个运算符来对事件时间流进行排序(这只能在事件时间,即将无序流转换为有序流)。

但是,您可以通过扩展 AbstractStreamOperator 自己实现这样的运算符。这是一个低级界面,您可以在其中访问事件、为其分配的时间戳和收到的水印。运算符(operator)可以按如下方式工作。它可以将所有到达的元素插入按事件时间排序的堆中。当水印到达时,它会发出时间戳小于水印的所有元素。如果迟到的元素到达(即时间戳小于当前水印的元素),您可以发出它(破坏流的完整顺序)或丢弃它。运算符(operator)还需要通过将堆保持为 Flink 托管状态来参与检查点。您应该意识到这个接口(interface)非常底层,需要很好地理解 Flink 的工作原理。此外,它可能会在次要版本之间发生变化。

关于您的时间戳和水印分配器,您不会向水印添加任何松弛。通过此实现,您可能会有许多后期元素。看一下BoundedOutOfOrdernessTimestampExtractor

关于java - Flink TimeStamp,在流中添加计算字段,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39493530/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com