gpt4 book ai didi

android - RxJava2 .subscribeOn .observeOn 困惑。在主线程上运行

转载 作者:塔克拉玛干 更新时间:2023-11-02 23:48:02 24 4
gpt4 key购买 nike

我有一个调用 Web 服务的方法,我认为它在 IO 线程上运行,直到服务停止并且 UI 卡住。

所以我开始了一些简单的测试来检查线程

implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
implementation 'io.reactivex.rxjava2:rxjava:2.1.8'


public void test() {

disposableRx.add(
Observable.just(1, 2)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("Emitting item on: " + Thread.currentThread().getName());
}
})
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
System.out.println("Processing item on: " + Thread.currentThread().getName());
return integer * 2;
}
})

.subscribeWith(new DisposableObserver<Integer>() {
@Override
public void onNext(@NonNull Integer integer) {
System.out.println("Consuming item on: " + Thread.currentThread().getName());
}

@Override
public void onError(@NonNull Throwable e) {
}

@Override
public void onComplete() {
}
})
);
}

是否导致以下输出表明一切都在主线程上运行,尽管有订阅和观察?

Emitting item on: main
Processing item on: main
Consuming item on: main
Emitting item on: main
Processing item on: main
Consuming item on: main

但是如果我将 observeOn 移动到 .subscribeWith 之前,如下所示...

public void test() {

disposableRx.add(
Observable.just(1, 2)
.subscribeOn(Schedulers.io())
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("Emitting item on: " + Thread.currentThread().getName());
}
})
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
System.out.println("Processing item on: " + Thread.currentThread().getName());
return integer * 2;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableObserver<Integer>() {
@Override
public void onNext(@NonNull Integer integer) {
System.out.println("Consuming item on: " + Thread.currentThread().getName());
}

@Override
public void onError(@NonNull Throwable e) {
}

@Override
public void onComplete() {
}
})
);
}

输出是我正在寻找的,我必须说即使在阅读了很多关于 RxJava 的博客之后我仍然感到困惑。

Emitting item on: RxCachedThreadScheduler-1
Processing item on: RxCachedThreadScheduler-1
Emitting item on: RxCachedThreadScheduler-1
Processing item on: RxCachedThreadScheduler-1
Consuming item on: main
Consuming item on: main

我已经剥离了我原来的方法,直到它几乎是博客文章中示例方法的副本

Multithreading like a boss

这意味着这应该在 IO 线程上运行 loadPerson(),在主线程上发出。它没有。

disposableRx.add(
repo.loadPersonProfile(id).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableMaybeObserver<String>() {

@Override
public void onSuccess(@NonNull String response) {
loadPersonDetailsResponse.setValue(ViewModelResponse.success(response));
isLoading.setValue(false);
}

@Override
public void onError(@NonNull Throwable e) {
loadPersonDetailsResponse.setValue(ViewModelResponse.error(e));
isLoading.setValue(false);
}

@Override
public void onComplete() {

}
})
);

从我的方法中转出线程表明它正在主线程上运行?

这是什么原因造成的?

最佳答案

放置 observeOn()sunbscribeOn() 以及其他运算符的顺序非常重要

  • subscribeOn() 运算符告诉源 Observable 在哪个线程上发射和转换项目。

  • 请小心放置 observeOn() 运算符的位置,因为它会发生变化执行工作的线程!在大多数情况下,您可能想延迟切换到观察线程,直到 Rx 链的末尾。

    Observable.just("long", "longer", "longest")
    .subscribeOn(Schedulers.computation())
    .observeOn(AndroidSchedulers.mainThread())
    .map(String::length)
    .filter(length -> length == 6)
    .subscribe(length -> System.out.println("item length " + length));

这里的映射、过滤和消费是在主线程中执行的

  • observeOn()map() 之前没有理由在 map() 运算符之上应用 observeOn() 运算符。事实上,这段代码会导致 NetworkOnMainThreadException!我们不想在主线程上读取 HTTP 响应 —— 应该在我们切换回主线程之前完成。

  • 您也可以使用多个 observeOn() 来切换线程,就像这个例子。

     Observable.just("long", "longer", "longest")
    .doOnNext(s -> System.out.println("first doOnNext: processing item on thread " + Thread.currentThread().getName()))
    .observeOn(Schedulers.computation())
    .map(String::toString)
    .doOnNext(s -> System.out.println("second doOnNext: processing item on thread " + Thread.currentThread().getName()))
    .observeOn(Schedulers.io())
    .map(String::toString)
    .subscribeOn(Schedulers.newThread())
    .map(String::length)
    .subscribe(length -> System.out.println("received item length " + length + " on thread " + Thread.currentThread().getName()));

输出:

first doOnNext: processing item on thread RxNewThreadScheduler-1
first doOnNext: processing item on thread RxNewThreadScheduler-1
first doOnNext: processing item on thread RxNewThreadScheduler-1
second doOnNext: processing item on thread RxComputationThreadPool-1
second doOnNext: processing item on thread RxComputationThreadPool-1
second doOnNext: processing item on thread RxComputationThreadPool-1
received item length 4 on thread RxCachedThreadScheduler-1
received item length 6 on thread RxCachedThreadScheduler-1
received item length 7 on thread RxCachedThreadScheduler-1

注意根据这个answer subscribeOn() 不适用于下游运算符,因此它不能保证您的操作将在不同的线程上进行。

subscribeOn effects go upstream and closer to the source of events.

关于你的问题,我已经做了测试,结果如下

   private void testRxJava2Async() {
io.reactivex.Observable.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {

Log.d(TAG,"callable (expensive assync method) was called on --> "+Thread.currentThread().getName());

return null;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new Observer<String>() {


@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onNext(String s) {

Log.d(TAG,"onNext() was called on --> "+Thread.currentThread().getName());

}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {

}
});
}

测试结果:

callable (expensive assync method) was called on --> RxCachedThreadScheduler-1
onNext() was called on--> main

关于android - RxJava2 .subscribeOn .observeOn 困惑。在主线程上运行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48339584/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com