gpt4 book ai didi

reactive-programming - Project Reactor 3 中的 publishOn 与 subscribeOn

转载 作者:行者123 更新时间:2023-12-05 00:49:13 33 4
gpt4 key购买 nike

我在相同的通量上使用 publishOn 和 subscribeOn ,如下所示:

    System.out.println("*********Calling Concurrency************");
List<Integer> elements = new ArrayList<>();
Flux.just(1, 2, 3, 4)
.map(i -> i * 2)
.log()
.publishOn(Schedulers.elastic())
.subscribeOn(Schedulers.parallel())
.subscribe(elements::add);
System.out.println("-------------------------------------");

虽然,当我同时使用两者时,日志中不会打印任何内容。
但是当我只使用 publishOn 时,我得到了以下信息日志:
*********Calling Concurrency************
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[info] | request(256)
[info] | onNext(1)
[info] | onNext(2)
[info] | onNext(3)
[info] | onNext(4)
[info] | onComplete()
-------------------------------------

是否比 subscribeOn 更推荐 publishOn?或者它比 subscribeOn 有更多的偏好?两者有什么区别,什么时候用哪个?

最佳答案

我花了一些时间才理解它,也许是因为publishOn通常在 subscribeOn 之前解释,这是一个希望更简单的外行解释。
subscribeOn表示运行初始源发射,例如 subscribe(), onSubscribe() and request()在指定的调度程序工作人员(其他线程)上,对于任何后续操作也相同,例如 onNext/onError/onComplete, map etc并且无论 subscribeOn() 的位置如何,都会发生这种行为

如果你没有做任何publishOn在流利的调用中就是这样,一切都将在这样的线程上运行。

但是,只要您调用 publishOn()假设在中间,那么任何后续运算符(operator)调用都将在提供的调度程序工作人员上运行到这样的 publishOn() .

这是一个例子

Consumer<Integer> consumer = s -> System.out.println(s + " : " + Thread.currentThread().getName());

Flux.range(1, 5)
.doOnNext(consumer)
.map(i -> {
System.out.println("Inside map the thread is " + Thread.currentThread().getName());
return i * 10;
})
.publishOn(Schedulers.newElastic("First_PublishOn()_thread"))
.doOnNext(consumer)
.publishOn(Schedulers.newElastic("Second_PublishOn()_thread"))
.doOnNext(consumer)
.subscribeOn(Schedulers.newElastic("subscribeOn_thread"))
.subscribe();

结果将是

1 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
2 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
10 : First_PublishOn()_thread-6
3 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
20 : First_PublishOn()_thread-6
4 : subscribeOn_thread-4
10 : Second_PublishOn()_thread-5
30 : First_PublishOn()_thread-6
20 : Second_PublishOn()_thread-5
Inside map the thread is subscribeOn_thread-4
30 : Second_PublishOn()_thread-5
5 : subscribeOn_thread-4
40 : First_PublishOn()_thread-6
Inside map the thread is subscribeOn_thread-4
40 : Second_PublishOn()_thread-5
50 : First_PublishOn()_thread-6
50 : Second_PublishOn()_thread-5


如您所见,第一个 doOnNext()和以下 map()在名为 subscribeOn_thread 的线程上运行,直到任何 publishOn()调用,然后任何后续调用将在提供的调度程序上运行到该 publishOn()这将再次发生在任何后续调用中,直到有人调用另一个 publishOn() .

关于reactive-programming - Project Reactor 3 中的 publishOn 与 subscribeOn,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48073315/

33 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com