gpt4 book ai didi

apache-camel - 使用 Apache Camel 请求回复和分散收集

转载 作者:行者123 更新时间:2023-12-04 23:00:51 29 4
gpt4 key购买 nike

我正在尝试构建一条将执行以下操作的路线:

  • 消费来自 jms:sender-in 的消息.我正在使用 INOUT请求回复模式。 JMSReplyTo = sender-out
  • 上述消息将被路由到多个收件人,如 jms:consumer1-in , jms:consumer2-injms:consumer3-in .所有都使用请求回复模式。 JMSReplyTo为每个消费者指定(在这种情况下,JMSReplyTo 的顺序是 jms:consumer1-out, jms:consumer2-out, jms:consumer3-out
  • 我需要将所有回复汇总在一起并将结果发送回 jms:sender-out .

  • 我构建了一条类似于这样的路线:
    from("jms:sender-in")
    .to("jms:consumer1-in?exchangePattern=InOut&replyTo=queue:consumer1-out&preserveMessageQos=true")
    .to("jms:consumer2-in?exchangePattern=InOut&replyTo=queue:consumer2-out&preserveMessageQos=true")
    .to("jms:consumer3-in?exchangePattern=InOut&replyTo=queue:consumer3-out&preserveMessageQos=true");

    然后我将回复发送回某个队列以进行收集和聚合:
    from("jms:consumer1-out?preserveMessageQos=true").to("jms:gather");
    from("jms:consumer1-out?preserveMessageQos=true").to("jms:gather");
    from("jms:consumer1-out?preserveMessageQos=true").to("jms:gather");
    from("jms:gather").aggregate(header("TransactionID"), new GatherResponses()).completionSize(3).to("jms:sender-out");

    为了模拟我的消费者的行为,我添加了以下路线:
    from("jms:consumer1-in").setBody(body());
    from("jms:consumer2-in").setBody(body());
    from("jms:consumer3-in").setBody(body());

    我有几个问题:
  • 我在回复中收到超时错误。如果我注释掉收集部分,则没有问题。为什么即使回复回到队列然后转发到另一个队列也会超时。
  • 如何存储原始 JMSReplyTo 值,以便 Camel 能够将聚合结果发送回发件人的回复队列。

  • 我有一种感觉,我正在为一些基本概念而苦苦挣扎。任何帮助表示赞赏。
    谢谢。

    最佳答案

    好问题!

    你需要考虑两件事

  • 不要混用exchange patterns , 请求回复 (InOut) vs 事件
    消息(InOnly)。 (除非你有充分的理由)。
  • 如果你做 scatter-gather ,您需要提出请求
    multicast ,否则它们将是 pipelined这不是
    真的scatter-gather .

  • 我已经制作了两个与您的案例类似的示例 - 一个带有请求回复,一个带有(一种方式)事件消息。

    随意用 jms 替换 activemq 组件 - 在这些示例中是相同的。

    示例一,使用事件消息 - InOnly:
    from("activemq:amq.in")
    .multicast()
    .to("activemq:amq.q1")
    .to("activemq:amq.q2")
    .to("activemq:amq.q3");

    from("activemq:amq.q1").setBody(constant("q1")).to("activemq:amq.gather");
    from("activemq:amq.q2").setBody(constant("q2")).to("activemq:amq.gather");
    from("activemq:amq.q3").setBody(constant("q3")).to("activemq:amq.gather");

    from("activemq:amq.gather")
    .aggregate(new ConcatAggregationStrategy())
    .header("breadcrumbId")
    .completionSize(3)
    .to("activemq:amq.out");

    from("activemq:amq.out")
    .log("${body}"); // logs "q1q2q3"

    示例二,使用请求回复 - 请注意,分散路由必须在响应进入时收集响应。结果与第一个示例相同,但路由更少,配置更少。
    from("activemq:amq.in2")
    .multicast(new ConcatAggregationStrategy())
    .inOut("activemq:amq.q4")
    .inOut("activemq:amq.q5")
    .inOut("activemq:amq.q6")
    .end()
    .log("Received replies: ${body}"); // logs "q4q5q6"

    from("activemq:amq.q4").setBody(constant("q4"));
    from("activemq:amq.q5").setBody(constant("q5"));
    from("activemq:amq.q6").setBody(constant("q6"));

    至于您的问题二 - 当然,可以传递 JMSReplyTo header 并沿路强制交换模式 - 但您将创建难以调试的代码。保持你的交换模式简单和干净 - 它可以防止错误。

    关于apache-camel - 使用 Apache Camel 请求回复和分散收集,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25165783/

    29 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com