gpt4 book ai didi

java - RxJava 调度程序不会通过 sleep 更改线程

转载 作者:行者123 更新时间:2023-11-30 08:31:38 25 4
gpt4 key购买 nike

我面临着我无法理解的非常奇怪的 RxJava 行为。

假设我想并行处理元素。我正在为此使用 flatMap:

public static void log(String msg) {
String threadName = Thread.currentThread().getName();
System.out.println(String.format("%s - %s", threadName, msg));
}

public static void sleep(int ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public static void main(String[] args) throws InterruptedException {

Scheduler sA = Schedulers.from(Executors.newFixedThreadPool(1));
Scheduler sB = Schedulers.from(Executors.newFixedThreadPool(5));

Observable.create(s -> {
while (true) {
log("start");
s.onNext(Math.random());
sleep(10);
}
}).subscribeOn(sA)
.flatMap(r -> Observable.just(r).subscribeOn(sB))
.doOnNext(r -> log("process"))
.subscribe((r) -> log("finish"));
}

输出非常可预测:

pool-1-thread-1 - start
pool-2-thread-1 - process
pool-2-thread-1 - finish
pool-1-thread-1 - start
pool-2-thread-2 - process
pool-2-thread-2 - finish
pool-1-thread-1 - start
pool-2-thread-3 - process
pool-2-thread-3 - finish

好吧,但是如果我在 flatMap 并行化调度程序停止更改线程后将 n > 10 的 sleep 添加到映射。

public static void main(String[] args) throws InterruptedException {

Scheduler sA = Schedulers.from(Executors.newFixedThreadPool(1));
Scheduler sB = Schedulers.from(Executors.newFixedThreadPool(5));

Observable.create(s -> {
while (true) {
log("start");
s.onNext(Math.random());
sleep(10);
}
}).subscribeOn(sA)
.flatMap(r -> Observable.just(r).subscribeOn(sB))
.doOnNext(r -> sleep(15))
.doOnNext(r -> log("process"))
.subscribe((r) -> log("finish"));
}

什么给出了以下内容:

pool-1-thread-1 - start
pool-1-thread-1 - start
pool-2-thread-1 - process
pool-2-thread-1 - finish
pool-1-thread-1 - start
pool-1-thread-1 - start
pool-2-thread-1 - process
pool-2-thread-1 - finish
pool-1-thread-1 - start
pool-2-thread-1 - process

为什么???为什么在 flatMap 之后所有元素都在同一个线程 (pool-2-thread-1) 中处理?

最佳答案

FlatMap 将所有并行任务序列化回单个线程,您正在查看该线程。试试这个吧

public static void main(String[] args) throws InterruptedException {

Scheduler sA = Schedulers.from(Executors.newFixedThreadPool(1));
Scheduler sB = Schedulers.from(Executors.newFixedThreadPool(5));

Observable.create(s -> {
while (!s.isUnsubscribed()) {
log("start");
s.onNext(Math.random());
sleep(10);
}
}).subscribeOn(sA)
.flatMap(r ->
Observable.just(r)
.subscribeOn(sB)
.doOnNext(r -> sleep(15))
.doOnNext(r -> log("process"))
)
.subscribe((r) -> log("finish"));
}

关于java - RxJava 调度程序不会通过 sleep 更改线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40559085/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com