gpt4 book ai didi

java - Spring Integration - 可以在特定时间订阅消息

转载 作者:太空宇宙 更新时间:2023-11-04 11:29:11 25 4
gpt4 key购买 nike

我是 Spring Integration 和 EIP 的新手。

我试图解决自己的问题,但我脑子里没有找到正确的解决方案。

这是我需要的流程示例。

  1. 客户端将Json数据发送给SI。
  2. HTTP 入站网关将 Json 数据作为负载。
  3. 此 Json 数据将使用两个单独的 HTTP 出站。但第二个出站调用是在第一个出站调用的响应之后进行的。
  4. 将结果回复到步骤 0(HTTP 入站网关上的回复 channel )

我正在考虑使用publishSubscribeChannel 或RecipientListRouter 将相同的消息发送到单独的调用。但在这种情况下,需要在某个时间执行第二次http出站。据我了解,这应该是第 4 步回复的一笔交易。

或者...

是否可以保留消息实例(来自 http 入站网关的 JSON 数据)以在第一次调用后的第二次调用中使用。

请给我一些想法,如果你有示例代码,那一定很棒。目前我正在编写 JAVA 配置样式,但也欢迎 XML 示例。

更新问题

我正在尝试添加桥接器和 executorChannel。

这是我的代码,几乎没有什么东西没有按我的预期工作。

  1. 我希望第二个出站处理程序(httpPMGateway) 使用来自 aIncomingGateway 的消息(因此使用publishSubscribeChannel),但在 httpJ3Gateway 出站处理程序之后,它使用来自 httpJ3Gateway 的消息。我使用带有来自 httpJ3Gateway 的响应消息的路由。(确定 -> httpPMgateway ,默认 -> backAChannel)

  2. 我想从j3ValidationFlow 或broadcastFlow 的末尾向aIncomingGateway 发送响应消息。但 aIncomingGateway 在发送之前已经收到回复消息。这是日志消息。

2017-05-16 11:17:09.061 WARN 11488 --- [pool-1-thread-1] cMessagingTemplate$TemporaryReplyChannel : Reply message received but the receiving thread has already received a reply:

这是代码部分:

@Bean
public MessagingGatewaySupport aIncomingGateway() {
HttpRequestHandlingMessagingGateway handler = new HttpRequestHandlingMessagingGateway(true);

RequestMapping requestMapping = new RequestMapping();
requestMapping.setMethods(HttpMethod.POST);
requestMapping.setPathPatterns("/a/incoming");
requestMapping.setConsumes("application/json");
requestMapping.setProduces("application/json");


handler.setRequestMapping(requestMapping);
handler.setMessageConverters(getMessageConverters());
handler.setRequestPayloadType(Amount.class);

handler.setReplyChannelName("backAChannel");
return handler;
}

@Bean
public MessageHandler httpJ3Gateway(){
HttpRequestExecutingMessageHandler httpHandler = new HttpRequestExecutingMessageHandler("http://localhost:8080/j3/incoming");
httpHandler.setHttpMethod(HttpMethod.POST);
httpHandler.setExpectReply(true);
httpHandler.setMessageConverters(getMessageConverters());
httpHandler.setExpectedResponseType(String.class);
httpHandler.setRequestFactory(requestFactory());
return httpHandler;
}

@Bean
public MessageHandler httpPMGateway(){
HttpRequestExecutingMessageHandler httpHandler = new HttpRequestExecutingMessageHandler("http://localhost:8080/pm/incoming");
httpHandler.setHttpMethod(HttpMethod.POST);
httpHandler.setExpectReply(true);
httpHandler.setMessageConverters(getMessageConverters());
httpHandler.setExpectedResponseType(String.class);
return httpHandler;
}

@Bean
public MessageChannel broadChannel(){
return new ExecutorChannel(Executors.newCachedThreadPool());
}

@Bean
@ServiceActivator(inputChannel = "brigdeChannel")
public MessageHandler bridgeHandler(){
BridgeHandler handler = new BridgeHandler();
handler.setOutputChannel(broadChannel());
return handler;
}


@Bean
public IntegrationFlow aIncomingFlow() {
return IntegrationFlows.from(aIncomingGateway())
.log(LoggingHandler.Level.INFO, "From Inbound http gateway", m -> m.getPayload().toString())
.publishSubscribeChannel(p -> p
.subscribe(s -> s.channel("j3Channel"))
.subscribe(s -> s.bridge(b -> bridgeHandler()) )
)
.get();
}

@Bean
public IntegrationFlow j3ValidationFlow() {
return IntegrationFlows.from("j3Channel")
.log(LoggingHandler.Level.INFO, "Run j3ChannelFlow", m -> m.getPayload().toString())
.handle(httpJ3Gateway())
.route("headers.http_statusCode", m -> m
.resolutionRequired(false)
.channelMapping("OK", "brigdeChannel")
.defaultOutputToParentFlow()
)
.channel("backAChannel")
.get();
}

@Bean
public IntegrationFlow broadcastFlow() {
return IntegrationFlows.from("broadChannel")
.log(LoggingHandler.Level.INFO, "Run broadcastFlow", m -> m.getPayload().toString())
.handle(httpPMGateway())
.channel("backAChannel")
.get();
}

请帮我出个主意。
我提前表示赞赏。感谢您帮助我解决这个愚蠢的问题。

最佳答案

是的;使用发布/订阅 channel 或收件人列表路由器

inbound -> ... -> pubsub

pubsub -> outbound1

pubsub -> bridge -> executorChannel -> outbound2

outbound1 将在入站线程上运行; outbound2 将在另一个线程上运行。

关于java - Spring Integration - 可以在特定时间订阅消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43986193/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com