gpt4 book ai didi

java - Reactor Project中的Operator "publishOn()"和thread affinity

转载 作者:行者123 更新时间:2023-12-03 12:47:53 35 4
gpt4 key购买 nike

我无法从文档中清楚地了解 publishOn()(或 observeOn() 在 RxJava 的情况下)运算符如何在线程亲和性方面工作。我以为这个运算符保证任何订阅者都将在同一个线程中处理,但下面的例子打破了我的理解:

Flux<String> started = ep.publishOn(scheduler);
Flux<String> afterStateUpdate = started.doOnNext(e -> {
System.out.println("STATE UPDATE : " + Thread.currentThread().getId());
state.add(e);
}).share();
afterStateUpdate.subscribe();
ep.onNext("1");
ep.onNext("2");
ep.onNext("3");

afterStateUpdate.subscribe(e -> {
System.out.println("MAIN SUBSCRIBER : " + Thread.currentThread().getId());
});
ep.onNext("4");
ep.onNext("5");
ep.onNext("6");

结果我看到以下输出:

STATE UPDATE : 12
STATE UPDATE : 12
STATE UPDATE : 12
STATE UPDATE : 13
MAIN SUBSCRIBER : 13
STATE UPDATE : 13
MAIN SUBSCRIBER : 13
STATE UPDATE : 13
MAIN SUBSCRIBER : 13

这意味着“state updater”一直在线程 12 中工作,但是当第二个订阅者订阅时“state updater”开始在线程 13 中工作。

那么,问题是在这种情况下我如何保证订阅者的线程亲和性?

最佳答案

我发现只有一种方法可以 100% 确定一个订阅者使用同一个线程。它是创建一个像这样的单一调度器池:

Scheduler[] schedulers = new Schedulers[10];
for (int i = 0; i < 10; i++) {
schedulers[i] = Schedulers.newSingle();
}

public Flux<T> wrapIntoSingleThread(Scheduler scheduler) {
return this.flux.publishOn(scheduler);
}

然后为每个订阅者从这个池中获取一个调度器:wrapIntoSingleThread(schedulers[i++ % schedulers.length]).subscribe(...)

如果有人知道提供线程关联的其他方式 - 请纠正我。

关于java - Reactor Project中的Operator "publishOn()"和thread affinity,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43226924/

35 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com