gpt4 book ai didi

java - 从组合 route 获取准确响应

转载 作者:行者123 更新时间:2023-11-30 08:11:49 26 4
gpt4 key购买 nike

我有一个动态路线创建器 Web 应用程序。根据流程设计,我制作了一条 Camel 路线。路由可能包含多播、过滤器、聚合、处理器等。通过 UI 设计流程后,我的路由创建如下:

from("seda:start").routeId("idx")
.multicast()
.to("direct:a", "direct:b", "direct:c")
.parallelProcessing()
.end();

from("direct:a").transform(constant("A")).delay(1000).to("direct:merge");
from("direct:b").transform(constant("B")).delay(2000).to("direct:merge");
from("direct:c").transform(constant("C")).delay(3000).to("direct:merge");

from("direct:merge")
.aggregate(new MyAggregationStrategy()).constant(true).completionSize(3)
.to("mock:end");

我有一个 API 可以将这条路线的结果提供给用户。当我使用 InOut MEP 执行此路线时,响应为“C”,但 mock:end 满足“ABC”:

MockEndpoint mock = getMockEndpoint("mock:end");
mock.expectedBodiesReceived("ABC"); //works as expected

String reply = template.requestBody("seda:start", "", String.class);

assertEquals("ABC", reply); //it returns 'C', but I expect 'ABC'

assertMockEndpointsSatisfied();

如何更改代码以通过同步调用获取聚合结果?这是代码:

public class ResponseTest extends CamelTestSupport {

@Test
public void testAsyncInOut() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:end");
mock.expectedBodiesReceived("ABC"); //works as expected

String reply = template.requestBody("seda:start", "", String.class);

assertEquals("ABC", reply); //it returns 'C', but I expect 'ABC'

assertMockEndpointsSatisfied();
}

@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
from("seda:start").routeId("idx")
.multicast()
.to("direct:a", "direct:b", "direct:c")
.parallelProcessing()
.end();

from("direct:a").transform(constant("A")).delay(1000).to("direct:merge");
from("direct:b").transform(constant("B")).delay(2000).to("direct:merge");
from("direct:c").transform(constant("C")).delay(3000).to("direct:merge");

from("direct:merge")
.aggregate(new MyAggregationStrategy()).constant(true).completionSize(3)
.to("mock:end");
}
};
}

class MyAggregationStrategy implements AggregationStrategy {

public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (oldExchange == null) {
// this is the first time so no existing aggregated exchange
return newExchange;
}

// append the new word to the existing
String body = newExchange.getIn().getBody(String.class);
String existing = oldExchange.getIn().getBody(String.class);

oldExchange.getIn().setBody(existing + body);
return oldExchange;
}
}
}

编辑

将消息多播到 5 个不同的端点并不意味着所有消息都将在稍后聚合。所以我不能在多播定义中使用聚合策略。其中一些可能用于另一种工作。流定义可能是这样的:将多播消息发送到“a”、“b”、“c”、“d”、“e”端点后,“a”和“b”可以聚合在一起(“direct:merge1”)、“c”和“d” ' 聚合在一起('direct:merge2') 和 'e' 可以用于另一件事。最终聚合器将聚合“direct:merge1”和“direct:merge2”到“直接:merge3”。所有这些端点都是动态创建的('direct:a','direct:b','direct:c','direct:d','direct:e','direct:merge1','direct:merge2','直接:合并3')。这个场景将像这样创建:

from("seda:start").routeId("idx")
.multicast()
.to("direct:a", "direct:b", "direct:c", "direct:d", "direct:e")
.parallelProcessing()
.end();

from("direct:a").transform(constant("A")).delay(1000).to("direct:merge1");
from("direct:b").transform(constant("B")).delay(2000).to("direct:merge1");

from("direct:c").transform(constant("C")).delay(3000).to("direct:merge2");
from("direct:d").transform(constant("D")).delay(1000).to("direct:merge2");

from("direct:e").transform(constant("E")).delay(1000).to("mock:anywhere");

from("direct:merge1").aggregate(new MyAggregationStrategy()).constant(true).completionSize(2).to("direct:merge3");
from("direct:merge2").aggregate(new MyAggregationStrategy()).constant(true).completionSize(2).to("direct:merge3");

from("direct:merge3").aggregate(new MyAggregationStrategy()).constant(true).completionSize(2).to("mock:end");

当我向 seda:start 发送消息时,我期望 ABDC,但我得到“E”。有没有办法获得最终聚合消息('ABDC')?测试方法如下:

@Test
public void testAsyncInOut() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:end");
mock.expectedBodiesReceived("ABDC"); //works as expected

String reply = template.requestBody("seda:start", "", String.class);

assertEquals("ABDC", reply); //it returns 'E' because of default multicast behavior, but I expect 'ABDC'

assertMockEndpointsSatisfied();
}

最佳答案

来自多播 documentation :

By default Camel will use the last reply as the outgoing message.

如果您希望将多播结果聚合到单个消息中,请在多播定义中进行说明。

@Override
public void configure() throws Exception {
from("seda:start").routeId("idx")
.multicast(new MyAggregationStrategy()) //Put the Aggregation Strategy here!
.to("direct:a", "direct:b", "direct:c")
.parallelProcessing()
.end();

from("direct:a").transform(constant("A")).delay(1000).to("direct:merge");
from("direct:b").transform(constant("B")).delay(2000).to("direct:merge");
from("direct:c").transform(constant("C")).delay(3000).to("direct:merge");

from("direct:merge")
.to("mock:end");
}

请注意,您的模拟端点现在将被调用 3 次,因为聚合要稍后才会发生。您需要相应地修改您的测试。

关于java - 从组合 route 获取准确响应,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30291930/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com