gpt4 book ai didi

java - RX Java 2 顺序处理契约(Contract)

转载 作者:行者123 更新时间:2023-11-30 10:02:06 30 4
gpt4 key购买 nike

TL; DR - 是否保证默认情况下,在观察 Observable 发出的事件时,在任何给定时间内只使用一个线程?

在我看来,RxJava2 通常是顺序的,除非通过 parallel() 等方式表示。即使使用 observeOn/subscribeOn,我也看到有例如doOnNext() 永远不会有两个线程同时运行:

AtomicInteger counter = new AtomicInteger();

PublishSubject<Integer> testSubject = PublishSubject.create();

testSubject
.observeOn(Schedulers.io())
.doOnNext(val -> {
if(counter.incrementAndGet() > 1)
System.out.println("Whoa!!!!"); // <- never happens

Thread.sleep(20);

counter.decrementAndGet();
})
.subscribe();

for (int i = 0; i < 10000; i++) {
Thread.sleep(10);
testSubject.onNext(i);
}

无论我如何更改此示例 - 除非我使用 .toFlowable(...).parallel().runOn(...) 进行硬核处理,否则我不会看到 doOnNext 同时在不同线程上运行。

我想依靠这个特性,这样我就可以忽略我的运算符(operator)中的同步问题,但是我从来没有在 RxJava2、RxJava1 甚至一般的 RX 的文档中看到它明确指定。也许我只是错过了它,谁能告诉我契约(Contract)的这一部分在哪里描述?

谢谢!

最佳答案

来自 http://reactivex.io/documentation/contract.html

Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications.

在您的示例中,您没有违反此可观察的契约(Contract)。但是如果你实现 Observable 错误,两个线程将同时运行:

AtomicInteger counter = new AtomicInteger();

Observable.create(emitter -> {
new Thread(() -> {
for (int i = 0; i < 10000; i++) {
try {
Thread.sleep(1);
emitter.onNext(i);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}).start();
for (int i = 0; i < 10000; i++) {
Thread.sleep(1);
emitter.onNext(i);

}
}).doOnNext(integer -> {
if (counter.incrementAndGet() > 1)
System.out.println("Whoaa!");
counter.decrementAndGet();
Thread.sleep(1);

}).subscribe();

似乎您可以使用 observeOn 修复此行为 https://github.com/ReactiveX/RxJava/issues/5550#issuecomment-325114185

关于java - RX Java 2 顺序处理契约(Contract),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57161315/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com