gpt4 book ai didi

java - RxJava : subscribeOn and observeOn not working as expected

转载 作者:行者123 更新时间:2023-12-01 16:50:13 28 4
gpt4 key购买 nike

也许我只是真正了解 subscribeOnobserveOn 的内部工作原理,但我最近遇到了一些非常奇怪的事情。我的印象是,subscribeOn 决定了调度程序最初开始处理的位置(特别是当我们有很多map 会改变数据流时)然后可以在这些映射之间的任何位置使用observeOn在适当的时候更改调度程序(首先进行网络,然后计算,最后更改UI线程)。

但是,我注意到,当不直接将这些调用链接到我的 Observable 或 Single 时,它​​将不起作用。这是一个最小的 JUnit 测试工作示例:

import org.junit.Test;
import rx.Single;
import rx.schedulers.Schedulers;

public class SubscribeOnTest {

@Test public void not_working_as_expected() throws Exception {
Single<Integer> single = Single.<Integer>create(singleSubscriber -> {
System.out.println("Doing some computation on thread " + Thread.currentThread().getName());
int i = 1;
singleSubscriber.onSuccess(i);
});
single.subscribeOn(Schedulers.computation()).observeOn(Schedulers.io());

single.subscribe(integer -> {
System.out.println("Observing on thread " + Thread.currentThread().getName());
});
System.out.println("Doing test on thread " + Thread.currentThread().getName());
Thread.sleep(1000);
}

@Test public void working_as_expected() throws Exception {
Single<Integer> single = Single.<Integer>create(singleSubscriber -> {
System.out.println("Doing some computation on thread " + Thread.currentThread().getName());
int i = 1;
singleSubscriber.onSuccess(i);
}).subscribeOn(Schedulers.computation()).observeOn(Schedulers.io());

single.subscribe(integer -> {
System.out.println("Observing on thread " + Thread.currentThread().getName());
});
System.out.println("Doing test on thread " + Thread.currentThread().getName());
Thread.sleep(1000);
}
}

测试 not_working_as_expected() 给出以下输出

Doing some computation on thread main
Observing on thread main
Doing test on thread main

working_as_expected()给了我

Doing some computation on thread RxComputationScheduler-1
Doing test on thread main
Observing on thread RxIoScheduler-2

唯一的区别是,在第一个测试中,在创建 Single 之后有一个分号,然后才应用调度程序,并且在工作示例中,方法调用直接链接到 Single 的创建。但这不应该是无关紧要的吗?

最佳答案

运算符(operator)执行的所有“修改”都是不可变,这意味着它们返回一个新流,该新流以与前一个流不同的方式接收通知。由于您刚刚调用了 subscribeOnobserveOn 运算符并且没有存储其结果,因此稍后进行的订阅位于未更改的流上。

旁注:我不太明白您对 subscribeOn 行为的定义。如果您的意思是 map 运算符(operator)会受到某种影响,那么事实并非如此。 subscribeOn 定义一个 Scheduler,在该 Scheduler 上调用 OnSubscribe 函数。在您的情况下,您传递给 create() 方法的函数。另一方面,observeOn 定义了调度程序,每个连续的流(由应用的运算符返回的流)都在该调度程序上处理来自上游的排放。

关于java - RxJava : subscribeOn and observeOn not working as expected,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41789270/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com