gpt4 book ai didi

java - RxJava 调度器——线程行为和饥饿?

转载 作者:塔克拉玛干 更新时间:2023-11-02 19:17:57 26 4
gpt4 key购买 nike

我对 RxJava 的 observeOn()subscribeOn() 有双重看法。我知道他们不会在单个流上并行排放。换句话说,单个排放流只会放在一个线程上,对吗?我下面的测试似乎表明了这一点。我的理解也是你必须 flatMap() 一个调度器,如 .flatMap(v -> Observable.just(v).subscribeOn(Schedulers.computation())),以并行化单个流上的排放。

此外,如果是这种情况,那么调度程序是否会发生线程饥饿?如果我的计算调度程序有 5 个线程,但我有超过 5 个长期运行的异步流正在处理,是否有可能发生饥饿?或者这不太可能只是因为 RxJava 的性质?

public class Test {
public static void main(String[] args) {


Observable<String> airports = Observable.just("ABQ", "HOU",
"PHX", "DAL", "DFW", "AUS","SAN","LAX","JFK");


airports.subscribeOn(Schedulers.io()).map(Test::stall)
.subscribe(s -> System.out.println("Sub1 " + s +
" " + Thread.currentThread().getName()));

airports.subscribeOn(Schedulers.io()).map(Test::stall)
.subscribe(s -> System.out.println("Sub2 " + s +
" " + Thread.currentThread().getName()));

sleep();
}

private static String stall(String str) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

return str;
}

private static void sleep() {
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}

最佳答案

使用 flatMap,一个异步源可能会被其他异步源淹没,它无法在自己的源上取得进展。然而,在实践中,由于操作系统和 JVM 问题提供了足够的喘息空间,以及 flatMap 本身的背压和仲裁,我还没有看到这种情况发生。如果您担心这种压力过大,可以将 maxConcurrent 参数与 flatMap 重载一起使用,并限制并发订阅的数量。

RxJava 主要是以非阻塞方式编写的,因此当需要合并或组合源代码时,它们实际上并不相互等待。

计算调度器是一个单线程执行器池,并以循环方式分配给调用者。我不知道标准执行者的公平性。

关于java - RxJava 调度器——线程行为和饥饿?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32515296/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com