gpt4 book ai didi

Spring 集成 HTTP 到 Scatter Gather

转载 作者:行者123 更新时间:2023-12-04 10:29:46 26 4
gpt4 key购买 nike

我是 Spring Integration 的新手,并试图利用 scatter-gather 的企业模式,但我正在为实现细节而苦苦挣扎,并为我可以在网上找到的可用示例而苦苦挣扎。

简而言之,我的场景是:

  • HTTP 请求从用户发送到系统 A。
  • 在响应(又名同步)之前,系统 A 向 N 个系统 X 异步发送 N 条消息。
  • 系统 A 坐下来等待响应。
  • 一旦有来自每个请求系统的响应,系统 A 将这些响应整理成一个更大的响应。
  • 系统 A 最终以较大的响应响应用户。

  • 基本上,就原始消费者而言,单个是请求响应,而不必“稍后回来”。但是,该请求实际上是针对掩盖其背后复杂性的外观(可能会影响数百个系统,使后端的同步请求性能不佳且不可行)。

    到目前为止,我已经有了这个实现(经过详细处理,因此可能不是我正在使用的 1:1 示例,例如,我已经制定出的相关策略并没有按照我的预期进行):
    @Bean
    public IntegrationFlow overallRequest(final AmqpTemplate amqpTemplate) {

    return IntegrationFlows.from( // HTTP endpoint to user makes requests on
    Http.inboundChannelAdapter("/request-overall-document")
    .requestMapping(m -> m.methods(HttpMethod.POST))
    .requestPayloadType(String.class))
    .log()
    // Arbitrary header to simplify example, realistically would generate a UUID
    // and attach to some correlating header that works for systems involved
    .enrichHeaders(p -> p.header("someHeader", "someValue"))
    .log()
    .scatterGather(
    recipientListRouterSpec ->
    recipientListRouterSpec
    .applySequence(true)
    .recipientFlow(
    flow ->
    flow.handle( // Straight pass through of msg received to see in response
    Amqp.outboundAdapter(amqpTemplate)
    .exchangeName( // RabbitMQ fanout exchange to N queues to N systems
    "request-overall-document-exchange"))),
    aggregatorSpec ->
    aggregatorSpec
    // Again for example, arbitrary once two correlated responses
    .correlationStrategy(msg -> msg.getHeaders().get("someHeader"))
    .releaseStrategy(gm -> gm.size() == 2)
    // Simple string concatenation for overall response
    .outputProcessor(
    msgrp ->
    msgrp.getMessages().stream()
    .map(msg -> msg.getPayload().toString())
    .reduce("Overall response: ", (nexus, txt) -> nexus + "|" + txt))
    // Reset group on each response
    .expireGroupsUponCompletion(true),
    scatterGatherSpec ->
    scatterGatherSpec.gatherChannel(
    responseChannel())) // The channel to listen for responses to request on
    .log()
    .get();
    }

    以此作为响应 channel 配置:
    @Bean
    public MessageChannel responseChannel() {
    return new QueueChannel();
    }

    @Bean
    public AmqpInboundChannelAdapter responseChannelAdapter(
    SimpleMessageListenerContainer listenerContainer,
    @Qualifier("responseChannel") MessageChannel channel) {
    AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
    adapter.setOutputChannel(channel);
    return adapter;
    }

    @Bean
    public SimpleMessageListenerContainer responseContainer(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container =
    new SimpleMessageListenerContainer(connectionFactory);
    container.setQueueNames("request-overall-document-responses");
    return container;
    }

    将所有回复发送至 单独的 Spring 应用程序只是将请求有效负载再次通过管道返回(也就是用于测试而无需与实际系统集成):
    @Bean
    public IntegrationFlow systemOneReception(final ConnectionFactory connectionFactory, final AmqpTemplate amqpTemplate) {
    return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "request-overall-document-system-1"))
    .log()
    .handle(Amqp.outboundAdapter(amqpTemplate).routingKey("request-overall-document-responses"))
    .get();
    }

    @Bean
    public IntegrationFlow systemTwoReception(final ConnectionFactory connectionFactory, final AmqpTemplate amqpTemplate) {
    return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "request-overall-document-system-2"))
    .log()
    .handle(Amqp.outboundAdapter(amqpTemplate).routingKey("request-overall-document-responses"))
    .get();
    }

    根据 scatter-gather 实现中的聚合/发布策略,我在成功发布后在系统 A 中收到以下错误:
    2020-02-29 20:06:39.255 ERROR 152 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageDeliveryException: The 'gatherResultChannel' header is required to deliver the gather result., failedMessage=GenericMessage [payload=Overall response: |somerequesttobesent|somerequesttobesent, headers={amqp_receivedDeliveryMode=PERSISTENT, content-length=19, amqp_deliveryTag=2, sequenceSize=1, amqp_redelivered=false, amqp_contentEncoding=UTF-8, host=localhost:18081, someHeader=someValue, connection=keep-alive, correlationId=182ee203-85ab-9ef6-7b19-3a8e2da8f5a7, id=994a0cf5-ad2b-02c3-dc93-74fae2f5092b, cache-control=no-cache, contentType=text/plain, timestamp=1583006799252, http_requestMethod=POST, sequenceNumber=1, amqp_consumerQueue=request-overall-document-responses, accept=*/*, amqp_receivedRoutingKey=request-overall-document-responses, amqp_timestamp=Sat Feb 29 20:06:39 GMT 2020, amqp_messageId=3341deae-7ed0-a042-0bb7-d2d2be871165, http_requestUrl=http://localhost:18081/request-overall-document, amqp_consumerTag=amq.ctag-ULxwuAjp8ZzcopBZYvcbZQ, accept-encoding=gzip, deflate, br, user-agent=PostmanRuntime/7.22.0}]
    at org.springframework.integration.scattergather.ScatterGatherHandler.lambda$doInit$2(ScatterGatherHandler.java:160)
    at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:77)
    at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:71)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:431)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:284)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:265)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:223)
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:823)
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:475)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169)
    at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:143)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:390)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:329)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:277)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:274)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

    现在我明白我有一些差距,但我正在努力找出如何前进:
  • 给定的错误:没有一些“gatherResultChannel”输出。我原以为这将是 scatterGather(...) 调用结果的后续“句柄”/“日志”/w.e,但不是这样。
  • 需要某种形式的从分散-聚集聚合的结果返回到原始 Http.XXX 请求的映射。

  • 编辑 :进一步挖掘,给出的问题似乎是因为当通过 AMQP(在我的情况下,RabbitMQ)出去时,有问题的标题是 deliberately dropped as it's a MessageChannel (see lines 230 to 257) .不确定这里的含义是否是拆分/聚合不打算在多个独立应用程序之间交叉(我的假设是它被删除是因为它是 Java 对象的一个​​实例,传递会有问题)...

    进一步编辑 :用新鲜的眼睛注意到我以前没有注意到的东西,我粘贴的异常引用了失败的消息,这似乎是输出处理的明确结果(在摆弄时,在 DirectChannel 和 QueueChannel 之间轻弹,只有 DirectChannel 不打印有效载荷所以没有寻找它)。为了确保它没有做一些克隆或奇怪的事情,更新了 stub 服务以转换和附加唯一的后缀(如下所示),是的,它实际上是在聚合。
       .transform(msg -> MessageFormat.format("{0}_system1response", msg))
    .transform(msg -> MessageFormat.format("{0}_system2response", msg))

    The 'gatherResultChannel' header is required to deliver the gather result., failedMessage=GenericMessage [payload=Overall response: |sometext_system2response|sometext_system1response, hea...

    所以看起来分散,收集和聚合都在起作用,唯一不是的就是给定的处理不知道在那之后将消息推送到哪里?

    再来一次:根据 Gary 的回应,用网关替换了所有适配器,但是这样做不能再扇出了吗?所以从 scatterGather 调用中删除了 scatterGatherSpec 参数,并替换/添加了两个收件人,如下所示:
    .recipientFlow(flow -> flow.handle(Amqp.asyncOutboundGateway(asyncTemplate).routingKeyFunction(m -> "request-overall-document-system-1"), e -> e.id("sytemOneOutboundGateway")))
    .recipientFlow(flow -> flow.handle(Amqp.asyncOutboundGateway(asyncTemplate).routingKeyFunction(m -> "request-overall-document-system-2"), e -> e.id("sytemTwoOutboundGateway")))

    这是我能得到的最接近一个工作示例的方法,然而,虽然这做了一些工作,但它会导致多次打开/关闭队列重新处理消息,其中我对带有“msgtosend”的 POST 的预期输出将是:
    Overall message: |msgtosend_system1response|msgtosend_system2response

    相反,我得到零星的输出,如:
    Overall message: |msgtosend|msgtosend_system1response
    Overall message: |msgtosend_system2response|msgtosend_system1response_system1response
    Overall message: |msgtosend|msgtosend_system1response_system1response
    Overall message: |msgtosend_system2response|msgtosend_system1response_system1response

    我假设有一些配置/bean 重叠,但尝试我可能无法隔离它是什么,即连接工厂、监听器容器、异步模板等。

    最佳答案

    使用 AMQP 出站网关而不是出站和入站 channel 适配器;这样 channel 标题将被保留。有一个AsyncAmqpOutboundGateway这可能最适合您的目的。

    如果出于某种原因必须使用 channel 适配器,请将标题丰富器与 Header Channel Registry 一起使用。将 channel 转换为字符串表示形式,以便保留。

    编辑

    这是一个简单的例子:

    @SpringBootApplication
    public class So60469260Application {

    public static void main(String[] args) {
    SpringApplication.run(So60469260Application.class, args);
    }

    @Bean
    public IntegrationFlow flow(AsyncRabbitTemplate aTemp) {
    return IntegrationFlows.from(Gate.class)
    .enrichHeaders(he -> he.headerExpression("corr", "payload"))
    .scatterGather(rlr -> rlr
    .applySequence(true)
    .recipientFlow(f1 -> f1.handle(Amqp.asyncOutboundGateway(aTemp)
    .routingKey("foo")))
    .recipientFlow(f2 -> f2.handle(Amqp.asyncOutboundGateway(aTemp)
    .routingKey("bar"))),
    agg -> agg.correlationStrategy(msg -> msg.getHeaders().get("corr")))
    .get();
    }

    @Bean
    public AsyncRabbitTemplate aTemp(RabbitTemplate template) {
    return new AsyncRabbitTemplate(template);
    }

    @Bean
    @DependsOn("flow")
    public ApplicationRunner runner(Gate gate) {
    return args -> System.out.println(gate.doIt("foo"));
    }

    @RabbitListener(queues = "foo")
    public String foo(String in) {
    return in.toUpperCase();
    }

    @RabbitListener(queues = "bar")
    public String bar(String in) {
    return in + in;
    }

    }

    interface Gate {

    List<String> doIt(String in);

    }
    [foofoo, FOO]

    关于Spring 集成 HTTP 到 Scatter Gather,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60469260/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com