gpt4 book ai didi

java - 了解 akka 流中的背压 Source.queue

转载 作者:行者123 更新时间:2023-12-02 19:04:23 24 4
gpt4 key购买 nike

我正在尝试 akka 流,但在我的简单示例中我无法获得背压来工作。诚然,我对 akka(流)没有经验,所以可能我错过了一些大东西。

我生成(在队列上提供)整数的速度比消耗它们的速度快,所以我认为背压会起作用。我的目标是始终消耗放入队列中的最新项目(这就是为什么我有 bufferSize = 1 和源队列上的 OverflowStrategy.dropHead())。

public class SimpleStream {
public static void main(String[] argv) throws InterruptedException {
final ActorSystem system = ActorSystem.create("akka-streams");
final Materializer materializer = ActorMaterializer.create(system);

final Procedure<Integer> slowConsumer = (i) -> {
System.out.println("consuming [" + i + "]");
ThreadUtils.sleepQuietly(1000, TimeUnit.MILLISECONDS);
};

final SourceQueue<Integer> q = Sink
.<Integer>foreach(slowConsumer)
.runWith(Source.<Integer>queue(1, OverflowStrategy.dropHead()), materializer);

final AtomicInteger i = new AtomicInteger(0);
final Thread t = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
int n = i.incrementAndGet();
q.offer(n);
System.out.println("produced: [" + n + "]");
ThreadUtils.sleepQuietly(500, TimeUnit.MILLISECONDS);
}
});
t.setName("ticking");
t.start();

// run some time... to observe the effects.
ThreadUtils.sleepQuietly(1, TimeUnit.HOURS);
t.interrupt();
t.join();

// eventually shutdown akka here...
}
}

然而这是结果:

produced: [1]
consuming [1]
produced: [2]
produced: [3]
consuming [2] <-- Expected to be consuming 3 here.
produced: [4]
produced: [5]
consuming [3] <-- Expected to be consuming 5 here.
produced: [6]
produced: [7]

请忽略这里和那里的线程内容,只是为了假装从外部获取数据源代码(就像我必须在实际项目中使用它时会发生的情况一样)。

知道我错过了什么吗?

最佳答案

Source.queue 终止背压信号。这就是 Source.queue 方法采用 OverflowStrategy 的原因。如果可以向队列上游发出反压信号,则无需处理队列可能溢出的情况。但由于背压不会传播到队列之外,因此需要定义策略来处理比消费者更快的生产者。

对于典型的流,最终的Source接收来自Sink的需求以产生更多结果。但是,对于从 Source.queue 创建的流,“最终源”是队列。该队列只能排出内容(如果有)。它无法向上游发出信号以生成更多结果,因为上游位于 offer 方法的另一侧。

关于java - 了解 akka 流中的背压 Source.queue,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44228563/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com