gpt4 book ai didi

java - 为什么 subscribeOn 方法不切换上下文?

转载 作者:行者123 更新时间:2023-12-03 20:19:42 26 4
gpt4 key购买 nike

我的这个测试基于 4.5.2. The subscribeOn Method (忽略缺少的 new Thread(...) 部分):

    @Test
public void theSubscribeOnMethod() throws InterruptedException {
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);

final Flux<String> flux = Flux
.range(1, 2)
.map(i -> 10 + i)
.subscribeOn(s)
.doOnSubscribe(sub -> System.out.println(
"[doOnSubscribe] " + Thread.currentThread().getName()))
.map(i -> Thread.currentThread().getName() + ", value " + i);

flux.subscribe(System.out::println);

Thread.sleep(1000);
}

打印此内容(注意 main 线程 [doOnSubscribe] ):
[doOnSubscribe] main
parallel-scheduler-1, value 11
parallel-scheduler-1, value 12

刚搬家 subscribeOndoOnSubscribe产生这个结果(从我的角度来看是正确的):
[doOnSubscribe] parallel-scheduler-1
parallel-scheduler-1, value 11
parallel-scheduler-1, value 12

根据 Flux.subscribeOn文档:

Run subscribe, onSubscribe and request on a specified Scheduler's Scheduler.Worker. As such, placing this operator anywhere in the chain will also impact the execution context of onNext/onError/onComplete signals from the beginning of the chain up to the next occurrence of a publishOn.



所以在这个例子中我期望 doOnSubscribe发生在 parallel-scheduler线程而不是 main尽管我放的地方 subscribeOn .

这是一个错误吗?为什么会发生这种情况,我该如何解决?

PS:使用 reactor-core:3.1.0-RELEASE

最佳答案

这不是一个错误。这是预期的行为。这是事件的顺序,乍一看可能会令人困惑。

我稍微修改了你的例子:

Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);

final Flux<String> flux = Flux
.range(1, 2)
.map(i -> 10 + i)
.subscribeOn(s)
.doOnNext(value -> System.out.println(
"[doOnNext] " + Thread.currentThread().getName() + " value: " + value))
.doOnSubscribe(sub -> System.out.println(
"[doOnSubscribe] " + Thread.currentThread().getName()));

flux.subscribe();

Thread.sleep(1000);

输出:
[doOnSubscribe] main
[doOnNext] parallel-scheduler-1 value: 11
[doOnNext] parallel-scheduler-1 value: 12

当您进行命令式思考时,您会期望运算符按声明顺序被调用,因此在另一个之下的一个将在以后被调用。这通常也适用于函数式/响应式(Reactive)编程,但情况并非总是如此。

在上面的例子中 doOnSubscribe实际上在管道中的其他任何东西之前运行。运营商首先收到有关订阅的通知,然后将订阅转发给其上游运营商,后者将其转发给其上游,依此类推……

所以事件发生的顺序是:
  • flux.subscribe()订阅管道并触发主线程上的执行
  • doOnSubscribe获取有关订阅的通知并在主线程上触发提供的使用者,然后将订阅转发到其上游
  • doOnNext收到有关订阅的通知,但它只对 onNext 感兴趣事件,因此除了将订阅转发到其上游(在主线程上)之外,它还没有做任何事情(!)
  • subscribeOn(s) - 上下文切换发生,此后的每个事件都在提供的调度程序的上下文中发生
  • 订阅转发给上游运营商:第一个 map然后 range , 都在平行线程
  • range是源,所以它发出第一项:onNext(1)平行线程
  • maponNext 上触发事件,进行转换并将转换后的元素作为 onNext 发送并行线程上的事件
  • subscribeOn除了转发 onNext 之外什么都不做 Activity
  • doOnNext消费者被触发,因为它收到了 onNext事件,仍在并行线程上
  • doOnSubscribe忽略 onNext事件,只是将事件转发给下一个运算符(operator)
  • onNext(2) 重复了 6-10 个事件项目
  • 流完成

  • TLDR :订阅事件从底部操作符传播到顶部操作符( doOnSubscribesusbcribeOn 在此传播过程中按此顺序触发),之后实际数据从顶部源操作符( range)反向流动) 至底部。

    doOnSubscribe之前运行 susbcribeOn ,它仍然会在主线程上运行。

    关于java - 为什么 subscribeOn 方法不切换上下文?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59553733/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com