gpt4 book ai didi

java - 如何计数并阻止在另一个线程上执行的 Observable?

转载 作者:行者123 更新时间:2023-11-30 02:28:55 25 4
gpt4 key购买 nike

我有一个基本的 RxJava 2 Observable,我在主线程上运行。然后我调用observeOn在另一个线程上运行一些计算。然后,我使用 autoConnect 订阅 count 和另一个副作用操作(写出计算结果)。在示例中,这显示为 System.out.println,但假设这是一个可能需要一段时间的操作。我想避免可能的竞争条件,即计数完成,但在另一个线程上写出计算的最后一个元素的结果未完成。

为此,我通过 .count() 调用创建了一个 Future,在副作用操作上称为 blockingSubscribe ,最后在未来调用 .get() ,以确保主线程在完成两个可观察操作时阻塞。这里有更好的解决方案来避免丢失最后一个元素吗?我是否需要这样做,或者 Observables 是否提供了我缺少的一些保证?

还值得注意的是,我想避免重复计算 Observable 的初始元素,因为这可能会在许多昂贵的步骤之后发生。

我的示例代码如下所示。

Stream<Integer> ints = ImmutableList.of(1, 2, 3, 4, 5, 6)
.stream().map(i -> {
System.out.println("emitting:" + i);
return i;
});

Observable<Integer> observable = Observable.fromIterable(() -> ints.iterator())
.observeOn(Schedulers.computation())
.map(x -> x + 1)
.publish()
.autoConnect(2);

Future<Long> count = observable.count().toFuture();
// pretend that instead of printing here, this operation
// could take some time to complete
observable.blockingSubscribe(i -> System.out.println("got " + i));
Long c = count.get();

return count;

最佳答案

您可以在单个流中完成所有工作,而不是创建 2 个流:

Observable<Integer> observable = Observable.fromIterable(() -> ints.iterator())
.observeOn(Schedulers.computation())
.map(x -> x + 1);

observable.doOnNext(integer -> {
//your heavy work here
})
.count()
.subscribe(aLong -> {
//do something with the final count
});

使用副作用 doOnNext() 进行繁重的计算,并在 count() 运算符之后订阅,对最终计数执行某些操作。这将保证你的执行顺序,通知永远不会同时发生,这是Observable合约的一部分,每个doOnNext()都会连续发生,出于同样的原因,最终的onNext() 在订阅者处,将最后发生,因为 count() 运算符基于源 Observable 的完成(onComplete 通知)并且因此,将在 doOnNext() 处发出并处理所有项目后发生。

关于java - 如何计数并阻止在另一个线程上执行的 Observable?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44750360/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com