gpt4 book ai didi

spring-integration - @MessagingGateway 如何配置 Spring Cloud Stream MessageChannels?

转载 作者:行者123 更新时间:2023-12-04 07:59:25 26 4
gpt4 key购买 nike

我已经开发了异步 Spring Cloud Stream 服务,并且我正在尝试开发一个边缘服务,该服务使用 @MessagingGateway 来提供对本质上异步的服务的同步访问。

我目前得到以下堆栈跟踪:

Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:355)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:271)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:188)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
... 47 common frames omitted

我的@MessagingGateway:
@EnableBinding(AccountChannels.class)
@MessagingGateway

public interface AccountService {
@Gateway(requestChannel = AccountChannels.CREATE_ACCOUNT_REQUEST,replyChannel = AccountChannels.ACCOUNT_CREATED, replyTimeout = 60000, requestTimeout = 60000)
Account createAccount(@Payload Account account, @Header("Authorization") String authorization);
}

如果我通过@StreamListener 在回复 channel 上使用消息,它工作得很好:
  @HystrixCommand(commandKey = "acounts-edge:accountCreated", fallbackMethod = "accountCreatedFallback", commandProperties = {@HystrixProperty(name = "execution.isolation.strategy", value = "SEMAPHORE")}, ignoreExceptions = {ClientException.class})
@StreamListener(AccountChannels.ACCOUNT_CREATED)
public void accountCreated(Account account, @Header(name = "spanTraceId", required = false) String traceId) {
try {
if (log.isInfoEnabled()) {
log.info(new StringBuilder("Account created: ").append(objectMapper.writeValueAsString(account)).toString());
}
} catch (JsonProcessingException e) {
log.error(e.getMessage(), e);
}
}

在生产者方面,我正在配置 requiredGroups保证多个消费者可以处理该消息,相应的,消费者有匹配的 group配置。

消费者:
spring:
cloud:
stream:
bindings:
create-account-request:
binder: rabbit1
contentType: application/json
destination: create-account-request
requiredGroups: accounts-service-create-account-request
account-created:
binder: rabbit1
contentType: application/json
destination: account-created
group: accounts-edge-account-created

制作人:
spring:
cloud:
stream:
bindings:
create-account-request:
binder: rabbit1
contentType: application/json
destination: create-account-request
group: accounts-service-create-account-request
account-created:
binder: rabbit1
contentType: application/json
destination: account-created
requiredGroups: accounts-edge-account-created

生产者端处理请求并发送响应的代码位:
  accountChannels.accountCreated().send(MessageBuilder.withPayload(accountService.createAccount(account)).build());

我可以调试并看到请求被接收和处理,但是当响应发送到回复 channel 时,这就是发生错误的时候。

为了让@MessagingGateway 正常工作,我缺少哪些配置和/或代码?我知道我正在结合 Spring Integration 和 Spring Cloud Gateway,所以我不确定将它们一起使用是否会导致问题。

最佳答案

这是个好问题,也是个好主意。但这不会那么容易。
首先我们必须自己确定gateway意味着 request/reply ,因此 correlation .这在 @MessagingGateway 中可用通过 replyChannel标题面对 TemporaryReplyChannel实例。即使您有明确的 replyChannel = AccountChannels.ACCOUNT_CREATED ,关联仅通过提到的 header 及其值完成。这个事实TemporaryReplyChannel不可序列化,不能通过网络传输到另一端的消费者。
幸运的是 Spring Integration 为我们提供了一些解决方案。它是 HeaderEnricher 的一部分和它的 headerChannelsToString后面的选项HeaderChannelRegistry :

Starting with Spring Integration 3.0, a new sub-element <int:header-channels-to-string/> is available; it has no attributes. This converts existing replyChannel and errorChannel headers (when they are a MessageChannel) to a String and stores the channel(s) in a registry for later resolution when it is time to send a reply, or handle an error. This is useful for cases where the headers might be lost; for example when serializing a message into a message store or when transporting the message over JMS. If the header does not already exist, or it is not a MessageChannel, no changes are made.


