gpt4 book ai didi

java - Akka stream - 在不引入延迟的情况下限制流量

转载 作者:塔克拉玛干 更新时间:2023-11-01 22:39:54 24 4
gpt4 key购买 nike

我正在使用 Akka(版本 2.4.17)在 Java 中构建一个观察流(假设类型为 <T> 的元素保持通用)。

我的要求是,这个流程应该是可定制的,以一到达就提供每单位时间的最大数量的观察。例如,它应该能够每分钟最多提供 2 个观察结果(第一个到达,其余的可以丢弃)。

我非常仔细地查看了 Akka 文档,特别是 this page其中详细介绍了内置阶段及其语义。

到目前为止,我尝试了以下方法。

  • throttleshaping()模式(超过限制时不关闭流):

      Flow.of(T.class)
    .throttle(2,
    new FiniteDuration(1, TimeUnit.MINUTES),
    0,
    ThrottleMode.shaping())
  • groupedWith和一个中间自定义方法:

    final int nbObsMax = 2;

    Flow.of(T.class)
    .groupedWithin(Integer.MAX_VALUE, new FiniteDuration(1, TimeUnit.MINUTES))
    .map(list -> {
    List<T> listToTransfer = new ArrayList<>();
    for (int i = list.size()-nbObsMax ; i>0 && i<list.size() ; i++) {
    listToTransfer.add(new T(list.get(i)));
    }
    return listToTransfer;
    })
    .mapConcat(elem -> elem) // Splitting List<T> in a Flow of T objects

以前的方法为我提供了每单位时间的正确观察次数,但这些观察结果被保留并且仅在时间窗口结束时提供(因此存在额外的延迟)。

举一个更具体的例子,如果以下观察结果进入我的流程:

[Obs1 t=0s] [Obs2 t=45s] [Obs3 t=47s] [Obs4 t=121s] [Obs5 t=122s]

它应该只在它们到达时输出以下的(这里可以忽略处理时间):

Window 1: [Obs1 t~0s] [Obs2 t~45s] Window 2: [Obs4 t~121s] [Obs5 t~122s]

任何帮助将不胜感激,感谢阅读我的第一篇 StackOverflow 帖子;)

最佳答案

我想不出开箱即用的解决方案来满足您的需求。 Throttle 将以稳定的流发出,因为它是如何使用桶模型实现的,而不是在每个时间段开始时都有一个允许的租约。

要获得准确的行为,您必须创建自己的自定义速率限制阶段(这可能并不难)。您可以在此处找到有关如何创建自定义阶段的文档:http://doc.akka.io/docs/akka/2.5.0/java/stream/stream-customize.html#custom-linear-processing-stages-using-graphstage

一个可行的设计是有一个配额计数器,它表示你可以在每个间隔重置多少个元素,对于每个传入的元素,你从计数器中减去一个并发出,当配额用完时,你继续向上游但是丢弃元素而不是发出它们。将 TimerGraphStageLogic 用于 GraphStageLogic 允许您设置可以重置限额的定时回调。

关于java - Akka stream - 在不引入延迟的情况下限制流量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43513922/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com