gpt4 book ai didi

java - 事件时间过期的消息导致 java 堆空间 OutOfMemory 错误

转载 作者:行者123 更新时间:2023-11-30 02:22:02 27 4
gpt4 key购买 nike

我正在运行一个具有正常翻滚事件时间窗口(窗口大小为 1 小时)的作业。运行足够长的时间后,它将抛出有关 java 堆空间不足的错误。现在,有关正在处理的数据的问题是,今天中午将出现一条消息,接下来的 15k 条左右将来自一周前(这不是数据预期的样子,但它应该以任何一种方式处理)。因此,即使有允许的延迟,水印也远远超过了接下来 15k 消息的事件时间,因此应该丢弃延迟的消息。或者至少我是这么想的,因为他们不再在那个窗口中。

所以我的问题是这样的。 Flink 是否会维护过期的消息,即使它们没有被窗口使用?或者只是为了他们的翻滚窗口,我应该设置其他内容或某些属性以确保过期数据不会耗尽内存?

感谢您的帮助!

编辑

DataStream<OutputObject> outputStream = sourceData
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Record>(Time.minutes(1)) {
@Override
public long extractTimestamp(Record record) {
long eventTimeFromRecord = record.eventTimestamp;

return eventTimeFromRecord;
}
})
.keyBy("fieldToKeyBy")
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.apply(new ApplyFunction());

最佳答案

当源的并行度为 n 时,就会有 n 个水印——每个并行子任务都有一个水印。如果 Flink 作业在今天中午收到一 strip 有时间戳的消息,然后收到一周前的许多事件,则该消息只会提前其中一个并行任务的水印,而其他 n-1 个任务仍将具有 Long .min_value 作为他们的水印。因此,那些“迟到”事件只会在并行窗口运算符之一中被识别为迟到,而其他 n-1 个窗口将继续处理这些“迟到”事件。

请注意,如果您刚刚从检查点或保存点恢复,也可能会发生这种情况,因为水印未保存在检查点或保存点中。这意味着您不能指望以前作业的消息流量来更新水印。

关于java - 事件时间过期的消息导致 java 堆空间 OutOfMemory 错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46531141/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com