gpt4 book ai didi

apache-flink - 滑动时间窗口的 F​​link 性能问题

转载 作者:行者123 更新时间:2023-12-02 04:28:01 25 4
gpt4 key购买 nike

我正在尝试使用 flink 进行一些网络监控工作。我的目标是计算每个 src_ip 的不同 dst_ip

我的以下代码有效,但性能真的很差。似乎每个滑动窗口都会重新计算所有事件,但这应该不是必需的。

例如,我们在时间秒 1 - 600 上有事件。Flink 可以获得每秒的累加器,因此我们每秒有 600 个累加器。当第一个滑动窗口过期时,flink 只是合并 1-300 的累加器,并销毁第二个 1 的累加器。这个窗口也可以在最后一秒之前预合并 1-299。当第二个滑动窗口到期时,flink 只是合并 2-301 的累加器,并销毁第二个 2 的累加器。以此类推......

这种方式比将事件分配给多个窗口,并计算每个窗口的聚合效率要高得多。

flink 支持吗?能否通过flink自己实现类似的功能?

非常感谢!

public static class AverageAccumulator2 {
String key;
Set<String> target;
AverageAccumulator2() {
target = new HashSet<>();
}
}

public static class Average2 implements AggregateFunction<ObjectNode, AverageAccumulator2, Tuple3<String, Long, Set<String>>> {
@Override
public AverageAccumulator2 createAccumulator() {
return new AverageAccumulator2();
}

@Override
public AverageAccumulator2 add(ObjectNode value, AverageAccumulator2 accumulator) {
accumulator.key = value.get("value").get("src_ip").asText();
accumulator.target.add(value.get("value").get("dst_ip").asText());
return accumulator;
}
@Override
public Tuple3<String, Long, Set<String>> getResult(AverageAccumulator2 accumulator) {
return new Tuple3<>(accumulator.key, (long) accumulator.target.size(), accumulator.target);
}

@Override
public AverageAccumulator2 merge(AverageAccumulator2 a, AverageAccumulator2 b) {
a.target.addAll(b.target);
return a;
}
}

final SingleOutputStreamOperator<Tuple3<String, Long, Set<String>> > process2 =
stream.keyBy(value -> value.get("value").get("sip").asText())
.timeWindow(Time.seconds(300),Time.seconds(1))
.aggregate(new Average2());

最佳答案

如您所见,Flink 不会尝试优化滑动窗口。对于细粒度滑动,这确实变得非常昂贵。

您可以做的是使用 ProcessFunction 实现您自己的处理状态和计时器的逻辑。 - 您可以按照概述的方式实现。您将拥有一个 processElement 方法,该方法针对每个传入记录更新您将用于累积结果的数据结构,以及一个每秒触发一次的 onTimer 方法,将部分结果合并在一起,并将结果发送到下游。

关于apache-flink - 滑动时间窗口的 F​​link 性能问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51977741/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com