
java - RxJava Eventbus and error handling

Reposted. Author: 行者123. Updated: 2023-11-30 08:14:57

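I am trying to create an event bus with RxJava onto which I post commands, and which keeps processing them even when an error occurs. I looked into onErrorFlatMap, but it no longer exists, and I have not been able to make sense of materialize and dematerialize.

Here is what I have so far:

My small test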

public class EventBusTest {

    private EventBus eventBus = new EventBus();

    @Test
    public void test() {
        //listeners in service layer
        eventBus.on(createOf(BeanA.class))
                .subscribe(this::handleBeanAInService1);
        eventBus.on(createOf(BeanA.class))
                .doOnNext(BeanA::validateBusinessLogic)
                .subscribe(this::handleBeanAInService2);
        eventBus.on(createOf(BeanB.class))
                .subscribe(this::handleBeanBInService);

        //incoming requests in rest layer
        List<String> incomingCalls = new ArrayList<>();
        Collections.addAll(incomingCalls, "firstRequest", "secondRequestErrorOnValidate", "thirdRequest", "fourthRequestErrorInService", "fifthRequest");
        incomingCalls.stream().forEach(this::incomingCallsEgHttpCalls);
    }

    public void incomingCallsEgHttpCalls(String string) {
        Observable.just(string)
                .map(aName -> new BeanA(aName))
                .doOnNext(bean -> eventBus.post(new CreateCommand(bean)))
                .subscribe(
                        bean -> System.out.println("\tReturn ok to client: " + bean.getName()),
                        error -> System.out.println("\tReturning error to client: " + string + "; " + error.getMessage()));
    }

    public void handleBeanAInService1(BeanA beanA) {
        System.out.println("BeanAService1 handling BeanA " + beanA.getName());
        if (beanA.getName().contains("ErrorInService")) {
            throw new RuntimeException("service exception for " + beanA.getName());
        }
        eventBus.post(new CreateCommand(new BeanB(beanA.getName())));
    }

    public void handleBeanAInService2(BeanA beanA) {
        System.out.println("BeanAService2 handling BeanA " + beanA.getName());
    }

    public void handleBeanBInService(BeanB beanB) {
        System.out.println("BeanBService handling BeanB " + beanB.getName());
    }

}

EventBus

@Named
public class EventBus {

    private PublishSubject<Object> publishSubject = PublishSubject.create();

    public EventBus post(Object object) {
        //publishSubject.onNext(object); => does not work, OnErrorNotImplementedExceptions if eventbus observers throw errors in validate

        //To prevent OnErrorNotImplementedException
        try {
            publishSubject.onNext(object);
        } catch (OnErrorNotImplementedException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new RuntimeException(cause);
        }
        return this;
    }

    public Observable<Object> observe() {
        //I also tried
        //return publishSubject.onErrorResumeNext(publishSubject);
        return publishSubject;
    }

    public <F, T> Observable<T> on(Observable.Transformer<F, T> onCommand) {
        return onCommand.call((Observable<F>) publishSubject);
    }

}

CreateCommand

public class CreateCommand {

    private Object object;

    public CreateCommand(Object object) {
        this.object = object;
    }

    public Class<?> type() {
        return object.getClass();
    }

    public <T> T value() {
        return (T) object;
    }

    public static <F, T> Observable.Transformer<F, T> createOf(Class<T> clazz) {
        return observable -> observable
                .ofType(CreateCommand.class)
                .filter(createCommand -> clazz.isInstance(createCommand.object))
                .map(createCommand -> clazz.cast(createCommand.object));
    }
}

BeanA

public class BeanA {
    private String name;

    public BeanA(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public static void validateBusinessLogic(BeanA beanA) {
        if (beanA.getName().contains("ErrorOnValidate")) {
            throw new RuntimeException("validate exception for " + beanA.getName());
        }
    }
}

BeanB

public class BeanB {

    private String name;

    public BeanB(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

}

Actual output

BeanAService1 handling BeanA firstRequest
BeanBService handling BeanB firstRequest
BeanAService2 handling BeanA firstRequest
Return ok to client: firstRequest
BeanAService1 handling BeanA secondRequestErrorOnValidate
BeanBService handling BeanB secondRequestErrorOnValidate
Returning error to client: secondRequestErrorOnValidate; validate exception for secondRequestErrorOnValidate
BeanAService1 handling BeanA thirdRequest
BeanBService handling BeanB thirdRequest
Return ok to client: thirdRequest
BeanAService1 handling BeanA fourthRequestErrorInService
Returning error to client: fourthRequestErrorInService; service exception for fourthRequestErrorInService
Return ok to client: fifthRequest

Desired output

BeanAService1 handling BeanA firstRequest
BeanBService handling BeanB firstRequest
BeanAService2 handling BeanA firstRequest
Return ok to client: firstRequest
BeanAService1 handling BeanA secondRequestErrorOnValidate
BeanBService handling BeanB secondRequestErrorOnValidate
Returning error to client: secondRequestErrorOnValidate; validate exception for secondRequestErrorOnValidate
BeanAService1 handling BeanA thirdRequest
BeanBService handling BeanB thirdRequest
BeanAService2 handling BeanA thirdRequest
Return ok to client: thirdRequest
BeanAService1 handling BeanA fourthRequestErrorInService
BeanAService2 handling BeanA fourthRequestErrorInService
Returning error to client: fourthRequestErrorInService; service exception for fourthRequestErrorInService
BeanAService1 handling BeanA fifthRequest
BeanBService handling BeanB fifthRequest
BeanAService2 handling BeanA fifthRequest
Return ok to client: fifthRequest

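Any ideas on how to solve this? I saw that there was an issue in RxJava about an example EventBus implementation, but I could not find it.

I would also like my services not to need any RxJava-specific error handling.

Or is there another way to push events to multiple other observables and then zip the results again?

Best answer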

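An error in RxJava is a terminal event that tears down the observable stream (unless it is captured or resumed by the onErrorXXX and onException operators). If you want to keep the stream alive, you need to wrap the error into some other value type (i.e. Notification) and unwrap it at the places of interest.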
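To make the "wrap the error into a value" idea concrete, here is a minimal, library-free sketch in plain Java. It does not use RxJava: the `Result` class is a hypothetical stand-in for the role `Notification` plays, and `capture` and `process` are illustrative names that are not part of the question or the answer. The validation rule mirrors `validateBusinessLogic` from the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class ErrorsAsValues {

    /** Hypothetical stand-in for a notification: holds either a value or an error. */
    public static final class Result<T> {
        private final T value;
        private final Throwable error;

        private Result(T value, Throwable error) {
            this.value = value;
            this.error = error;
        }

        public static <T> Result<T> ok(T value) {
            return new Result<>(value, null);
        }

        public static <T> Result<T> failed(Throwable error) {
            return new Result<>(null, error);
        }

        public boolean isError() {
            return error != null;
        }

        public T getValue() {
            return value;
        }

        public Throwable getError() {
            return error;
        }
    }

    /** Wraps a step that may throw, so a failure becomes an ordinary value. */
    public static <A, B> Function<A, Result<B>> capture(Function<A, B> step) {
        return input -> {
            try {
                return Result.ok(step.apply(input));
            } catch (RuntimeException e) {
                return Result.failed(e);
            }
        };
    }

    /** Processes every request; a failing request does not stop the later ones. */
    public static List<String> process(List<String> requests) {
        Function<String, Result<String>> validate = capture(name -> {
            if (name.contains("ErrorOnValidate")) {
                throw new RuntimeException("validate exception for " + name);
            }
            return name;
        });

        List<String> log = new ArrayList<>();
        for (String request : requests) {
            Result<String> result = validate.apply(request);
            // Unwrap at the place of interest: here, when answering the client.
            log.add(result.isError()
                    ? "error: " + result.getError().getMessage()
                    : "ok: " + result.getValue());
        }
        return log;
    }

    public static void main(String[] args) {
        process(Arrays.asList("firstRequest", "secondRequestErrorOnValidate", "thirdRequest"))
                .forEach(System.out::println);
    }
}
```

Because the failure travels as a value, the loop (or, in RxJava terms, the stream) never sees a terminal event, and the third request is still handled after the second one fails.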

Here is an example gist that shows an event bus implementation built on RxJava constructs. Note the following:

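  • If the event bus's post() method can be called from multiple threads, you may need to serialize access to the subject.
  • RxJava sometimes does not bounce an exception thrown from onNext back to the caller, but only at the very beginning of the stream, which terminates the entire stream. You therefore need an operator that confines such errors below a certain level, hence the ClientErrorBounceBack operator in the example.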
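The two points above can be illustrated without RxJava. The following is a sketch under stated assumptions, not the gist's implementation: `IsolatingBus`, `Subscription`, and `demo` are hypothetical names, and the `ClientErrorBounceBack` operator is not reproduced here. The sketch serializes `post()` with `synchronized` and confines each subscriber's exception so that the other subscribers still receive the event and the poster gets the failures back.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

public class IsolatingBus {

    /** Pairs an event type with the handler interested in it. */
    private static final class Subscription<T> {
        private final Class<T> type;
        private final Consumer<? super T> handler;

        Subscription(Class<T> type, Consumer<? super T> handler) {
            this.type = type;
            this.handler = handler;
        }

        void deliver(Object event) {
            if (type.isInstance(event)) {
                handler.accept(type.cast(event));
            }
        }
    }

    private final List<Subscription<?>> subscriptions = new CopyOnWriteArrayList<>();

    public <T> void on(Class<T> type, Consumer<? super T> handler) {
        subscriptions.add(new Subscription<>(type, handler));
    }

    /**
     * Delivers the event to every subscriber. Posts are serialized with
     * synchronized; Java monitors are reentrant, so a handler may post again
     * from the same thread. Failures are collected and bounced back to the
     * poster instead of stopping delivery to the remaining subscribers.
     */
    public synchronized List<RuntimeException> post(Object event) {
        List<RuntimeException> failures = new ArrayList<>();
        for (Subscription<?> subscription : subscriptions) {
            try {
                subscription.deliver(event);
            } catch (RuntimeException e) {
                failures.add(e);
            }
        }
        return failures;
    }

    /** Small demonstration: the first handler fails for names containing "Error". */
    public static List<String> demo() {
        List<String> log = new ArrayList<>();
        IsolatingBus bus = new IsolatingBus();
        bus.on(String.class, s -> {
            if (s.contains("Error")) {
                throw new RuntimeException("failed: " + s);
            }
            log.add("first saw " + s);
        });
        bus.on(String.class, s -> log.add("second saw " + s));

        for (String request : new String[] {"a", "bError", "c"}) {
            List<RuntimeException> failures = bus.post(request);
            log.add(failures.isEmpty()
                    ? "ok " + request
                    : "error " + failures.get(0).getMessage());
        }
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

With this shape the services themselves contain no bus-specific error handling: they simply throw, and the bus decides how far the exception may travel.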

Regarding java - RxJava Eventbus and error handling, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/29791816/
