gpt4 book ai didi

java - 为什么我的 RxJava Observable 除非阻塞,否则不会发出或完成?

转载 作者:塔克拉玛干 更新时间:2023-11-03 03:56:44 24 4
gpt4 key购买 nike

背景

我有许多 RxJava Observables(从 Jersey 客户端生成,或者使用 Observable.just(someObject) 生成)。它们都应该只发出一个值。我有一个模拟所有 Jersey 客户端并使用 Observable.just(someObject) 的组件测试,我看到了与运行生产代码时相同的行为。

我有几个类作用于这些 observables,执行一些计算(以及一些副作用 - 我可能会让它们稍后直接返回值)并返回空的 void observables。

有一次,在一个这样的类中,我试图压缩我的几个源可观察量然后映射它们 - 如下所示:

public Observable<Void> doCalculation() {
return Observable.zip(
getObservable1(),
getObservable2(),
getObservable3(),
UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable())
}

// in Unifying Object
public Observable<Void> processToNewObservable() {
// ... do some calculation ...
return Observable.empty();
}

计算类然后全部合并并等待:

// Wait for rule computations to complete
List<Observable<Void>> calculations = ...;
Observable.zip(calculations, results -> results)
.toBlocking().lastOrDefault(null);

问题

问题是,processToNewObservable() 永远不会被执行。通过消除过程,我可以看到问题出在 getObservable1() - 如果我用 Observable.just(null) 替换它,一切都会按照我想象的那样执行(但在我想要一个真实值的地方有一个空值)。

重申一下,getObservable1() 在生产代码中从 Jersey 客户端返回一个 Observable,但该客户端是一个 Mockito 模拟,在我的代码中返回 Observable.just(someValue)测试。

调查

如果我将 getObservable1() 转换为阻塞,然后将第一个值包装在 just() 中,同样,一切都按照我的想象执行(但我不不想引入阻塞步骤):

Observable.zip(
Observable.just(getObservable1().toBlocking().first()),
getObservable2(),
getObservable3(),
UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable())

我的第一个想法是,也许其他东西正在消耗我的 observable 发出的值,而 zip 看到它已经完成,因此确定压缩它们的结果应该是一个空的 observable .我已经尝试将 .cache() 添加到我认为相关的每个可观察源上,但是,这并没有改变行为。

我还尝试在 zip 之前在 getObservable1 上添加 next/error/complete/finally 处理程序(不将其转换为阻塞),但它们都没有执行:

getObservable1()
.doOnNext(...)
.doOnCompleted(...)
.doOnError(...)
.finallyDo(...);

Observable.zip(
getObservable1(),
getObservable2(),
getObservable3(),
UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable())

问题

我是 RxJava 的新手,所以我很确定我错过了一些基本的东西。问题是:我能做什么蠢事?如果从我目前所说的来看这不是很明显,我可以做些什么来帮助诊断问题?

最佳答案

Observable 必须发射才能启动链。您必须将管道视为 Observable 发射时将发生什么的声明。

您没有分享实际观察到的内容,但 Observable.just() 导致 Observable 立即发出包装对象。

关于java - 为什么我的 RxJava Observable 除非阻塞,否则不会发出或完成?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34257631/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com