gpt4 book ai didi

java - Spring XD 流中的异常处理

转载 作者:太空宇宙 更新时间:2023-11-04 13:15:12 26 4
gpt4 key购买 nike

如何创建一个故障安全 Spring XD 流,该流将在一条特定消息触发异常后保持正常运行(即记录错误但继续使用流中的下一条消息),而不必在每个流步骤中添加 try catch(Throwable)?

有没有简单的方法可以使用 Reactor 或 RxJava 模型来做到这一点?

使用 Reactor 的示例流:

@Override
public Publisher<Tuple> process(Stream<GenericMessage> inputStream) {
return inputStream
.flatMap(SomeClass::someFlatMap)
.filter(SomeClass::someFilter)
.when(Throwable.class, t -> log.error("error", t));
}

最佳答案

RxJava 可由处理器模块使用。创建时需要创建订阅并处理错误,订阅者需要添加 onError 处理程序:

       subject = new SerializedSubject(PublishSubject.create());
Observable<?> outputStream = processor.process(subject);
subscription = outputStream.subscribe(new Action1<Object>() {
@Override
public void call(Object outputObject) {
if (ClassUtils.isAssignable(Message.class, outputObject.getClass())) {
getOutputChannel().send((Message) outputObject);
} else {
getOutputChannel().send(MessageBuilder.withPayload(outputObject).build());
}
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
logger.error(throwable.getMessage(), throwable);
}
}, new Action0() {
@Override
public void call() {
logger.error("Subscription close for [" + subscription + "]");
}
});

查看更多示例:https://github.com/spring-projects/spring-xd/tree/master/spring-xd-rxjava/src

关于java - Spring XD 流中的异常处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33608919/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com