gpt4 book ai didi

java - 使用 Camel 聚合相同 header 的消息

转载 作者:行者123 更新时间:2023-11-29 05:41:48 25 4
gpt4 key购买 nike

我有多个客户端向服务器发送文件。对于一组数据,有两个文件包含有关该数据的信息,每个文件具有相同的名称。收到文件后,服​​务器向我的队列发送一条消息,其中包含文件路径、文件名、客户端 ID 以及文件的“类型”(所有文件扩展名相同,但有两种“类型”, “称他们为 A 和 B)。

一组数据的两个文件具有相同的文件名。一旦服务器收到这两个文件,我就需要启动一个将两者结合起来的程序。目前我有一些看起来像这样的东西:

from("jms:queue.name").aggregate(header("CamelFileName")).completionSize(2).to("exec://FILEPATH?args=");

我卡住的地方是 header (“CamelFileName”),更具体地说是聚合器的工作方式。

将 completionSize 设置为 2 时,它是否只是吸收所有消息并将它们存储在某个数据结构中,直到与第一个匹配的第二个消息通过?另外, header() 是否需要特定值?我有多个客户,所以我想在 header 中包含客户 ID 和文件名,但我又不知道是否必须提供特定值。我也不知道我是否可以使用正则表达式。

任何想法或提示都会非常有帮助。谢谢

编辑:这是我现在拥有的一些代码。根据我在这里对问题的描述以及对所选答案的评论,它看起来是否准确(除了我没有复制的右括号)?

public static void main(String args[]) throws Exception{
CamelContext c = new DefaultCamelContext();
c.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false"));
//ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
//c.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
c.addRoutes(new RouteBuilder() {
public void configure() {
from("activemq:queue:analytics.camelqueue").aggregate(new MyAggregationStrategy()).header("subject").completionSize(2).to("activemq:queue:analytics.success");
}
});
c.start();
while (true) {
System.out.println("Waiting on messages to come through for camel");
Thread.sleep(2 * 1000);
}
//c.stop();
}

private static class MyAggregationStrategy implements AggregationStrategy {

public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (oldExchange == null)
return newExchange;
// and here is where combo stuff goes
String oldBody = oldExchange.getIn().getBody(String.class);
String newBody = newExchange.getIn().getBody(String.class);
boolean oldSet = oldBody.contains("set");
boolean newSet = newBody.contains("set");
boolean oldFlow = oldBody.contains("flow");
boolean newFlow = newBody.contains("flow");
if ( (oldSet && newFlow) || (oldFlow && newSet) ) {
//they match so return new exchange with info so extractor can be started with exec
String combined = oldBody + "\n" + newBody + "\n";
newExchange.getIn().setBody(combined);
return newExchange;
}
else {
// no match so do something....
return null;
}
}
}

最佳答案

您必须提供一个 AggregationStrategy 来定义您希望如何组合交换...

如果您只对文件名感兴趣并正好接收 2 个 Exchange,那么您可以使用 UseLatestAggregationStrategy 在 2 个“聚合”后通过最新的 Exchange...

也就是说,听起来您需要保留两个 Exchange(每个 clientId 一个),以便您可以将该信息传递给“exec”步骤...如果是这样,您可以将 Exchange 合并到一个 GroupedExchange 持有者中使用通过 groupExchanges 选项启用的内置聚合策略...或指定一个自定义 AggregationStrategy 以根据需要组合它们。只需要记住,您的“执行”步骤需要处理您决定使用的任何聚合结构...

有关示例,请参阅这些单元测试:

https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorTest.java

https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java

关于java - 使用 Camel 聚合相同 header 的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17240421/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com