gpt4 book ai didi

java - 消费观察者同时发出的值

转载 作者:搜寻专家 更新时间:2023-11-01 02:43:31 24 4
gpt4 key购买 nike

我正在学习使用 RxJava 进行响应式(Reactive)编程,并希望在单个执行线程中同时使用 emmited 值而不阻塞。

        Observable
.interval(50, TimeUnit.MILLISECONDS)
.take(5)
.subscribe(new Action1<Long>() {
@Override
public void call(Long counter) {
sleep(1000);
System.out.println("Got: " + counter + " thread : "+ Thread.currentThread().getName());
}
});

sleep(10000);

我会得到这个输出

Got: 0 thread : RxComputationThreadPool-1
Got: 1 thread : RxComputationThreadPool-1
Got: 2 thread : RxComputationThreadPool-1
Got: 3 thread : RxComputationThreadPool-1
Got: 4 thread : RxComputationThreadPool-1

我如何异步处理每个事件?像这样

Got: 0 thread : RxComputationThreadPool-1
Got: 1 thread : RxComputationThreadPool-2
Got: 2 thread : RxComputationThreadPool-3
Got: 3 thread : RxComputationThreadPool-4
Got: 4 thread : RxComputationThreadPool-5

最佳答案

在 Rx 中,一个 observable 代表并发性1,因此要相互并发处理通知,您必须将每个通知转换到一个 observable 中。

flatMap 是异步顺序组合运算符。它将来自源可观察对象的每个通知转换到可观察对象中,从而允许您同时处理每个输入值。然后它将每个计算的结果合并到一个扁平化的可观察序列中,并带有非重叠通知。

附录:

flatMap选择器 中,通常有多种方法可以根据目标平台创建并发可观察对象。我不懂 Java,但在 .NET 中,您通常会使用 Observable.Start 来引入并发性,或者使用异步方法 (async/await) 来利用 native 异步,这通常更可取。

1 从技术上讲, observable 的单个订阅(观察者)可以在 Rx 中启用并发,尽管从 observables 的角度考虑通常更方便。参见 this answer了解更多信息。

关于java - 消费观察者同时发出的值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28109240/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com