https://docs.spring.io/spring-integration/docs/5.0.0.RELEASE/reference/html/messaging-transformation-chapter.html#header-enricher
但在这种情况下,您必须引入从网关到 HeaderEnricher 的内部 channel 。只有最后一个会发送消息到 AccountChannels.CREATE_ACCOUNT_REQUEST .所以, replyChannel header 将转换为字符串表示形式并能够在网络上传播。在消费者方面,当您发送回复时,您应该确保您转移了 replyChannel标题也是如此。所以,当消息到达 AccountChannels.ACCOUNT_CREATED在生产者方面,我们有 @MessagingGateway ,相关机制能够将 channel 标识符转换为正确的 TemporaryReplyChannel并将回复与等待的网关调用相关联。
唯一的问题是您的生产者应用程序必须是 AccountChannels.ACCOUNT_CREATED 组中的单个使用者。 - 我们必须确保一次只有一个云实例在运行。仅仅因为只有一个实例具有该 TemporaryReplyChannel在它的内存中。
更多网关信息: https://docs.spring.io/spring-integration/docs/5.0.0.RELEASE/reference/html/messaging-endpoints-chapter.html#gateway
更新
一些帮助代码:
@EnableBinding(AccountChannels.class)
@MessagingGateway

public interface AccountService {
@Gateway(requestChannel = AccountChannels.INTERNAL_CREATE_ACCOUNT_REQUEST, replyChannel = AccountChannels.ACCOUNT_CREATED, replyTimeout = 60000, requestTimeout = 60000)
Account createAccount(@Payload Account account, @Header("Authorization") String authorization);
}

@Bean
public IntegrationFlow headerEnricherFlow() {
return IntegrationFlows.from(AccountChannels.INTERNAL_CREATE_ACCOUNT_REQUEST)
.enrichHeaders(headerEnricher -> headerEnricher.headerChannelsToString())
.channel(AccountChannels.CREATE_ACCOUNT_REQUEST)
.get();

}
更新
一些简单的应用程序来演示 PoC:
@EnableBinding({ Processor.class, CloudStreamGatewayApplication.GatewayChannels.class })
@SpringBootApplication
public class CloudStreamGatewayApplication {

interface GatewayChannels {

String REQUEST = "request";

@Output(REQUEST)
MessageChannel request();


String REPLY = "reply";

@Input(REPLY)
SubscribableChannel reply();
}

private static final String ENRICH = "enrich";


@MessagingGateway
public interface StreamGateway {

@Gateway(requestChannel = ENRICH, replyChannel = GatewayChannels.REPLY)
String process(String payload);

}

@Bean
public IntegrationFlow headerEnricherFlow() {
return IntegrationFlows.from(ENRICH)
.enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
.channel(GatewayChannels.REQUEST)
.get();
}

@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Message<?> process(Message<String> request) {
return MessageBuilder.withPayload(request.getPayload().toUpperCase())
.copyHeaders(request.getHeaders())
.build();
}


public static void main(String[] args) {
ConfigurableApplicationContext applicationContext =
SpringApplication.run(CloudStreamGatewayApplication.class, args);

StreamGateway gateway = applicationContext.getBean(StreamGateway.class);

String result = gateway.process("foo");

System.out.println(result);
}

}
application.yml :
spring:
cloud:
stream:
bindings:
input:
destination: requests
output:
destination: replies
request:
destination: requests
reply:
destination: replies
我用 spring-cloud-starter-stream-rabbit .
MessageBuilder.withPayload(request.getPayload().toUpperCase())
.copyHeaders(request.getHeaders())
.build()
将请求 header 复制到回复消息的技巧。因此,网关能够在回复端将 header 中的 channel 标识符转换为适当的 TemporaryReplyChannel将回复正确地传达给网关的调用者。
关于此事的 SCSt 问题: https://github.com/spring-cloud/spring-cloud-stream/issues/815

关于spring-integration - @MessagingGateway 如何配置 Spring Cloud Stream MessageChannels?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47800497/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com