gpt4 book ai didi

java - 订阅者如何通过 react 性拉背压来控制发布者?

转载 作者:行者123 更新时间:2023-12-02 08:52:05 26 4
gpt4 key购买 nike

我有一个发布者,其发布速度可能比订阅者处理数据的速度快。为了解决这个问题,我开始使用背压。因为我不想丢弃任何数据,所以我使用 react 拉背压。我将其理解为订阅者能够告诉发布者何时发布更多数据,如 this and the follwing paragraphs 中所述。 。

发布者是一个 Flowable,它异步并行地工作,然后合并到顺序 Flowable 中。数据应缓冲最多 10 个元素,当该缓冲区已满时,Flowable 不应发布更多数据并等待下一个请求。

订阅者是一个 DisposableSubscriber,在开始时请求 10 个项目。每个消耗的项目都需要一些计算,之后将请求新的项目。

我的 MWE 看起来像这样:

List<Integer> src = new ArrayList<>();
for (int i = 0; i < 200; i++) {
src.add(i);
}
Flowable.fromIterable(src)
.parallel(10, 1)
.runOn(Schedulers.from(Executors.newFixedThreadPool(10)))
.flatMap(i -> Single.fromCallable(() -> {
System.out.println("publisher: " + i);
Thread.sleep(200);
return i;
}).toFlowable())
.sequential(1)
.onBackpressureBuffer(10)
.observeOn(Schedulers.newThread())
.subscribeOn(Schedulers.newThread())
.doOnError(Throwable::printStackTrace)
.subscribeWith(new DisposableSubscriber<Integer>() {
@Override
protected void onStart() {
request(10);
}
@Override
public void onNext(Integer integer) {
System.out.println("subscriber: " + integer);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
request(1);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
try {
Thread.sleep(1000000);
} catch (InterruptedException e) {
e.printStackTrace();
}

我期望此代码执行以下操作:订阅者请求前 10 项。出版商出版前 10 项。然后,订阅者在 onNext 中进行计算并请求更多项目,发布者将发布这些项目。

实际发生的情况:起初,发布者似乎无限制地发布项目。在某个时刻,例如发布 14 个项目后,订阅者处理其第一个项目。在此期间,发布者会继续发布项目。发布大约 30 个项目后,会抛出 io.reactivex.exceptions.MissingBackPressureException: Buffer is full 并且流结束。

我的问题:我做错了什么? 如何让订阅者控制发布者是否以及何时发布数据?显然,我正在做一些非常错误的事情。否则,期望与现实不会有如此大的差异。

上述 MWE 的输出示例:

publisher: 5
publisher: 7
publisher: 8
publisher: 0
publisher: 2
publisher: 6
publisher: 9
publisher: 3
publisher: 4
publisher: 1
publisher: 18
publisher: 17
publisher: 15
subscriber: 0
publisher: 11
publisher: 10
publisher: 19
publisher: 13
publisher: 14
publisher: 12
publisher: 16
publisher: 27
publisher: 28
publisher: 23
publisher: 21
publisher: 29
publisher: 20
publisher: 25
publisher: 22
publisher: 26
io.reactivex.exceptions.MissingBackpressureException: Buffer is full

最佳答案

不是 Rx 方面的专家,但让我尝试一下。.observeOn(...) 有自己的默认缓冲区大小 128。所以,从一开始它就会从上游请求的内容超出了您的缓冲区所能容纳的内容。

observeOn(...) 接受可选的缓冲区大小覆盖,但即使您提供它,ParallelFlowable 也会调用您的 flatMap(...) 方法的频率比您想要的要高。我不是 100% 确定为什么,也许它有自己的内部缓冲,在将轨道合并回顺序时执行。

我认为您可以通过使用 flatMap(...) 而不是 parralel(...) 并提供 maxConcurrency 参数来更接近您想要的行为。

要记住的另一件事是,您不想调用 subscribeOn(...) - 它会影响整个上游 Flowable。因此,如果您已经在调用 parallel(...).runOn(...),则它不会产生任何效果,或者效果将出乎意料。

有了上述内容,我认为这会让您更接近您正在寻找的东西:

    List<Integer> src = new ArrayList<>();
for (int i = 0; i < 200; i++) {
src.add(i);
}
Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(10));
Flowable.fromIterable(src)
.flatMap(
i -> Flowable.just( i )
.subscribeOn(scheduler) // here subscribeOn(...) affects just this nested Flowable
.map( __ -> {
System.out.println("publisher: " + i);
Thread.sleep(200);
return i;
} ),
10) // max concurrency
.observeOn(Schedulers.newThread(), false, 10) // override buffer size
.doOnError(Throwable::printStackTrace)
.subscribeWith(new DisposableSubscriber<Integer>() {
@Override
protected void onStart() {
request(10);
}
@Override
public void onNext(Integer integer) {
System.out.println("subscriber: " + integer);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
request(1);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
try {
Thread.sleep(1000000);
} catch (InterruptedException e) {
e.printStackTrace();
}

关于java - 订阅者如何通过 react 性拉背压来控制发布者?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60706697/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com