- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我是 Spring Integration 的新手,并试图利用 scatter-gather 的企业模式,但我正在为实现细节而苦苦挣扎,并为我可以在网上找到的可用示例而苦苦挣扎。
简而言之,我的场景是:
@Bean
public IntegrationFlow overallRequest(final AmqpTemplate amqpTemplate) {
return IntegrationFlows.from( // HTTP endpoint to user makes requests on
Http.inboundChannelAdapter("/request-overall-document")
.requestMapping(m -> m.methods(HttpMethod.POST))
.requestPayloadType(String.class))
.log()
// Arbitrary header to simplify example, realistically would generate a UUID
// and attach to some correlating header that works for systems involved
.enrichHeaders(p -> p.header("someHeader", "someValue"))
.log()
.scatterGather(
recipientListRouterSpec ->
recipientListRouterSpec
.applySequence(true)
.recipientFlow(
flow ->
flow.handle( // Straight pass through of msg received to see in response
Amqp.outboundAdapter(amqpTemplate)
.exchangeName( // RabbitMQ fanout exchange to N queues to N systems
"request-overall-document-exchange"))),
aggregatorSpec ->
aggregatorSpec
// Again for example, arbitrary once two correlated responses
.correlationStrategy(msg -> msg.getHeaders().get("someHeader"))
.releaseStrategy(gm -> gm.size() == 2)
// Simple string concatenation for overall response
.outputProcessor(
msgrp ->
msgrp.getMessages().stream()
.map(msg -> msg.getPayload().toString())
.reduce("Overall response: ", (nexus, txt) -> nexus + "|" + txt))
// Reset group on each response
.expireGroupsUponCompletion(true),
scatterGatherSpec ->
scatterGatherSpec.gatherChannel(
responseChannel())) // The channel to listen for responses to request on
.log()
.get();
}
@Bean
public MessageChannel responseChannel() {
return new QueueChannel();
}
@Bean
public AmqpInboundChannelAdapter responseChannelAdapter(
SimpleMessageListenerContainer listenerContainer,
@Qualifier("responseChannel") MessageChannel channel) {
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
adapter.setOutputChannel(channel);
return adapter;
}
@Bean
public SimpleMessageListenerContainer responseContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames("request-overall-document-responses");
return container;
}
@Bean
public IntegrationFlow systemOneReception(final ConnectionFactory connectionFactory, final AmqpTemplate amqpTemplate) {
return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "request-overall-document-system-1"))
.log()
.handle(Amqp.outboundAdapter(amqpTemplate).routingKey("request-overall-document-responses"))
.get();
}
@Bean
public IntegrationFlow systemTwoReception(final ConnectionFactory connectionFactory, final AmqpTemplate amqpTemplate) {
return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "request-overall-document-system-2"))
.log()
.handle(Amqp.outboundAdapter(amqpTemplate).routingKey("request-overall-document-responses"))
.get();
}
2020-02-29 20:06:39.255 ERROR 152 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageDeliveryException: The 'gatherResultChannel' header is required to deliver the gather result., failedMessage=GenericMessage [payload=Overall response: |somerequesttobesent|somerequesttobesent, headers={amqp_receivedDeliveryMode=PERSISTENT, content-length=19, amqp_deliveryTag=2, sequenceSize=1, amqp_redelivered=false, amqp_contentEncoding=UTF-8, host=localhost:18081, someHeader=someValue, connection=keep-alive, correlationId=182ee203-85ab-9ef6-7b19-3a8e2da8f5a7, id=994a0cf5-ad2b-02c3-dc93-74fae2f5092b, cache-control=no-cache, contentType=text/plain, timestamp=1583006799252, http_requestMethod=POST, sequenceNumber=1, amqp_consumerQueue=request-overall-document-responses, accept=*/*, amqp_receivedRoutingKey=request-overall-document-responses, amqp_timestamp=Sat Feb 29 20:06:39 GMT 2020, amqp_messageId=3341deae-7ed0-a042-0bb7-d2d2be871165, http_requestUrl=http://localhost:18081/request-overall-document, amqp_consumerTag=amq.ctag-ULxwuAjp8ZzcopBZYvcbZQ, accept-encoding=gzip, deflate, br, user-agent=PostmanRuntime/7.22.0}]
at org.springframework.integration.scattergather.ScatterGatherHandler.lambda$doInit$2(ScatterGatherHandler.java:160)
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:77)
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:71)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:431)
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:284)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:265)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:223)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:823)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:475)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169)
at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:143)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:390)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:329)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:277)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:274)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
.transform(msg -> MessageFormat.format("{0}_system1response", msg))
.transform(msg -> MessageFormat.format("{0}_system2response", msg))
The 'gatherResultChannel' header is required to deliver the gather result., failedMessage=GenericMessage [payload=Overall response: |sometext_system2response|sometext_system1response, hea...
.recipientFlow(flow -> flow.handle(Amqp.asyncOutboundGateway(asyncTemplate).routingKeyFunction(m -> "request-overall-document-system-1"), e -> e.id("sytemOneOutboundGateway")))
.recipientFlow(flow -> flow.handle(Amqp.asyncOutboundGateway(asyncTemplate).routingKeyFunction(m -> "request-overall-document-system-2"), e -> e.id("sytemTwoOutboundGateway")))
Overall message: |msgtosend_system1response|msgtosend_system2response
Overall message: |msgtosend|msgtosend_system1response
Overall message: |msgtosend_system2response|msgtosend_system1response_system1response
Overall message: |msgtosend|msgtosend_system1response_system1response
Overall message: |msgtosend_system2response|msgtosend_system1response_system1response
最佳答案
使用 AMQP 出站网关而不是出站和入站 channel 适配器;这样 channel 标题将被保留。有一个AsyncAmqpOutboundGateway
这可能最适合您的目的。
如果出于某种原因必须使用 channel 适配器,请将标题丰富器与 Header Channel Registry 一起使用。将 channel 转换为字符串表示形式,以便保留。
编辑
这是一个简单的例子:
@SpringBootApplication
public class So60469260Application {
public static void main(String[] args) {
SpringApplication.run(So60469260Application.class, args);
}
@Bean
public IntegrationFlow flow(AsyncRabbitTemplate aTemp) {
return IntegrationFlows.from(Gate.class)
.enrichHeaders(he -> he.headerExpression("corr", "payload"))
.scatterGather(rlr -> rlr
.applySequence(true)
.recipientFlow(f1 -> f1.handle(Amqp.asyncOutboundGateway(aTemp)
.routingKey("foo")))
.recipientFlow(f2 -> f2.handle(Amqp.asyncOutboundGateway(aTemp)
.routingKey("bar"))),
agg -> agg.correlationStrategy(msg -> msg.getHeaders().get("corr")))
.get();
}
@Bean
public AsyncRabbitTemplate aTemp(RabbitTemplate template) {
return new AsyncRabbitTemplate(template);
}
@Bean
@DependsOn("flow")
public ApplicationRunner runner(Gate gate) {
return args -> System.out.println(gate.doIt("foo"));
}
@RabbitListener(queues = "foo")
public String foo(String in) {
return in.toUpperCase();
}
@RabbitListener(queues = "bar")
public String bar(String in) {
return in + in;
}
}
interface Gate {
List<String> doIt(String in);
}
[foofoo, FOO]
关于Spring 集成 HTTP 到 Scatter Gather,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60469260/
参数说明 以官方说明为例,gather()函数需要三个参数,输入input,维度dim,以及索引index input必须为Tensor类型 dim为int类型,代表从哪个维度进行索引 in
我知道如何在 melt 中使用两个 id.vars .这很简单: x = data.frame(subject = c("John", "Mary"), time = c
我正在尝试使用 gather在 tidyr包,但我无法从默认名称更改输出的列名称。例如: df = data.frame(time = 1:100,a = 1:100,b = 101:200) df.
为什么 asyncio.gather 不适用于生成器表达式? import asyncio async def func(): await asyncio.sleep(2) # Works a
我想整理一些不幸的是在前两行中设置了两个列标题的数据: 第一行(标题):实际上是度量的类型(例如。估计、标准误差、上限、下限)。 第二行(也是标题):是度量的年份。 有什么方法可以使用gather()
当我添加 NuGet 包(最新版本的 NuGet 和 Visual Studio 2015)时,它在安装包之前在“尝试收集依赖项”处挂起大约 5 分钟。我可以指向 NuGet.org、我们的内部服务器
我想在 melt 中指定输出列的类别(或 gather)。我想为所有列和不同的类做这件事。 例如,我有一些数据: example example day max min 1 1 20
我有一个按地区进行满意度调查的结果数据集。调查中的每个问题都采用 4 分制评分(从非常满意到非常不满意)。数据集中的每一行都包含给定“财政年度”结束时给定区域中给定问题的汇总结果。它还包含每个级别的受
键排序是否取决于我是否首先列出要收集的列与不收集的列? 这是我的数据框: library(tidyr) wide_df <- data.frame(c("a", "b"), c("oh", "ah")
我见过asyncio.gather vs asyncio.wait ,但不确定这是否解决了这个特定问题。我想做的是将 asyncio.gather() 协程包装在 asyncio.wait_for()
我正在尝试了解 AVX2 intel intrinsic 的收集功能。 根据官方文档Link ,函数定义为, __m256i _mm256_i32gather_epi32 (int const* ba
首先,我一直在使用 this code作为引用,它显示了不使用 MPI_Scatter 的 MPI_Gather 的使用,因为这就是我在这里想要实现的目标。我已经为此工作了很长时间,只是无法弄清楚这个
我正在使用 MPI 开发 mandelbrot 生成器,它在完成时输出 PPM 文件。我使用 MPI gather 将计算结果 block 收集到最终数组中。代码生成文件但不完整;仅显示图片的上半部分
我正在使用 R 将宽格式数据表转换为长格式。它有效,除了必须为新列使用变量: library(readr) library(tidyr) files <- Sys.glob("sources/*.cs
使用 Python 3.7,我试图捕获异常并通过 following an example I found on StackOverflow 重新引发它.虽然该示例确实有效,但它似乎并不适用于所有情况
我有一个数据框,看起来像下面“输入”中显示的图片。 我尝试每行获取 1 个日期(请参见下面“所需输出”中的图片)。换句话说,我尝试为每一行做一种“转置”。 让我们规定组合 'LC' 和 'Prod'
我正在尝试使用索引张量对张量进行切片。为此,我尝试使用 tf.gather . 但是,我很难理解 documentation并且不要让它像我期望的那样工作: 我有两个张量。安 activations形
我想 gather() 列出列以在我的数据框中创建新行。我正在使用 repurrrsive 包中的《权力的游戏》数据集。下面是我设置问题的代码: library(tidyverse) got_char
我想有条件地运行异步函数,如下所示: one, two, three = await asyncio.gather( some_async_method1(), some_async_
我正在使用tensorflow的tf.gather从多维数组中获取元素,如下所示: import tensorflow as tf indices = tf.constant([0, 1, 1]) x
我是一名优秀的程序员,十分优秀!