- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我是 Spring Integration 的新手,并试图利用 scatter-gather 的企业模式,但我正在为实现细节而苦苦挣扎,并为我可以在网上找到的可用示例而苦苦挣扎。
简而言之,我的场景是:
@Bean
public IntegrationFlow overallRequest(final AmqpTemplate amqpTemplate) {
return IntegrationFlows.from( // HTTP endpoint to user makes requests on
Http.inboundChannelAdapter("/request-overall-document")
.requestMapping(m -> m.methods(HttpMethod.POST))
.requestPayloadType(String.class))
.log()
// Arbitrary header to simplify example, realistically would generate a UUID
// and attach to some correlating header that works for systems involved
.enrichHeaders(p -> p.header("someHeader", "someValue"))
.log()
.scatterGather(
recipientListRouterSpec ->
recipientListRouterSpec
.applySequence(true)
.recipientFlow(
flow ->
flow.handle( // Straight pass through of msg received to see in response
Amqp.outboundAdapter(amqpTemplate)
.exchangeName( // RabbitMQ fanout exchange to N queues to N systems
"request-overall-document-exchange"))),
aggregatorSpec ->
aggregatorSpec
// Again for example, arbitrary once two correlated responses
.correlationStrategy(msg -> msg.getHeaders().get("someHeader"))
.releaseStrategy(gm -> gm.size() == 2)
// Simple string concatenation for overall response
.outputProcessor(
msgrp ->
msgrp.getMessages().stream()
.map(msg -> msg.getPayload().toString())
.reduce("Overall response: ", (nexus, txt) -> nexus + "|" + txt))
// Reset group on each response
.expireGroupsUponCompletion(true),
scatterGatherSpec ->
scatterGatherSpec.gatherChannel(
responseChannel())) // The channel to listen for responses to request on
.log()
.get();
}
@Bean
public MessageChannel responseChannel() {
return new QueueChannel();
}
@Bean
public AmqpInboundChannelAdapter responseChannelAdapter(
SimpleMessageListenerContainer listenerContainer,
@Qualifier("responseChannel") MessageChannel channel) {
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
adapter.setOutputChannel(channel);
return adapter;
}
@Bean
public SimpleMessageListenerContainer responseContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames("request-overall-document-responses");
return container;
}
@Bean
public IntegrationFlow systemOneReception(final ConnectionFactory connectionFactory, final AmqpTemplate amqpTemplate) {
return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "request-overall-document-system-1"))
.log()
.handle(Amqp.outboundAdapter(amqpTemplate).routingKey("request-overall-document-responses"))
.get();
}
@Bean
public IntegrationFlow systemTwoReception(final ConnectionFactory connectionFactory, final AmqpTemplate amqpTemplate) {
return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "request-overall-document-system-2"))
.log()
.handle(Amqp.outboundAdapter(amqpTemplate).routingKey("request-overall-document-responses"))
.get();
}
2020-02-29 20:06:39.255 ERROR 152 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageDeliveryException: The 'gatherResultChannel' header is required to deliver the gather result., failedMessage=GenericMessage [payload=Overall response: |somerequesttobesent|somerequesttobesent, headers={amqp_receivedDeliveryMode=PERSISTENT, content-length=19, amqp_deliveryTag=2, sequenceSize=1, amqp_redelivered=false, amqp_contentEncoding=UTF-8, host=localhost:18081, someHeader=someValue, connection=keep-alive, correlationId=182ee203-85ab-9ef6-7b19-3a8e2da8f5a7, id=994a0cf5-ad2b-02c3-dc93-74fae2f5092b, cache-control=no-cache, contentType=text/plain, timestamp=1583006799252, http_requestMethod=POST, sequenceNumber=1, amqp_consumerQueue=request-overall-document-responses, accept=*/*, amqp_receivedRoutingKey=request-overall-document-responses, amqp_timestamp=Sat Feb 29 20:06:39 GMT 2020, amqp_messageId=3341deae-7ed0-a042-0bb7-d2d2be871165, http_requestUrl=http://localhost:18081/request-overall-document, amqp_consumerTag=amq.ctag-ULxwuAjp8ZzcopBZYvcbZQ, accept-encoding=gzip, deflate, br, user-agent=PostmanRuntime/7.22.0}]
at org.springframework.integration.scattergather.ScatterGatherHandler.lambda$doInit$2(ScatterGatherHandler.java:160)
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:77)
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:71)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:431)
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:284)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:265)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:223)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:823)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:475)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169)
at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:143)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:390)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:329)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:277)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:274)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
.transform(msg -> MessageFormat.format("{0}_system1response", msg))
.transform(msg -> MessageFormat.format("{0}_system2response", msg))
The 'gatherResultChannel' header is required to deliver the gather result., failedMessage=GenericMessage [payload=Overall response: |sometext_system2response|sometext_system1response, hea...
.recipientFlow(flow -> flow.handle(Amqp.asyncOutboundGateway(asyncTemplate).routingKeyFunction(m -> "request-overall-document-system-1"), e -> e.id("sytemOneOutboundGateway")))
.recipientFlow(flow -> flow.handle(Amqp.asyncOutboundGateway(asyncTemplate).routingKeyFunction(m -> "request-overall-document-system-2"), e -> e.id("sytemTwoOutboundGateway")))
Overall message: |msgtosend_system1response|msgtosend_system2response
Overall message: |msgtosend|msgtosend_system1response
Overall message: |msgtosend_system2response|msgtosend_system1response_system1response
Overall message: |msgtosend|msgtosend_system1response_system1response
Overall message: |msgtosend_system2response|msgtosend_system1response_system1response
最佳答案
使用 AMQP 出站网关而不是出站和入站 channel 适配器;这样 channel 标题将被保留。有一个AsyncAmqpOutboundGateway
这可能最适合您的目的。
如果出于某种原因必须使用 channel 适配器,请将标题丰富器与 Header Channel Registry 一起使用。将 channel 转换为字符串表示形式,以便保留。
编辑
这是一个简单的例子:
@SpringBootApplication
public class So60469260Application {
public static void main(String[] args) {
SpringApplication.run(So60469260Application.class, args);
}
@Bean
public IntegrationFlow flow(AsyncRabbitTemplate aTemp) {
return IntegrationFlows.from(Gate.class)
.enrichHeaders(he -> he.headerExpression("corr", "payload"))
.scatterGather(rlr -> rlr
.applySequence(true)
.recipientFlow(f1 -> f1.handle(Amqp.asyncOutboundGateway(aTemp)
.routingKey("foo")))
.recipientFlow(f2 -> f2.handle(Amqp.asyncOutboundGateway(aTemp)
.routingKey("bar"))),
agg -> agg.correlationStrategy(msg -> msg.getHeaders().get("corr")))
.get();
}
@Bean
public AsyncRabbitTemplate aTemp(RabbitTemplate template) {
return new AsyncRabbitTemplate(template);
}
@Bean
@DependsOn("flow")
public ApplicationRunner runner(Gate gate) {
return args -> System.out.println(gate.doIt("foo"));
}
@RabbitListener(queues = "foo")
public String foo(String in) {
return in.toUpperCase();
}
@RabbitListener(queues = "bar")
public String bar(String in) {
return in + in;
}
}
interface Gate {
List<String> doIt(String in);
}
[foofoo, FOO]
关于Spring 集成 HTTP 到 Scatter Gather,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60469260/
我有几个 pandas 数据框。我想在单独的散点图中相互绘制几列,并将它们组合为图中的子图。我想相应地标记每个子图。我在让子图标签正常工作方面遇到了很多麻烦,直到我发现据我所知,有两种直接从数据帧进行
使用 Dask 分布式分散广播列表的合适方法是什么? 案例 1 - 包装列表: [future_list] = client.scatter([my_list], broadcast=True) 情况
使用 Dask 分布式分散广播列表的合适方法是什么? 案例 1 - 包装列表: [future_list] = client.scatter([my_list], broadcast=True) 情况
我有一个包含数组元素的列表: [array([2.40460915, 0.85513601]), array([1.80998096, 0.97406986]), array([2.14505475,
我想显示一些点。这是我的代码: plt.scatter(y[:,0],y[:,1],c=col) plt.show() 并作为 col我有: Col: [1 1 0 1 1 1 1 0 0 0 1
当我在 OSX Yosemite 上运行它时,生成的颜色条有奇怪的白线(见下图)。有什么方法可以生成没有这些难看的线条的颜色条吗? import pylab import numpy x = nump
我已成功绘制了两个图表,但当我绘制第三个图表时,出现无效语法错误。我是否遗漏了一些非常明显的东西? x=df['time'] d=df['dist'] x2=df2['time'] d2=df2['d
我的每个进程都有一个值数组 v 和一个大小相同的进程 ID pid 数组。 pid[i] 指定要将项目 v[i] 发送到哪个进程。 我需要实现一个分散操作(当然还有相应的聚集操作)。 进程 ID 数组
如何使用核心图绘制有间隙的散点图?我正在使用核心绘图库。一切正常,但现在我想在数据线的开头或中间做一个有间隙的散点图?有人帮我吗? 最佳答案 让数据源返回nil 或[NSNull null]。例如,如
我有一些代码可以绘制一些点。我将 ax.scatter 替换为 ax.plot,这样我就可以单独控制每个点的颜色。但是,当我进行此更改时,x 轴和 y 轴的范围似乎会增加。 我无法确定为什么会这样。我
从我复制到自己服务器的已知公共(public)数据集开始。 数据集在这里:https://www.kaggle.com/imdevskp/corona-virus-report/download im
我正在使用 Bokeh 将 ~700 次模拟的结果与使用散点图的另一组结果进行对比。我想使用悬停工具通过分配标识模拟参数的自定义索引来定性地确定数据中的模式。 在下面的代码中,x和 y是来自 Pand
这个问题在这里已经有了答案: matplotlib scatter edge without specifying edgecolor (1 个回答) 3年前关闭。 我想通过使用 2 个索引 [Chi
我希望更好地理解流行的 EIP 分散聚集中的多个分离器、转换器和聚合方法。 用例是我发送一些可以对应于多种不同类型的 XML。我想分割 header 中指定的类型(每种类型不同)并通过与每种类型对应的
考虑以下示例(取自 Stackoverflow 上的另一篇文章): require(ggplot2) d <- data.frame(x = c(102856,17906,89697,74384,91
我有许多带有形状的矩阵 w1、w2、w3...wn (k*n1 、k*n2、k*n3...k*nn) 和 x1、x2、x3...xn 具有形状(n1*m、n2*m、n3*m...nn*m >). 我想
假设我的数据按以下方式组织: x_values = [6.2, 3.6, 7.3, 3.2, 2.7] y_values = [1.5, 3.2, 5.4, 3.1, 2.8] colours = [
我对 Spring Integration 还是很陌生,我尝试使用 IntetrationFlowDefinition.scatterGather() 但无济于事。总体思路是: 将一个String作为
我有一个谷歌分散图,我希望当鼠标悬停在圆圈上时,然后打开一个包含自定义字符串的信息窗口。 例如教程 https://developers.google.com/chart/interactive/do
好吧,我了解 JFreeChart 和其他人,但我正在编写自己的简单散点图。我已经有了一个箱形图(没有 y 轴标签,但当我在报告中解释它时,这不应该是一个大问题)。 我有一个基本的散点图类,但是我尝试
我是一名优秀的程序员,十分优秀!