- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我是 RxJava 的新手,和其他许多人一样,我正在努力了解异常处理。我在网上阅读了很多帖子(例如此处的讨论 how to handle exceptions thrown by observer's onNext)并且认为我了解了这些概念的基本概念。
在上面提到的讨论中,其中一位发帖者说,当订阅者抛出异常时,RxJava 会执行以下操作:
Implement generic handling to log the failure and stop sending it events (of any kind) and clean up any resources due to that subscriber and carry on with any remaining subscriptions.
这或多或少也是我所看到的,我唯一有问题的是“清理任何资源”位。为了清楚起见,让我们假设以下示例:
我想创建一个 Observable 来监听异步事件源(例如 JMS 队列)和 onNext()s 在每个接收到的消息上。所以在(伪)代码中我会做类似的事情:
Observable<String> observable = Observable.create( s -> {
createConnectionToBroker();
getConsumer().setMessageListener(message -> s.onNext(transform(message)));
s.setDisposable(new Disposable() {
public void dispose() {
tearDownBrokerConnection();
}
});
});
因为我想为许多订阅者/观察者重用消息监听器,所以我不直接在创建的 Observable 上订阅,而是使用 publish().refCount() 组。类似这样的东西:
Observable<String> observableToSubscribeTo = observable.publish().refCount();
Disposable d1 = observableToSubscribeTo.subscribe(s -> ...);
Disposable d2 = observableToSubscribeTo.subscribe(s -> ...);
这一切都按预期工作。代码仅在建立第一个订阅时连接到 JMS,并在最后一个观察者 dispose()
d 时关闭与代理的连接。
但是,当订阅者在 onNext()
ed 时抛出异常时,事情似乎变得一团糟。正如预期的那样,throw 的观察者被删除了,每当发布新事件时,它都不会再收到通知。我的问题似乎是,当所有剩余的订阅者都是 dispose()
d 时,不再通知保持与消息代理连接的 Observable。在我看来,好像抛出异常的订户处于某种僵尸状态。当涉及到事件分发时它会被忽略,但它会以某种方式阻止根 Observable 在最后一个订阅者是 dispose()
d 时得到通知。
我理解 RxJava 期望观察者确保不抛出异常,而是正确处理最终的异常。不幸的是,在我想提供一个向调用者返回 Observable 的库的情况下,我无法控制我的订阅者。这意味着,我永远无法保护我的图书馆免受愚蠢的观察者的侵害。
所以,我问自己:我是否遗漏了什么?当订阅者抛出时真的没有机会正确清理吗?这是一个错误还是只是我不了解图书馆?
非常感谢任何见解!
最佳答案
如果您可以展示一些单元测试来证明问题(不需要 JMS),那就太好了。
此外,RxJava 2 中的 onNext 永远不应该抛出;如果是,则为未定义行为。如果您不信任您的消费者,您可以使用一个终端可观察转换器来执行 safeSubscribe
而不是普通的 subscribe
以防止下游的不当行为:
.compose(o -> v -> o.safeSubscribe(v))
或
.compose(new ObservableTransformer<T>() {
@Override public Observable<T> apply(final Observable<T> source) {
return new Observable<T>() {
@Override public void subscribeActual(Observer<? super T> observer) {
source.safeSubscribe(observer);
}
};
}
})
关于java - RxJava 2.0 - 处理 publish().refCount() 中未捕获订阅者错误的资源,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42079234/
我正在进行 2 个 RX 调用,这些调用相互嵌套且相互依赖。服务器存在问题(由于各种原因目前无法解决),该问题在第二个嵌套调用中返回错误。 在这个问题得到解决之前,我需要确保如果第二次调用返回错误,则
这个问题在这里已经有了答案: How to resolve Duplicate files copied in APK META-INF/rxjava.properties (8 个答案) 关闭 5
我正在尝试将此 RxJava1 代码转换为 RxJava2 public static Observable listFolder(Path dir, String glob) { retur
这个问题在这里已经有了答案: RxJava 1 and RxJava 2 in the same project [duplicate] (1 个回答) How to resolve Duplica
有显示项目的RecyclerViewAdapter(该项目已经映射到数据库中)。 RecyclerViewAdapter 包含对 Presenter 的引用以加载项目。它还包含带有项目 ID 的 Ar
我想弄清楚如何在 Android 中使用 RxJava 将 Realm 对象保存在 Realm 中。到目前为止,结合所有这些的所有示例都是如何从 Realm 读取数据的。我想在 android 中使用
我在日志中收到此错误: Caused by java.lang.ClassCastException: java.net.UnknownHostException cannot be cast to
我有一个 API 服务类,其方法返回 Retrofit 提供的调用。 最近,Rx2Java 引入了 Single,所以我想将 Call 更改为 Single,但我不想更改逻辑。 例如 : 类接口(in
如何使用运算符让我始终获得以前和当前的值?如果可能的话,我想避免在管道外创建状态。 - time -> 1 2 3 4 | | | | Op
我正在努力实现以下目标。我加载了一个对象列表,我想获取稍后放入列表中的值。 首先,我使用 flatmap 将所有值收集到一个数组中(按山顺序),然后当一切完成后,我填充一个适配器。 我无法做的是每
是否可以选择使用 timeout 的变体不发射 Throwable ? 我要 complete事件发出。 最佳答案 您不需要使用 onErrorResumeNext 映射错误。您可以使用以下方法提供备
我们可以在 C# Rx 中异步执行一些代码,如下所示,使用 Observable.Start()。我想知道 RxJava 中的等价物是什么。 void Main() { AddTwoNum
问题:我有一个用户可以输入查询字符串的功能,我制作了 2 个可观察对象,一个用于查询我的本地数据库,另一个用于从 API 获取结果。这两个操作必须并行运行。我需要尽快显示来自数据库的结果,当 API
我正在尝试在 MVVM 中实现 ViewModel,将可观察对象作为“输入流”提供,将观察者作为“输出流”提供以供 View 绑定(bind)。 如果 getUser() 调用成功,下面的代码似乎可以
出于某种原因,我有时想使用 RxOperators 而不是普通的 java 方式来转换数据结构,因为它越来越干净。例如: Observable.from(listOfStrings) .filter(
我是 RxJava 新手,我需要以异步方式使用 Observable 功能。 我还需要使用超时:在我的示例中,我希望每个进程在 1 秒或更短的时间内结束。 这是我现在所做的: public stati
我正在尝试在网络请求期间在UI中显示进度条至少3秒钟。 此答案中描述的相同方法似乎不适用于Single。 RxJava Observable minimum execution time Single
我有一个可观察的(很热),它通过系统进程执行操作,并且我希望也运行一个间隔,直到该进程可观察达到 onComplete。 我看到区间运算符:http://reactivex.io/documentat
好吧,我是 RxJava2 的新手(嗯,我也不了解 RxJava),并且正在尝试使用 RxJava2 和 MVP 结构开发 Android 应用程序。 在该应用程序中,我正在对使用监听器的库进行异步调
如何将单个流拆分为单独的单个流,这样就可以执行以下操作而无需两次计算getUserId()? // getUserId() returns Single getUserId().flatMap { g
我是一名优秀的程序员,十分优秀!