gpt4 book ai didi

java - 如何使用 RxJava 2 创建异步事件总线?

转载 作者:行者123 更新时间:2023-11-30 06:10:58 25 4
gpt4 key购买 nike

使用 RXJava 2,我尝试创建一个异步事件总线。

我有一个单例对象,带有 PublishSubject 属性。发射器可以使用主题上的 onNext 将事件发送到总线。

如果订阅者有一个很长的任务要执行,我希望我的总线在多个线程上分派(dispatch)任务以并发执行这些任务。这意味着我希望在发出项目后立即开始该项目的工作,即使前一个项目的工作尚未完成。

但是,即使将 observeOnscheduler 一起使用,我也无法同时运行我的任务。

示例代码:

public void test() throws Exception {
Subject<Integer> busSubject = PublishSubject.<Integer>create().toSerialized();

busSubject.observeOn(Schedulers.computation())
.subscribe(new LongTaskConsumer());

for (int i = 1; i < 5; i++) {
System.out.println(i + " - event");
busSubject.onNext(i);
Thread.sleep(1000);
}
Thread.sleep(1000);
}

private static class LongTaskConsumer implements Consumer<Integer> {
@Override
public void accept(Integer i) throws Exception {
System.out.println(i + " - start work");
System.out.println(i + " - computation on thread " + Thread.currentThread().getName());
Thread.sleep(2000);
System.out.println(i + " - end work");
}
}

打印:

1 - event
1 - start work
1 - computation on thread RxComputationThreadPool-1
2 - event
3 - event
1 - end work
2 - start work
2 - computation on thread RxComputationThreadPool-1
4 - event
2 - end work
3 - start work
3 - computation on thread RxComputationThreadPool-1
3 - end work
4 - start work
4 - computation on thread RxComputationThreadPool-1
4 - end work

这意味着项目 2 上的工作会等待项目 1 上的工作结束,即使事件 2 已经发出。

最佳答案

当下面的调用发生时,会从 Schedulers.computation() 创建一个工作线程,并用于整个流。这就是为什么您提交的所有工作都是在 RxComputationThreadPool-1 上完成的。

busSubject.observeOn(Schedulers.computation())
.subscribe(new LongTaskConsumer());

要在多个线程上安排工作:

busSubject.flatMap(x ->
Flowable.just(x)
.subscribeOn(Schedulers.computation()
.doOnNext(somethingIntensive))
.subscribe(new LongTaskConsumer());

另请注意,密集型工作是在 flatMap 中执行的,而不是在 LongTaskConsumer 中执行的,因为所有项目都将串行到达 LongTaskConsumer

还有其他并行工作的方法,您可能需要研究这些方法,具体取决于有多少事件正在触发 PublishSubject

关于java - 如何使用 RxJava 2 创建异步事件总线?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50243364/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com