gpt4 book ai didi

java - 强制驱逐滑动事件窗口以在 Flink 上进行处理(历史流)

转载 作者:行者123 更新时间:2023-12-02 11:40:06 25 4
gpt4 key购买 nike

目前我正在使用Flink进行流处理引擎的研究。在我的研究中,我使用历史流,它由以下形式的元组组成:

事件时间、attribute_1、...、attribute_X

其中 event_time 在处理过程中用作 TimeCharacteristic.EventTime。此外,我通过以下任一方式将数据集插入处理拓扑:(i) 创建内存结构,或 (ii) 通过读取 CSV 文件本身。

不幸的是,我注意到即使足够的元组已到达完成整个窗口的窗口运算符,该窗口也不会被推送到下游进行处理。结果,性能显着下降,并且多次出现 OutOfMemoryError 异常(具有大量历史流)。

为了说明典型用例,我提供以下示例:

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
env.setMaxParallelism(1);
List<Tuple2<Long, Integer>> l = new ArrayList<>();
l.add(new Tuple2<>(1L, 11));
l.add(new Tuple2<>(2L, 22));
l.add(new Tuple2<>(3L, 33));
l.add(new Tuple2<>(4L, 44));
l.add(new Tuple2<>(5L, 55));
DataStream<Tuple2<Long, Integer>> stream = env.fromCollection(l);
stream.assignTimestampsAndWatermarks(
new AscendingTimestampExtractor<Tuple2<Long, Integer>>() {
@Override
public long extractAscendingTimestamp(Tuple2<Long, Integer> t) {
return t.f0;
}
})
.windowAll(SlidingEventTimeWindows.of(Time.milliseconds(2),
Time.milliseconds(1)))
.sum(1)
.print();
env.execute();

根据l的内容,我需要得到以下窗口结果:

  • [0, 2) 总和:11
  • [1, 3) 总和:33
  • [2, 4) 总和:55
  • [3, 5) 总和:77
  • [4, 6) 总和:99
  • [5, 7) 总和:55

每个列表项都可以读取为[开始时间戳,结束时间戳),总和:X。

我希望每次出现一个时间戳超出打开窗口的结束时间戳的元组时,Flink 都会生成一个窗口结果。例如,我希望当将带有时间戳 4L 的元组输入窗口运算符时,生成窗口 [1, 3) 的求和。然而,当来自l的所有元组被插入流的拓扑中时,处理开始。当我使用较大的历史流时,也会发生同样的事情,这会导致性能下降(甚至耗尽内存)。

问题:如何强制 Flink 在窗口完成时将窗口推送到下游进行处理?

我相信对于 SlidingEventTimeWindows 来说,窗口的逐出是由水印触发的。如果前面的情况成立,我该如何编写我的拓扑,以便它们在具有较晚时间戳的元组到达时触发窗口?

谢谢

最佳答案

AscendingTimestampExtractor 使用周期性水印策略,其中 Flink 将每 n 毫秒调用一次 getCurrentWatermark() 方法,其中 n 为 autowatermarkinterval .

默认间隔为 200 毫秒,与窗口大小相比非常长。然而,它们不能直接比较——200 毫秒是按处理时间而不是事件时间来测量的。尽管如此,我怀疑如果您没有更改此配置设置,那么在发出第一个水印之前会创建很多窗口,我认为这可以解释您所看到的内容。

您可以减少自动水印间隔(也许减少到 1 毫秒)。或者您可以实现 AssignerWithPunctuatedWatermarks ,这会给你更多的控制权。

关于java - 强制驱逐滑动事件窗口以在 Flink 上进行处理(历史流),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48647930/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com