gpt4 book ai didi

java - RxJava .subscribeOn(Schedulers.newThread()) 问题

转载 作者:行者123 更新时间:2023-11-29 06:58:08 25 4
gpt4 key购买 nike

我在普通的 JDK 8 上。我有这个简单的 RxJava 示例:

Observable
.from(Arrays.asList("one", "two", "three"))
.doOnNext(word -> System.out.printf("%s uses thread %s%n", word, Thread.currentThread().getName()))
//.subscribeOn(Schedulers.newThread())
.subscribe(word -> System.out.println(word));

它逐行打印出单词,并与有关线程的信息交织在一起,正如预期的那样,这是所有下一次调用的“主要”。

但是,当我取消对 subscribeOn(Schedulers.newThread()) 调用的注释时,根本不会打印任何内容。为什么它不起作用?我原以为它会为每个 onNext() 调用和 doOnNext() 启动一个新线程来打印该线程的名称。现在,我什么也没看到,对于其他调度程序也是如此。

当我在 main 的末尾添加对 Thread.sleep(10000L) 的调用时,我可以看到输出,这表明 RxJava 使用的线程都是守护进程。是这样吗?这可以以某种方式改变,但使用自定义 ThreadFactory 或类似概念,而不必实现自定义调度程序吗?

通过上述更改,线程名称始终是 RxNewThreadScheduler-1,而 newThread 的文档说“Scheduler that creates a new {@link Thread} for each工作单元”。难道不应该为所有排放创建一个新线程吗?

最佳答案

正如 Vladimir 所提到的,RxJava 标准调度程序在守护线程上运行,在您的示例中由于主线程退出而终止。我想强调的是,他们不会在新线程上安排每个值,但他们会在新创建的线程上为每个单独的订阅者安排值流。第二次订阅会给你“RxNewThreadScheduler-2”。

您实际上不需要更改默认调度程序,只需使用 Schedulers.from() 包装您自己的基于执行器的调度程序,并在需要时将其作为参数提供:

ThreadPoolExecutor exec = new ThreadPoolExecutor(
0, 64, 2, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
exec.allowCoreThreadTimeOut(true);

Scheduler s = Schedulers.from(exec);

Observable
.from(Arrays.asList("one", "two", "three"))
.doOnNext(word -> System.out.printf("%s uses thread %s%n", word,
Thread.currentThread().getName()))
.subscribeOn(s)
.subscribe(word -> System.out.println(word));

我有一系列 blog posts关于 RxJava 调度程序应该帮助您实现“更永久”的变体。

关于java - RxJava .subscribeOn(Schedulers.newThread()) 问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30791902/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com