
spring - Error channel not working in splitter/aggregator (asynchronous call)

Reposted. Author: 行者123. Updated: 2023-12-03 07:57:40

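I created an application that calls a gateway asynchronously using a splitter/aggregator. In my configuration file, I invoke the process through InvestmentMessagingGateway, which then calls the splitter. Each split message invokes a service activator in parallel, and the result is passed to an aggregator.
I placed an error channel on InvestmentMessagingGateway, so that each failed message is also transformed and passed to the aggregator.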

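In the aggregator, I collect every successful and failed message and compile them into the response.
But when I tried throwing an exception for one or more of the messages, I got this error in my aggregator: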

Reply message received but the receiving thread has already received a reply.

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns=".......">

    <context:component-scan base-package="com.api.investments"/>

    <!-- The gateway to be called in parallel -->
    <gateway id="InvestmentGateway" service-interface="com.api.investments.gateways.InvestmentGateway"/>

    <channel id="investmentDetailChannel"/>
    <service-activator input-channel="investmentDetailChannel" ref="investmentService" method="getAccountPortfolio"/>

    <!-- Inbound gateway to invoke the splitter / aggregator -->
    <gateway id="InvestmentMessageGateway" service-interface="com.api.investments.gateways.InvestmentMessageGateway"
             default-reply-channel="investmentAsyncReceiver" error-channel="investmentAsyncException"/>

    <channel id="investmentAsyncSender"/>
    <channel id="investmentAsyncReceiver"/>

    <!-- Splitter for investment details -->
    <splitter input-channel="investmentAsyncSender" output-channel="investmentSplitChannel" id="investmentDetailsSplitter" ref="investmentComponentsSplitter" />

    <channel id="investmentSplitChannel">
        <queue />
    </channel>

    <!-- Calls the Investment Gateway asynchronously with the split messages and sends each response to the aggregator -->
    <service-activator input-channel="investmentSplitChannel" output-channel="investmentAggregateChannel" ref="investmentAsyncActivator" method="retrieveInvestmentDetailsAsync" requires-reply="true">
        <poller receive-timeout="5000" task-executor="investmentExecutor" fixed-rate="50"/>
    </service-activator>

    <channel id="investmentAsyncException"/>

    <!-- Handles failed messages and passes them to the aggregator -->
    <transformer input-channel="investmentAsyncException" output-channel="investmentAggregateChannel" ref="invesmentErrorLogger" method="logError"/>

    <!-- Aggregates successful and failed messages -->
    <publish-subscribe-channel id="investmentAggregateChannel"/>
    <aggregator input-channel="investmentAggregateChannel" output-channel="investmentAsyncReceiver" id="investmentAggregator"
                ref="investmentComponentsAggregator" correlation-strategy="investmentComponentsCorrelationStrategy"
                expire-groups-upon-completion="true"
                send-partial-result-on-expiry="true" />

    <task:executor id="investmentExecutor" pool-size="10-1000"
                   queue-capacity="5000"/>

</beans:beans>

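I tried putting my error channel on the service activator's poller instead. The error was still the same, but this time the failed message did not reach the aggregator.
I also tried putting an intermediate gateway in front of the service activator, like this:

<gateway id="InvestmentAsyncActivatorGateway" service-interface="com.api.investments.gateways.InvestmentAsyncActivatorGateway"
         default-reply-channel="investmentAggregateChannel" error-channel="investmentAsyncException"/>

but then the error became null.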

--- Update ---

This is the transformer that handles each error message:
@Component("invesmentErrorLogger")
public class InvesmentErrorLoggerImpl implements InvestmentErrorLogger {

    private final Logger logger = LoggerFactory.getLogger(Application.class.getName());

    /**
     * Handles all error messages from InvestmentMessageGateway.
     * Builds an error message and passes it to the aggregator channel.
     *
     * @param invesmentMessageError the error message; its payload is the exception
     * @return a message carrying the error details
     */
    @Override
    public Message<ErrorDetails> logError(Message<?> invesmentMessageError) {
        // Note: getClass().equals(...) matches the exact class only, not its subclasses.
        if (invesmentMessageError.getPayload().getClass().equals(MessagingException.class)) {
            MessagingException messageException = (MessagingException) invesmentMessageError.getPayload();
            AccountPortfolioRequest failedMsgPayload = (AccountPortfolioRequest) messageException.getFailedMessage().getPayload();
            String logError = "Exception occurred in Account Number: " + failedMsgPayload.getiAccNo();
            logger.error(logError);
            ErrorDetails productErrorDetail = new ErrorDetails();
            productErrorDetail.setCode(InvestmentAPIErrorMessages.SVC_ERR_INQACCNTPORTFOLIO);
            productErrorDetail.setMessage(InvestmentAPIErrorMessages.SVC_ERR_INQACCNTPORTFOLIO_DESC + ". Problem occurred in Account Number: " + failedMsgPayload.getiAccNo());

            Message<ErrorDetails> errorMessage = MessageBuilder.withPayload(productErrorDetail)
                    .setHeaderIfAbsent(InvestmentInquiryConstants.INV_CORRELATION_STRATEGY, InvestmentInquiryConstants.INV_CORRELATION_STRATEGY_VALUE)
                    .build();

            return errorMessage;
        }
        else if (invesmentMessageError.getPayload().getClass().equals(MessageDeliveryException.class)) {
            MessageDeliveryException messageException = (MessageDeliveryException) invesmentMessageError.getPayload();
            AccountPortfolioRequest failedMsgPayload = (AccountPortfolioRequest) messageException.getFailedMessage().getPayload();
            String logError = "Exception occurred in Account Number: " + failedMsgPayload.getiAccNo();
            logger.error(logError);
            ErrorDetails productErrorDetail = new ErrorDetails();
            productErrorDetail.setCode(InvestmentAPIErrorMessages.SVC_ERR_INQACCNTPORTFOLIO);
            productErrorDetail.setMessage(InvestmentAPIErrorMessages.SVC_ERR_INQACCNTPORTFOLIO_DESC + ". Problem occurred in Account Number: " + failedMsgPayload.getiAccNo());

            Message<ErrorDetails> errorMessage = MessageBuilder.withPayload(productErrorDetail)
                    .setHeaderIfAbsent(InvestmentInquiryConstants.INV_CORRELATION_STRATEGY, InvestmentInquiryConstants.INV_CORRELATION_STRATEGY_VALUE)
                    .build();

            return errorMessage;
        }
        else {
            // Any other exception type: no failed message is available, so report the exception text.
            Exception messageException = (Exception) invesmentMessageError.getPayload();
            String logError = "Exception occurred in Investment Gateway ";
            logger.error(logError);
            // Was logger.equals(...), which compared objects and logged nothing.
            logger.error(messageException.getMessage());
            ErrorDetails productErrorDetail = new ErrorDetails();
            productErrorDetail.setCode(InvestmentAPIErrorMessages.SVC_ERR_INQACCNTPORTFOLIO);
            productErrorDetail.setMessage(InvestmentAPIErrorMessages.SVC_ERR_INQACCNTPORTFOLIO_DESC + " " + messageException.getMessage());

            Message<ErrorDetails> errorMessage = MessageBuilder.withPayload(productErrorDetail)
                    .setHeaderIfAbsent(InvestmentInquiryConstants.INV_CORRELATION_STRATEGY, InvestmentInquiryConstants.INV_CORRELATION_STRATEGY_VALUE)
                    .build();

            return errorMessage;
        }
    }

}

Best answer

Reply message received but the receiving thread has already received a reply.



As the error implies, you cannot send multiple replies (or errors) for a single request; strictly, there is exactly one reply per request.
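The one-reply-per-request rule can be pictured with a small plain-Java model. This is only a sketch under assumptions: `OneShotReply` is a hypothetical class invented here, not Spring Integration's actual implementation, and the payload strings are made up. It shows a reply holder that accepts the first reply (an error or a result) and rejects anything that arrives afterwards for the same request.

```java
import java.util.concurrent.atomic.AtomicReference;

public class SingleReplyDemo {

    /** Hypothetical stand-in for the single reply slot a gateway keeps per request. */
    static final class OneShotReply<T> {
        private final AtomicReference<T> reply = new AtomicReference<>();

        /** Accepts the first non-null reply; any later reply is rejected. */
        void send(T value) {
            if (!reply.compareAndSet(null, value)) {
                throw new IllegalStateException(
                        "Reply message received but the receiving thread has already received a reply: " + value);
            }
        }

        /** Returns the accepted reply, or null if none has arrived yet. */
        T get() {
            return reply.get();
        }
    }

    public static void main(String[] args) {
        OneShotReply<String> replySlot = new OneShotReply<>();

        // The first thing to come back for the request becomes the reply.
        replySlot.send("error details for one failed split message");

        try {
            // Anything else arriving for the same request has nowhere to go.
            replySlot.send("aggregated result");
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }

        System.out.println("Caller sees: " + replySlot.get());
    }
}
```

In this model, the way to avoid the rejection is to make sure each failure is absorbed before it reaches the outer reply slot, so that only the final aggregated result is ever sent back.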

You need another gateway between the splitter and the service.

The mid-flow gateway should have no service-interface, so that it uses RequestReplyExchanger.
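The effect of a gateway between the splitter and the service can be sketched in plain Java, without Spring Integration. Treat this as an analogy under assumptions, not the framework's mechanism: `callThroughGateway` and `splitCallAggregate` are hypothetical helpers, and the rule that account 0 fails is invented for illustration. The point it shows is that each split item is wrapped in its own request/reply scope, so a failure becomes an ordinary per-item result and the caller still receives a single aggregated reply.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class PerItemErrorHandlingDemo {

    /** Hypothetical service call; account 0 fails purely for illustration. */
    static String getAccountPortfolio(int accountNumber) {
        if (accountNumber == 0) {
            throw new IllegalArgumentException("invalid account " + accountNumber);
        }
        return "portfolio-" + accountNumber;
    }

    /** Plays the role of the mid-flow gateway: exactly one outcome per split item. */
    static CompletableFuture<String> callThroughGateway(int accountNumber, ExecutorService executor) {
        return CompletableFuture
                .supplyAsync(() -> getAccountPortfolio(accountNumber), executor)
                // Turn a failure into a normal result so the aggregator still receives it.
                .exceptionally(ex -> "error-" + accountNumber);
    }

    /** Splits the request, calls the service in parallel, and aggregates every outcome. */
    static List<String> splitCallAggregate(List<Integer> accounts) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<CompletableFuture<String>> futures = accounts.stream()
                    .map(account -> callThroughGateway(account, executor))
                    .collect(Collectors.toList());
            // Joining in list order keeps the aggregate in the same order as the input.
            return futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // One aggregated reply, containing both successes and the converted failure.
        System.out.println(splitCallAggregate(Arrays.asList(1, 0, 3)));
    }
}
```

Because the failure is handled inside the per-item scope, nothing propagates to the outer caller as a second reply; the aggregate is the only reply.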

Regarding "spring - Error channel not working in splitter/aggregator (asynchronous call)", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/52236635/
