gpt4 book ai didi

apache-camel - Apache Camel 并发消费者与线程

转载 作者:行者123 更新时间:2023-12-04 18:43:11 25 4
gpt4 key购买 nike

我花了将近两天的时间来理解 Apache Camel 中的 concurrentConsumers 与线程的概念。但我真的不明白这个概念。谁能帮我理解这个概念。我正在使用 Camel 2.12.0。

   from("jms:queue:start?concurrentConsumers=5")
.threads(3, 3, "replyThread")
.bean(new SomeBean());

pom.xml
==========================================
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<camel.version>2.12.0</camel.version>
</properties>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.1</version>
<scope>test</scope>
</dependency>

<!-- required by both client and server -->
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
<version>${camel.version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-spring</artifactId>
<version>${camel.version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-jms</artifactId>
<version>${camel.version}</version>
</dependency>

<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-test</artifactId>
<version>${camel.version}</version>
</dependency>

<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-activemq</artifactId>
<version>1.1.0</version>
</dependency>

<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-test-spring</artifactId>
<version>2.12.1</version>
</dependency>
</dependencies>


//posting to queue
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread() + " sending request..." + i);
template.asyncCallbackRequestBody("jms:queue:start", (body + " " + i), callback);
}

//my call back
public class MyCallback extends SynchronizationAdapter {

@Override
public void onComplete(Exchange exchange) {
String body = exchange.getOut().getBody(String.class);
System.out.println(Thread.currentThread() + " Callback Resposne..." +body);
}
}

public static class SomeBean {
public void someMethod(Exchange body) {
System.out.println(Thread.currentThread() + " Received: " + body.getIn().getBody());
body.getOut().setBody(body.getIn().getBody());
}
}

我的理解(来自 apache camel 文档)是 jms:start queue 上将有 5 个竞争消费者来消费消息。和 3 个“replyThreads”来处理来自 jms:start 队列的异步回复。但实际输出是不同的。
     Thread[main,5,main] sending request...0
Thread[main,5,main] sending request...1
Thread[Camel (camel-1) thread #9 - replyThread,5,main] Received: Hello Camel 0
Thread[Camel (camel-1) thread #10 - replyThread,5,main] Received: Hello Camel 1
Thread[Camel (camel-1) thread #5 - ProducerTemplate,5,main] Callback Resposne...Hello Camel 0
Thread[Camel (camel-1) thread #6 - ProducerTemplate,5,main] Callback Resposne...Hello Camel 1

最佳答案

JMS 组件具有内置的线程池,它可以根据消息队列的数量很好地上下扩展。

所以就用那个

 from("jms:queue:start?concurrentConsumers=5")
.bean(new SomeBean());

你也可以指定一个最大值所以有一个范围
 from("jms:queue:start?concurrentConsumers=5&maxConcurrentConsumers=10")
.bean(new SomeBean());

关于apache-camel - Apache Camel 并发消费者与线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19985792/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com