
java - ReactiveX backpressure not working as expected


I am trying to build a Flowable with backpressure. The idea is that no new items should be emitted until one of the items currently being processed has finished. I am using a ResourceSubscriber and the subscribeWith() method to achieve this.

Each element of the Flowable is processed asynchronously on a separate thread pool (which I achieve with flatMap/subscribeOn).

I expect every element after the second one to be emitted only after the subscriber's onNext method has been called. However, when I run this code, the Flowable emits elements without any restraint; backpressure is not applied.

Code that reproduces the problem:

import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subscribers.ResourceSubscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.AtomicInteger;

public class RxTest2 {

    private static final Logger log = LoggerFactory.getLogger(RxTest.class);

    static AtomicInteger integer = new AtomicInteger();

    public static void main(String[] args) {
        Flowable.generate(emitter -> {
            final int i1 = integer.incrementAndGet();
            if (i1 >= 20) {
                Thread.sleep(10000);
                System.exit(0);
            }
            emitter.onNext(i1);
        })
                .doOnNext(i -> log.info("Published: " + i))
                .flatMap(i -> Flowable.defer(() -> {
                    log.info("Starting consuming {}", i);
                    Thread.sleep(100);
                    log.info("Finished consuming {}", i);
                    return Flowable.just(i);
                }).subscribeOn(Schedulers.computation()))
                .doOnNext(i -> log.info("Consuming finished, result: " + i))
                .subscribeWith(new BackpressureSubscriber(2));
    }
}

class BackpressureSubscriber extends ResourceSubscriber<Object> {

    private static final Logger log = LoggerFactory.getLogger(BackpressureSubscriber.class);

    private final long initialRequest;

    public BackpressureSubscriber(final long initialRequest) {
        this.initialRequest = initialRequest;
    }

    @Override
    protected void onStart() {
        super.onStart();
        log.info("Starting execution with {} initial requests", initialRequest);
        request(initialRequest);
    }

    @Override
    public void onNext(final Object message) {
        log.info("On next for {}", message);
        request(1);
    }

    @Override
    public void onError(final Throwable throwable) {
        log.error("Unhandled error: ", throwable);
    }

    @Override
    public void onComplete() {
        log.info("On Complete");
    }
}

The expected output is something like this:

[main] INFO RxTest - Published: 1
[main] INFO RxTest - Published: 2
[RxComputationThreadPool-1] INFO RxTest - Starting consuming 1
[RxComputationThreadPool-1] INFO RxTest - Finished consuming 1
[RxComputationThreadPool-2] INFO RxTest - Starting consuming 2
[RxComputationThreadPool-1] INFO RxTest - On next for 1
[main] INFO RxTest - Published: 3
[RxComputationThreadPool-1] INFO RxTest - Finished consuming 2

Actual output:

11:30:32.166 [main] INFO BackpressureSubscriber - Starting execution with 2 initial requests
11:30:32.170 [main] INFO RxTest - Published: 1
11:30:32.189 [main] INFO RxTest - Published: 2
11:30:32.189 [RxComputationThreadPool-1] INFO RxTest - Starting consuming 1
11:30:32.189 [RxComputationThreadPool-2] INFO RxTest - Starting consuming 2
11:30:32.189 [main] INFO RxTest - Published: 3
11:30:32.190 [main] INFO RxTest - Published: 4
11:30:32.190 [RxComputationThreadPool-3] INFO RxTest - Starting consuming 3
11:30:32.190 [main] INFO RxTest - Published: 5
11:30:32.190 [RxComputationThreadPool-4] INFO RxTest - Starting consuming 4
11:30:32.190 [main] INFO RxTest - Published: 6
11:30:32.190 [RxComputationThreadPool-5] INFO RxTest - Starting consuming 5
11:30:32.190 [main] INFO RxTest - Published: 7
11:30:32.191 [RxComputationThreadPool-6] INFO RxTest - Starting consuming 6
11:30:32.191 [main] INFO RxTest - Published: 8
11:30:32.191 [RxComputationThreadPool-7] INFO RxTest - Starting consuming 7
11:30:32.191 [main] INFO RxTest - Published: 9
11:30:32.191 [RxComputationThreadPool-8] INFO RxTest - Starting consuming 8
11:30:32.191 [main] INFO RxTest - Published: 10
11:30:32.191 [RxComputationThreadPool-9] INFO RxTest - Starting consuming 9
11:30:32.191 [main] INFO RxTest - Published: 11
11:30:32.191 [RxComputationThreadPool-10] INFO RxTest - Starting consuming 10
11:30:32.192 [main] INFO RxTest - Published: 12
11:30:32.192 [RxComputationThreadPool-11] INFO RxTest - Starting consuming 11
11:30:32.192 [main] INFO RxTest - Published: 13
11:30:32.192 [main] INFO RxTest - Published: 14
11:30:32.192 [RxComputationThreadPool-12] INFO RxTest - Starting consuming 12
11:30:32.192 [main] INFO RxTest - Published: 15
11:30:32.192 [main] INFO RxTest - Published: 16
11:30:32.192 [main] INFO RxTest - Published: 17
11:30:32.192 [main] INFO RxTest - Published: 18
11:30:32.192 [main] INFO RxTest - Published: 19
11:30:32.294 [RxComputationThreadPool-2] INFO RxTest - Finished consuming 2
11:30:32.294 [RxComputationThreadPool-1] INFO RxTest - Finished consuming 1
11:30:32.294 [RxComputationThreadPool-1] INFO RxTest - Consuming finished, result: 1
11:30:32.294 [RxComputationThreadPool-1] INFO BackpressureSubscriber - On next for 1

Tested on library versions: 2.2.19 and 2.1.2.

Based on my understanding of the ReactiveX documentation, I believe this is an RxJava bug. But I may be wrong, and I would be grateful if you could point out where.

Best answer

flatMap actually requests from upstream in batches and buffers items until the downstream requests them. This fact alone explains the behavior you are seeing. If you set bufferSize to 1 you would probably see the behavior you expect; there is an overload of flatMap that lets you set bufferSize.
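As a minimal sketch (assuming RxJava 2.x; the class name and the use of Flowable.range instead of your generate source are illustrative), the four-argument flatMap overload lets you pass delayErrors, maxConcurrency and bufferSize explicitly, so a value of 1 for the last two limits both the prefetch from upstream and the number of inner Flowables subscribed to at once:

import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

public class FlatMapLimitedDemo {

    public static void main(String[] args) {
        Flowable.range(1, 20)
                .doOnNext(i -> System.out.println("Published: " + i))
                // delayErrors = false, maxConcurrency = 1, bufferSize = 1:
                // at most one item is prefetched from upstream and at most
                // one inner Flowable is subscribed to at a time.
                .flatMap(i -> Flowable.just(i)
                                .subscribeOn(Schedulers.computation())
                                .doOnNext(v -> System.out.println("Consuming " + v)),
                        false, 1, 1)
                .blockingSubscribe(i -> System.out.println("Consumed: " + i));
    }
}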

flatMap also has a maxConcurrent parameter, which is easier to understand once you realize that flatMap is effectively a map followed by a merge applied to the stream of streams produced by the map. merge only subscribes to a limited number of sources at a time, namely maxConcurrent. The default for both bufferSize and maxConcurrent is 128.
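A small sketch of that equivalence (assuming RxJava 2.x; the variable and class names are illustrative): limiting flatMap's concurrency behaves like mapping each value to an inner stream and merging the resulting stream of streams with the same maxConcurrency.

import io.reactivex.Flowable;
import io.reactivex.functions.Function;
import org.reactivestreams.Publisher;

public class FlatMapAsMapMerge {

    public static void main(String[] args) {
        Flowable<Integer> source = Flowable.range(1, 10);
        Function<Integer, Publisher<Integer>> mapper = i -> Flowable.just(i * 10);
        int maxConcurrent = 1;

        // flatMap with a concurrency limit ...
        Flowable<Integer> viaFlatMap = source.flatMap(mapper, maxConcurrent);

        // ... behaves like map (producing a stream of streams) followed by
        // merge with the same maxConcurrency limit.
        Flowable<Integer> viaMapAndMerge =
                Flowable.merge(source.map(mapper), maxConcurrent);

        viaFlatMap.blockingSubscribe(i -> System.out.println("flatMap:   " + i));
        viaMapAndMerge.blockingSubscribe(i -> System.out.println("map+merge: " + i));
    }
}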

Keep in mind that when the merge step receives a request from downstream, it does not know how many streams it will need to subscribe to (remember, we are dealing with a stream of streams here) in order to fulfill that request. The first 10 streams might not return any values at all. If the first stream returns nothing and does not complete for an hour, and maxConcurrent is 1, then we would receive no events for that entire hour, even though streams 2 and 3 are ready to emit. For these reasons, general-purpose defaults had to be chosen for bufferSize and maxConcurrent, and the chosen values generally optimize performance in common benchmark cases while minimizing problems in many edge cases.

For "java - ReactiveX backpressure not working as expected", see the original question on Stack Overflow: https://stackoverflow.com/questions/62566721/
