gpt4 book ai didi

java - Flink DataStream API 中 TimeWindow.getStart() 的问题

转载 作者:太空宇宙 更新时间:2023-11-04 09:12:04 24 4
gpt4 key购买 nike

当我使用 ProcessWindowFunction 时,如下所示:

    private static class pwf
extends ProcessWindowFunction<String, Tuple3<String, String,String>, Tuple, TimeWindow> {//IN, OUT, KEY, W
public void process(Tuple key,
Context context,
Iterable<String> elements,
Collector<Tuple3<String, String,String>> out) {
String res = elements.iterator().next();
out.collect(new Tuple3<String,String,String>(String.valueOf((context.window().getStart())),(String)((Tuple1)key).f0, res));
}
}

我的开始时间是这样的:1691580000、1691640000,但我无法理解这个形式,有人可以给我一些建议吗?任何答案将不胜感激!

整个代码可能有帮助:

DataStream<RawLogGroupList> sourceStream = env.addSource(new FlinkLogConsumer<RawLogGroupList>(deserializer, configProps));
DataStream<Tuple3<String,String,String>> resStream = sourceStream
.flatMap(new FlatMapFunction<RawLogGroupList, RawLog>() {
@Override
public void flatMap(RawLogGroupList value, Collector<RawLog> out) throws Exception {
for (RawLogGroup logGroup : value.getRawLogGroups()) {
for (RawLog log : logGroup.getLogs()) {
out.collect(log);
}
}
}
})
.setParallelism(flatmapParallelism)
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<RawLog>(Time.seconds(30)) {
@Override
public long extractTimestamp(RawLog element) {
return element.getTime()*1000;
}
}).setParallelism(mapParallelism)
.map(new MapFunction<RawLog, Tuple3<String,String,Long>>() {
@Override
public Tuple3<String,String,Long> map(RawLog value) throws Exception {
Map<String,String> contents = value.getContents();
return new Tuple3<>(
contents.getOrDefault("logtime","nullFromMap"),
contents.getOrDefault("method","nullFromMap"),
Long.valueOf(contents.getOrDefault("latency","0"))
);
}
}).setParallelism(mapParallelism)
.keyBy(1)
.timeWindow(Time.seconds(60))
.aggregate(new Med(),new pwf())
.setParallelism(aggregateParallelism)
.returns(Types.TUPLE(Types.STRING,Types.STRING,Types.STRING));

最佳答案

这些数字是时间戳。

Flink 支持不同的时间概念;特别是处理时间事件时间(以及摄取时间)。处理时间是处理事件时的系统时间。事件时间是指事件中编码的时间戳。

如果您正在使用处理时间,那么这些将以毫秒为单位的 unix 时间戳,就像从 System.currentTimeMillis() 返回的内容一样。

如果您正在使用事件时间,那么时间戳只是数字,它们具有您的应用程序与它们关联的任何含义,尽管通常它们也被编码为自纪元以来的毫秒数(就像 System.currentTimeMillis() 一样)。

在你的例子中,这些似乎是以秒为单位的unix时间戳,但日期是2023年,这看起来很奇怪。 1691580000 是 2023 年 8 月 9 日下午 1:20:00。

如果这些时间戳没有意义,则可能是您的 timestamp assigner 有问题。 .

请参阅 more discussion about time in Flink 的文档.

关于java - Flink DataStream API 中 TimeWindow.getStart() 的问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59559145/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com