- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我在相同的通量上使用 publishOn 和 subscribeOn ,如下所示:
System.out.println("*********Calling Concurrency************");
List<Integer> elements = new ArrayList<>();
Flux.just(1, 2, 3, 4)
.map(i -> i * 2)
.log()
.publishOn(Schedulers.elastic())
.subscribeOn(Schedulers.parallel())
.subscribe(elements::add);
System.out.println("-------------------------------------");
*********Calling Concurrency************
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[info] | request(256)
[info] | onNext(1)
[info] | onNext(2)
[info] | onNext(3)
[info] | onNext(4)
[info] | onComplete()
-------------------------------------
最佳答案
我花了一些时间才理解它,也许是因为publishOn
通常在 subscribeOn
之前解释,这是一个希望更简单的外行解释。subscribeOn
表示运行初始源发射,例如 subscribe(), onSubscribe() and request()
在指定的调度程序工作人员(其他线程)上,对于任何后续操作也相同,例如 onNext/onError/onComplete, map etc
并且无论 subscribeOn() 的位置如何,都会发生这种行为
如果你没有做任何publishOn
在流利的调用中就是这样,一切都将在这样的线程上运行。
但是,只要您调用 publishOn()
假设在中间,那么任何后续运算符(operator)调用都将在提供的调度程序工作人员上运行到这样的 publishOn()
.
这是一个例子
Consumer<Integer> consumer = s -> System.out.println(s + " : " + Thread.currentThread().getName());
Flux.range(1, 5)
.doOnNext(consumer)
.map(i -> {
System.out.println("Inside map the thread is " + Thread.currentThread().getName());
return i * 10;
})
.publishOn(Schedulers.newElastic("First_PublishOn()_thread"))
.doOnNext(consumer)
.publishOn(Schedulers.newElastic("Second_PublishOn()_thread"))
.doOnNext(consumer)
.subscribeOn(Schedulers.newElastic("subscribeOn_thread"))
.subscribe();
1 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
2 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
10 : First_PublishOn()_thread-6
3 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
20 : First_PublishOn()_thread-6
4 : subscribeOn_thread-4
10 : Second_PublishOn()_thread-5
30 : First_PublishOn()_thread-6
20 : Second_PublishOn()_thread-5
Inside map the thread is subscribeOn_thread-4
30 : Second_PublishOn()_thread-5
5 : subscribeOn_thread-4
40 : First_PublishOn()_thread-6
Inside map the thread is subscribeOn_thread-4
40 : Second_PublishOn()_thread-5
50 : First_PublishOn()_thread-6
50 : Second_PublishOn()_thread-5
doOnNext()
和以下
map()
在名为
subscribeOn_thread
的线程上运行,直到任何
publishOn()
调用,然后任何后续调用将在提供的调度程序上运行到该
publishOn()
这将再次发生在任何后续调用中,直到有人调用另一个
publishOn()
.
关于reactive-programming - Project Reactor 3 中的 publishOn 与 subscribeOn,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48073315/
publishOn 和parallel 有什么区别?据我了解,publishOn 通过使用几个线程执行操作符链来影响操作符链的进一步处理。但是 parallel 似乎和它的工作原理完全一样。默认情况下
我无法从文档中清楚地了解 publishOn()(或 observeOn() 在 RxJava 的情况下)运算符如何在线程亲和性方面工作。我以为这个运算符保证任何订阅者都将在同一个线程中处理,但下面的
每当我同时使用 subscribeOn 和 publishOn 时,都不会打印任何内容。 如果我只使用一个,它将打印。 如果我使用 subscribeOn(Schedulers.immediate()
Stream中的并行处理非常简单,只要加上parallel(),就可以将stream并行化: @Test public void streamParallel () { Str
我正在使用 Project Reactor对于我正在写的新服务。我正在使用 Spring 5与 Netty .我正在与一堆不同的服务和关系数据库进行交互。所有这些服务都有一个阻塞的客户端和 JDBC也
当我将 Reactive Streams ( https://github.com/reactor/reactor-core ) 与自定义 Publisher 一起使用时结合 publishOn功能,
我在相同的通量上使用 publishOn 和 subscribeOn ,如下所示: System.out.println("*********Calling Concurrency******
我是一名优秀的程序员,十分优秀!