gpt4 book ai didi

java - 为什么多个 RXJava Observables 没有发生并行执行?

转载 作者:行者123 更新时间:2023-12-02 02:45:33 27 4
gpt4 key购买 nike

我是 RxJava 新手,正在尝试从链接执行多个 Observables 并行执行的示例: RxJava Fetching Observables In Parallel

虽然上面链接中提供的示例是并行执行 Observable,但是当我在 forEach 方法中添加 Thread.sleep(TIME_IN_MILLISECONDS) 时,系统开始一次执行一个 Observable。请帮助我理解为什么 Thread.sleep 停止 Observables 的并行执行。

下面是修改后的示例,它导致多个可观察对象同步执行:

import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;

public class ParallelExecution {

public static void main(String[] args) {
System.out.println("------------ mergingAsync");
mergingAsync();
}

private static void mergingAsync() {
Observable.merge(getDataAsync(1), getDataAsync(2)).toBlocking()
.forEach(x -> { try{Thread.sleep(4000);}catch(Exception ex){};
System.out.println(x + " " + Thread.currentThread().getId());});
}

// artificial representations of IO work
static Observable<Integer> getDataAsync(int i) {
return getDataSync(i).subscribeOn(Schedulers.io());
}

static Observable<Integer> getDataSync(int i) {
return Observable.create((Subscriber<? super Integer> s) -> {
// simulate latency
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
s.onNext(i);
s.onCompleted();
});
}
}

在上面的示例中,我们使用 observable 的 subscribeOn 方法并提供一个 ThreadPool(Schedules.io) 来执行,因此每个 Observable 的订阅将在单独的线程上进行。

Thread.sleep 有可能锁定线程之间的任何共享对象,但我仍然不清楚。请帮忙。

最佳答案

实际上,您的示例并行执行确实发生了,您只是错误地看待它,执行工作的位置和发出通知的位置之间存在差异。

如果你将带有线程ID的日志放在Observable.create中,你会注意到每个Observable同时在不同的线程中执行。但通知是连续发生的。这种行为符合 Observable 合约的预期,即 observable 必须串行(而不是并行)向观察者发出通知。

关于java - 为什么多个 RXJava Observables 没有发生并行执行?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44622541/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com