gpt4 book ai didi

Spring-Integration Webflux 异常处理

转载 作者:行者123 更新时间:2023-12-04 07:51:26 26 4
gpt4 key购买 nike

如果在 spring-integration webflux 流中发生异常,则异常本身(带有堆栈跟踪)作为有效负载通过 MessagePublishingErrorHandler 发送回调用者,它使用来自“errorChannel” header 的错误 channel ,而不是默认错误 channel 。

如何设置类似于 WebExceptionHandler 的错误处理程序?我想生成一个 Http 状态代码,可能还有一个 DefaultErrorAttributes对象作为响应。

简单地定义一个从 errorChannel 开始的流不起作用,错误消息不会在那里结束。我试图定义我自己的 fluxErrorChannel ,但它似乎也没有用作错误 channel ,错误不会在我的 errorFlow 中结束:

@Bean
public IntegrationFlow fooRestFlow() {
return IntegrationFlows.from(
WebFlux.inboundGateway("/foo")
.requestMapping(r -> r.methods(HttpMethod.POST))
.requestPayloadType(Map.class)
.errorChannel(fluxErrorChannel()))
.channel(bazFlow().getInputChannel())
.get();
}

@Bean
public MessageChannel fluxErrorChannel() {
return MessageChannels.flux().get();
}

@Bean
public IntegrationFlow errorFlow() {
return IntegrationFlows.from(fluxErrorChannel())
.transform(source -> source)
.enrichHeaders(h -> h.header(HttpHeaders.STATUS_CODE, HttpStatus.BAD_GATEWAY))
.get();
}

@Bean
public IntegrationFlow bazFlow() {
return f -> f.split(Map.class, map -> map.get("items"))
.channel(MessageChannels.flux())
.<String>handle((p, h) -> throw new RuntimeException())
.aggregate();
}

更新

MessagingGatewaySupport.doSendAndReceiveMessageReactive我在 WebFlux.inboundGateway 上定义的错误 channel 从未用于设置错误 channel ,而是错误 channel 始终是 replyChannel正在创建 here :
FutureReplyChannel replyChannel = new FutureReplyChannel();

Message<?> requestMessage = MutableMessageBuilder.fromMessage(message)
.setReplyChannel(replyChannel)
.setHeader(this.messagingTemplate.getSendTimeoutHeader(), null)
.setHeader(this.messagingTemplate.getReceiveTimeoutHeader(), null)
.setErrorChannel(replyChannel)
.build();

错误 channel 最终被重置为 originalErrorChannelHandlerMono.fromFuture ,但在我的情况下,该错误 channel 是ˋnullˋ。另外, onErrorResume lambda 永远不会被调用:
return Mono.fromFuture(replyChannel.messageFuture)
.doOnSubscribe(s -> {
if (!error && this.countsEnabled) {
this.messageCount.incrementAndGet();
}
})
.<Message<?>>map(replyMessage ->
MessageBuilder.fromMessage(replyMessage)
.setHeader(MessageHeaders.REPLY_CHANNEL, originalReplyChannelHeader)
.setHeader(MessageHeaders.ERROR_CHANNEL, originalErrorChannelHeader)
.build())
.onErrorResume(t -> error ? Mono.error(t) : handleSendError(requestMessage, t));

这是如何工作的?

最佳答案

这是一个错误; ErrorMessage由错误处理程序为异常创建的异常被发送到 errorChannel header (必须是 replyChannel 以便网关获得结果)。然后网关应该调用错误流(如果存在)并返回结果。

https://jira.spring.io/browse/INT-4541

关于Spring-Integration Webflux 异常处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52677057/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com