gpt4 book ai didi

java - Camel : How to join back to a single path after multicast?

转载 作者:搜寻专家 更新时间:2023-11-01 03:19:06 24 4
gpt4 key购买 nike

这似乎是一个非常简单的问题,但我已经尝试了所有我能想到的方法。基本上,我有一个计时器路由,可以将其消息发送到一堆不同的 bean。这些 bean 在交换器上设置了一个属性(我也尝试过在消息上设置一个 header ),我希望将所有这些 bean 的交换输出定向到一个过滤器(检查属性或 header ),然后可选地另一个端点。像这样:

                       ---> Bean A ---
/ \
timer --> multicast ------> Bean B ------> end --> filter --> endpoint
\ /
---> Bean C ---

目前路由看起来像这样,它适用于向 bean 进行多播:

from("timer://my-timer?fixedRate=true&period=20000&delay=0")
.multicast()
.to("bean:beanA", "bean:beanB", "bean:beanC");

以下是我尝试过的一些解决方案:

解决方案一

from("timer://my-timer?fixedRate=true&period=20000&delay=0")
.multicast()
.to("bean:beanA", "bean:beanB", "bean:beanC")
.filter(new myPredicate())
.to("myOptionalEndpoint");

这使过滤器与 bean 并行,而不是在它们之后。

方案二

from("timer://my-timer?fixedRate=true&period=20000&delay=0")
.multicast()
.to("bean:beanA", "bean:beanB", "bean:beanC")
.end()
.filter(new myPredicate())
.to("myOptionalEndpoint");

并行执行 bean,然后执行过滤器。但是,未设置属性/ header 。似乎交换是刚结束的计时器,而不是经历过 bean 类的交换......

编辑: 我尝试设置正文,但实际上到达过滤器的消息没有正文。我无法想象 Camel 会以某种方式去除消息的有效负载,因此我必须假设此交换是来自计时器的新交换,而不是经过 bean 的交换。但是,它发生在 bean 完成后

方案三

from("timer://my-timer?fixedRate=true&period=20000&delay=0")
.multicast()
.beanRef("beanA").to("direct:temp")
.beanRef("beanB").to("direct:temp")
.beanRef("beanC").to("direct:temp")
.end()

from("direct:temp")
.filter(new myPredicate())
.to("myOptionalEndpoint");

消息按预期到达过滤器,但我设置的属性/ header 消失了,因此没有消息通过过滤器。

编辑:尸体也不见了,很明显我没有得到来自 bean 的相同交换...

澄清一下,我正在寻找一种解决方案,其中将来自计时器的单个交换多播到每个 bean(因此现在我们有 3 个交换),然后将这 3 个中的每一个发送到过滤器。

谁能帮我弄清楚如何构建这条路线?

最佳答案

您需要使用聚合策略才能将所有结果聚合为一个。

下面是来自 http://javarticles.com/2015/05/apache-camel-multicast-examples.html 的一个很好的例子(请参阅具有自定义聚合策略的多播部分)

public class CamelMulticastAggregationExample {
public static final void main(String[] args) throws Exception {
JndiContext jndiContext = new JndiContext();
jndiContext.bind("myBean", new MyBean());
CamelContext camelContext = new DefaultCamelContext(jndiContext);
try {
camelContext.addRoutes(new RouteBuilder() {
public void configure() {
from("direct:start")
.multicast()
.aggregationStrategy(new JoinReplyAggregationStrategy())
.to("direct:a", "direct:b", "direct:c")
.end()
.to("stream:out");

from("direct:a")
.to("bean:myBean?method=addFirst");

from("direct:b")
.to("bean:myBean?method=addSecond");

from("direct:c")
.to("bean:myBean?method=addThird");
}
});
ProducerTemplate template = camelContext.createProducerTemplate();
camelContext.start();
template.sendBody("direct:start", "Multicast");
} finally {
camelContext.stop();
}
}

JoinReplyAggregationStrategy 类如下所示

public class JoinReplyAggregationStrategy implements AggregationStrategy {

public Exchange aggregate(Exchange exchange1, Exchange exchange2) {
if (exchange1 == null) {
return exchange2;
} else {
String body1 = exchange1.getIn().getBody(String.class);
String body2 = exchange2.getIn().getBody(String.class);
String merged = (body1 == null) ? body2 : body1 + "," + body2;
exchange1.getIn().setBody(merged);
return exchange1;
}
}
}

更新在您的情况下,您的聚合策略可能是按如下方式将所有交换聚集在一起:

public class ListAggregationStrategy implements AggregationStrategy {

public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
Message newIn = newExchange.getIn();
Object newBody = newIn.getBody();
List list = null;
if (oldExchange == null) {
list = new ArrayList();
list.add(newBody);
newIn.setBody(list);
return newExchange;
} else {
Message in = oldExchange.getIn();
list = in.getBody(List.class);
list.add(newBody);
return oldExchange;
}
}

}

关于java - Camel : How to join back to a single path after multicast?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36608517/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com