- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试使用 rxjava 创建一个事件总线,在该总线上我抛出一些命令,即使发生错误,这些命令也会继续执行。我正在研究 onErrorFlatMap,但它已经不存在了,而且我还无法理解具体化和非具体化。
这是我已经拥有的:
我的小测试
public class EventBusTest {
private EventBus eventBus = new EventBus();
@Test
public void test() {
//listeners in service layer
eventBus.on(createOf(BeanA.class))
.subscribe(this::handleBeanAInService1);
eventBus.on(createOf(BeanA.class))
.doOnNext(BeanA::validateBusinessLogic)
.subscribe(this::handleBeanAInService2);
eventBus.on(createOf(BeanB.class))
.subscribe(this::handleBeanBInService);
//incoming requests in rest layer
List<String> incomingCalls = new ArrayList<>();
Collections.addAll(incomingCalls, "firstRequest", "secondRequestErrorOnValidate", "thirdRequest", "fourthRequestErrorInService", "fifthRequest");
incomingCalls.stream().forEach(this::incomingCallsEgHttpCalls);
}
public void incomingCallsEgHttpCalls(String string) {
Observable.just(string)
.map(aName -> new BeanA(aName))
.doOnNext(bean -> eventBus.post(new CreateCommand(bean)))
.subscribe(bean -> System.out.println("\tReturn ok to client: " + bean.getName()), error -> System.out.println("\tReturning error to client: " + string + "; " + error.getMessage()));
}
public void handleBeanAInService1(BeanA beanA) {
System.out.println("BeanAService1 handling BeanA " + beanA.getName());
if(beanA.getName().contains("ErrorInService")) {
throw new RuntimeException("service exception for " + beanA.getName());
}
eventBus.post(new CreateCommand(new BeanB(beanA.getName())));
}
public void handleBeanAInService2(BeanA beanA) {
System.out.println("BeanAService2 handling BeanA " + beanA.getName());
}
public void handleBeanBInService(BeanB beanB) {
System.out.println("BeanBService handling BeanB " + beanB.getName());
}
}
EventBus
@Named
public class EventBus {
private PublishSubject<Object> publishSubject = PublishSubject.create();
public EventBus post(Object object) {
//publishSubject.onNext(object); => does not work, OnErrorNotImplementedExceptions if eventbus observers throw errors in validate
//To prevent OnErrorNotImplementedException
try {
publishSubject.onNext(object);
} catch (OnErrorNotImplementedException e) {
Throwable cause = e.getCause();
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
}
throw new RuntimeException(cause);
}
return this;
}
public Observable<Object> observe() {
//I also tried
//return publishSubject.onErrorResumeNext(publishSubject);
return publishSubject;
}
public <F, T> Observable<T> on(Observable.Transformer<F, T> onCommand) {
return onCommand.call((Observable<F>) publishSubject);
}
}
创建命令
public class CreateCommand {
private Object object;
public CreateCommand(Object object) {
this.object = object;
}
public Class<?> type() {
return object.getClass();
}
public <T> T value() {
return (T) object;
}
public static <F, T> Observable.Transformer<F, T> createOf(Class<T> clazz) {
return observable -> observable
.ofType(CreateCommand.class)
.filter(createCommand -> clazz.isInstance(createCommand.object))
.map(createCommand -> clazz.cast(createCommand.object));
}
}
BeanA
public class BeanA {
private String name;
public BeanA(String name) {
this.name = name;
}
public String getName() {
return name;
}
public static void validateBusinessLogic(BeanA beanA) {
if (beanA.getName().contains("ErrorOnValidate")) {
throw new RuntimeException("validate exception for " + beanA.getName());
}
}
}
BeanB
public class BeanB {
private String name;
public BeanB(String name) {
this.name = name;
}
public String getName() {
return name;
}
}
实际输出
BeanAService1 handling BeanA firstRequest
BeanBService handling BeanB firstRequest
BeanAService2 handling BeanA firstRequest
Return ok to client: firstRequest
BeanAService1 handling BeanA secondRequestErrorOnValidate
BeanBService handling BeanB secondRequestErrorOnValidate
Returning error to client: secondRequestErrorOnValidate; validate exception for secondRequestErrorOnValidate
BeanAService1 handling BeanA thirdRequest
BeanBService handling BeanB thirdRequest
Return ok to client: thirdRequest
BeanAService1 handling BeanA fourthRequestErrorInService
Returning error to client: fourthRequestErrorInService; service exception for fourthRequestErrorInService
Return ok to client: fifthRequest
想要的输出
BeanAService1 handling BeanA firstRequest
BeanBService handling BeanB firstRequest
BeanAService2 handling BeanA firstRequest
Return ok to client: firstRequest
BeanAService1 handling BeanA secondRequestErrorOnValidate
BeanBService handling BeanB secondRequestErrorOnValidate
Returning error to client: secondRequestErrorOnValidate; validate exception for secondRequestErrorOnValidate
BeanAService1 handling BeanA thirdRequest
BeanBService handling BeanB thirdRequest
BeanAService2 handling BeanA thirdRequest
Return ok to client: thirdRequest
BeanAService1 handling BeanA fourthRequestErrorInService
BeanAService2 handling BeanA fourthRequestErrorInService
Returning error to client: fourthRequestErrorInService; service exception for fourthRequestErrorInService
BeanAService1 handling BeanA fifthRequest
BeanBService handling BeanB fifthRequest
BeanAService2 handling BeanA fifthRequest
Return ok to client: fifthRequest
有什么想法可以解决吗?我看到 rxjava 中有一个关于 EventBus 实现示例的问题,但没有找到它。
我还希望我的服务不需要进行一些specifix rxjava 错误处理。
或者是否有另一种方法将事件推送到多个其他可观察对象,然后再次压缩结果?
最佳答案
RxJava 中的错误是终止可观察流的事件(除非由 onErrorXXX 和 onException 运算符捕获/恢复)。如果您想让流保持 Activity 状态,您需要将错误包装到其他值类型(即通知)中,并在相关位置将其解开。
Here is an example gist显示了 RxJava 结构上的事件总线实现。请注意以下事项:
关于java - RxJava Eventbus 和错误处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29791816/
我正在进行 2 个 RX 调用,这些调用相互嵌套且相互依赖。服务器存在问题(由于各种原因目前无法解决),该问题在第二个嵌套调用中返回错误。 在这个问题得到解决之前,我需要确保如果第二次调用返回错误,则
这个问题在这里已经有了答案: How to resolve Duplicate files copied in APK META-INF/rxjava.properties (8 个答案) 关闭 5
我正在尝试将此 RxJava1 代码转换为 RxJava2 public static Observable listFolder(Path dir, String glob) { retur
这个问题在这里已经有了答案: RxJava 1 and RxJava 2 in the same project [duplicate] (1 个回答) How to resolve Duplica
有显示项目的RecyclerViewAdapter(该项目已经映射到数据库中)。 RecyclerViewAdapter 包含对 Presenter 的引用以加载项目。它还包含带有项目 ID 的 Ar
我想弄清楚如何在 Android 中使用 RxJava 将 Realm 对象保存在 Realm 中。到目前为止,结合所有这些的所有示例都是如何从 Realm 读取数据的。我想在 android 中使用
我在日志中收到此错误: Caused by java.lang.ClassCastException: java.net.UnknownHostException cannot be cast to
我有一个 API 服务类,其方法返回 Retrofit 提供的调用。 最近,Rx2Java 引入了 Single,所以我想将 Call 更改为 Single,但我不想更改逻辑。 例如 : 类接口(in
如何使用运算符让我始终获得以前和当前的值?如果可能的话,我想避免在管道外创建状态。 - time -> 1 2 3 4 | | | | Op
我正在努力实现以下目标。我加载了一个对象列表,我想获取稍后放入列表中的值。 首先,我使用 flatmap 将所有值收集到一个数组中(按山顺序),然后当一切完成后,我填充一个适配器。 我无法做的是每
是否可以选择使用 timeout 的变体不发射 Throwable ? 我要 complete事件发出。 最佳答案 您不需要使用 onErrorResumeNext 映射错误。您可以使用以下方法提供备
我们可以在 C# Rx 中异步执行一些代码,如下所示,使用 Observable.Start()。我想知道 RxJava 中的等价物是什么。 void Main() { AddTwoNum
问题:我有一个用户可以输入查询字符串的功能,我制作了 2 个可观察对象,一个用于查询我的本地数据库,另一个用于从 API 获取结果。这两个操作必须并行运行。我需要尽快显示来自数据库的结果,当 API
我正在尝试在 MVVM 中实现 ViewModel,将可观察对象作为“输入流”提供,将观察者作为“输出流”提供以供 View 绑定(bind)。 如果 getUser() 调用成功,下面的代码似乎可以
出于某种原因,我有时想使用 RxOperators 而不是普通的 java 方式来转换数据结构,因为它越来越干净。例如: Observable.from(listOfStrings) .filter(
我是 RxJava 新手,我需要以异步方式使用 Observable 功能。 我还需要使用超时:在我的示例中,我希望每个进程在 1 秒或更短的时间内结束。 这是我现在所做的: public stati
我正在尝试在网络请求期间在UI中显示进度条至少3秒钟。 此答案中描述的相同方法似乎不适用于Single。 RxJava Observable minimum execution time Single
我有一个可观察的(很热),它通过系统进程执行操作,并且我希望也运行一个间隔,直到该进程可观察达到 onComplete。 我看到区间运算符:http://reactivex.io/documentat
好吧,我是 RxJava2 的新手(嗯,我也不了解 RxJava),并且正在尝试使用 RxJava2 和 MVP 结构开发 Android 应用程序。 在该应用程序中,我正在对使用监听器的库进行异步调
如何将单个流拆分为单独的单个流,这样就可以执行以下操作而无需两次计算getUserId()? // getUserId() returns Single getUserId().flatMap { g
我是一名优秀的程序员,十分优秀!