- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
我有一个调用 Web 服务的方法,我认为它在 IO 线程上运行,直到服务停止并且 UI 卡住。
所以我开始了一些简单的测试来检查线程
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
implementation 'io.reactivex.rxjava2:rxjava:2.1.8'
public void test() {
disposableRx.add(
Observable.just(1, 2)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("Emitting item on: " + Thread.currentThread().getName());
}
})
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
System.out.println("Processing item on: " + Thread.currentThread().getName());
return integer * 2;
}
})
.subscribeWith(new DisposableObserver<Integer>() {
@Override
public void onNext(@NonNull Integer integer) {
System.out.println("Consuming item on: " + Thread.currentThread().getName());
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
})
);
}
是否导致以下输出表明一切都在主线程上运行,尽管有订阅和观察?
Emitting item on: main
Processing item on: main
Consuming item on: main
Emitting item on: main
Processing item on: main
Consuming item on: main
但是如果我将 observeOn 移动到 .subscribeWith 之前,如下所示...
public void test() {
disposableRx.add(
Observable.just(1, 2)
.subscribeOn(Schedulers.io())
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("Emitting item on: " + Thread.currentThread().getName());
}
})
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
System.out.println("Processing item on: " + Thread.currentThread().getName());
return integer * 2;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableObserver<Integer>() {
@Override
public void onNext(@NonNull Integer integer) {
System.out.println("Consuming item on: " + Thread.currentThread().getName());
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
})
);
}
输出是我正在寻找的,我必须说即使在阅读了很多关于 RxJava 的博客之后我仍然感到困惑。
Emitting item on: RxCachedThreadScheduler-1
Processing item on: RxCachedThreadScheduler-1
Emitting item on: RxCachedThreadScheduler-1
Processing item on: RxCachedThreadScheduler-1
Consuming item on: main
Consuming item on: main
我已经剥离了我原来的方法,直到它几乎是博客文章中示例方法的副本
这意味着这应该在 IO 线程上运行 loadPerson(),在主线程上发出。它没有。
disposableRx.add(
repo.loadPersonProfile(id).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableMaybeObserver<String>() {
@Override
public void onSuccess(@NonNull String response) {
loadPersonDetailsResponse.setValue(ViewModelResponse.success(response));
isLoading.setValue(false);
}
@Override
public void onError(@NonNull Throwable e) {
loadPersonDetailsResponse.setValue(ViewModelResponse.error(e));
isLoading.setValue(false);
}
@Override
public void onComplete() {
}
})
);
从我的方法中转出线程表明它正在主线程上运行?
这是什么原因造成的?
最佳答案
放置 observeOn()
和 sunbscribeOn()
以及其他运算符的顺序非常重要。
subscribeOn()
运算符告诉源 Observable 在哪个线程上发射和转换项目。
请小心放置 observeOn() 运算符的位置,因为它会发生变化执行工作的线程!在大多数情况下,您可能想延迟切换到观察线程,直到 Rx 链的末尾。
Observable.just("long", "longer", "longest")
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.map(String::length)
.filter(length -> length == 6)
.subscribe(length -> System.out.println("item length " + length));
这里的映射、过滤和消费是在主线程中执行的
observeOn()
在 map()
之前没有理由在 map() 运算符之上应用 observeOn() 运算符。事实上,这段代码会导致 NetworkOnMainThreadException!我们不想在主线程上读取 HTTP 响应 —— 应该在我们切换回主线程之前完成。
您也可以使用多个 observeOn() 来切换线程,就像这个例子。
Observable.just("long", "longer", "longest")
.doOnNext(s -> System.out.println("first doOnNext: processing item on thread " + Thread.currentThread().getName()))
.observeOn(Schedulers.computation())
.map(String::toString)
.doOnNext(s -> System.out.println("second doOnNext: processing item on thread " + Thread.currentThread().getName()))
.observeOn(Schedulers.io())
.map(String::toString)
.subscribeOn(Schedulers.newThread())
.map(String::length)
.subscribe(length -> System.out.println("received item length " + length + " on thread " + Thread.currentThread().getName()));
输出:
first doOnNext: processing item on thread RxNewThreadScheduler-1
first doOnNext: processing item on thread RxNewThreadScheduler-1
first doOnNext: processing item on thread RxNewThreadScheduler-1
second doOnNext: processing item on thread RxComputationThreadPool-1
second doOnNext: processing item on thread RxComputationThreadPool-1
second doOnNext: processing item on thread RxComputationThreadPool-1
received item length 4 on thread RxCachedThreadScheduler-1
received item length 6 on thread RxCachedThreadScheduler-1
received item length 7 on thread RxCachedThreadScheduler-1
注意根据这个answer subscribeOn()
不适用于下游运算符,因此它不能保证您的操作将在不同的线程上进行。
subscribeOn
effects go upstream and closer to the source of events.
关于你的问题,我已经做了测试,结果如下
private void testRxJava2Async() {
io.reactivex.Observable.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {
Log.d(TAG,"callable (expensive assync method) was called on --> "+Thread.currentThread().getName());
return null;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG,"onNext() was called on --> "+Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
测试结果:
callable (expensive assync method) was called on --> RxCachedThreadScheduler-1
onNext() was called on--> main
关于android - RxJava2 .subscribeOn .observeOn 困惑。在主线程上运行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48339584/
我有一个 View 模型,它在与 UI 线程分开的线程上触发各种对象的可观察对象。同时,有订阅这些可观察对象的 View 代码,需要更新 UI 线程上的控件。 就编码风格而言,使用 ObserveOn
我是 RX 的新手,正在尝试一些示例,关于为什么订阅中的 Console.writeLine 没有被调用有什么想法吗? var obs = Observable.Create(i => {
在 Scala 中,我编写了两个 MongoDB 可观察对象,并在传递自定义执行上下文时调用了 observeOn。在第一个可观察对象上调用 observeOn,但自定义执行上下文不会传播到第二个可观
我正在尝试使用 Rx 并行处理项目。看来我不能告诉 Rx 并行运行我的观察者的 OnNext() 。下面是测试代码来演示 [Test] public void ObservableObserveOnN
我刚刚发现了 SubscribeOn,这让我想知道我是否应该使用它而不是 ObserveOn。谷歌带了我here和 here ,但两者都没有帮助我理解差异:它看起来非常微妙。 (在我的上下文中,我在非
基于阅读这个问题:What's the difference between SubscribeOn and ObserveOnObserveOn 设置代码在 Subscribe 处理程序中的位置被执
我有以下代码 Observable.just(10) .doOnTerminate(() -> Log.d("LOG", "ON TERMINATE"))
我正在尝试使用 .net Observable 类实现一个简单的观察者模式。我有这样的代码: Observable.FromEventPattern( Instance.User, "
我对在可观察对象上调用 subscribeOn 和 observeOn 方法的顺序有点困惑。我读了几篇文章,一个人说没关系,只是在他的例子中使用了东西,其他人说这很重要。所以这是我的问题: 例如: s
我正在开发华为 HarmonyOS 应用程序,我正在尝试使用 RxJava 为后台任务实现一个基类。我的问题是我不知道如何在主线程上观察。 在常规 Android 上,我会使用 AndroidSche
Scheduling and Threading Intro to Rx 部分说 the use of SubscribeOn and ObserveOn should only be invoked
也许我只是真正了解 subscribeOn 和 observeOn 的内部工作原理,但我最近遇到了一些非常奇怪的事情。我的印象是,subscribeOn 决定了调度程序最初开始处理的位置(特别是当我们
请看下面的代码: var obs = Observable.Start(() => LongRunningMethodToRetrieveData()); obs.Subscribe(x => M
基本上我的 Android 应用程序有一些元数据需要在不同的情况下报告给后端服务器: data class SearchMetaData( val searchId: String?,
我已经声明: Subject mBehaviorSubject = BehaviorSubject.createDefault("default").toSerialized(); 似乎工作正常。但我
有人可以帮助解释为什么当我“阻塞并继续”观察者的 onNext 序列订阅了一个具有时间可观察序列的缓冲区时,Scheduler.NewThread 不再适用吗? 例如: 如果我通过缓冲一个数字序列 v
我遇到了一个问题,我的可观察对象在 IO 线程上订阅并在 android 主 (UI) 线程上观察,但 doFinally 运算符在 IO 线程上运行,它需要在 UI 线程上运行。 用例几乎和这个me
我有一个调用 Web 服务的方法,我认为它在 IO 线程上运行,直到服务停止并且 UI 卡住。 所以我开始了一些简单的测试来检查线程 implementation 'io.reactivex.rxja
当我使用 RxAndroid 和 .observeOn(AndroidSchedulers.mainThread()),并使用 Android Studio 在模拟器上运行测试时,整个测试运行崩溃:
我有一个像这样的简单流: Observable.error(Exception()).startWith(1).subscribe { println("Item is $it")
我是一名优秀的程序员,十分优秀!