- Java 双重比较
- java - 比较器与 Apache BeanComparator
- Objective-C 完成 block 导致额外的方法调用?
- database - RESTful URI 是否应该公开数据库主键?
我正在开发一个 spring boot 应用程序,我打算在其中使用来自多个队列的 JMS 消息 - 但我想控制处理这些消息的执行线程的总数,而不是控制每个线程执行的线程数JMS 队列。
简而言之:很多 JMS 队列。一个线程池来进行处理。
有些队列会比其他队列更忙(有些可能总是有工作,有些会长时间空闲)——所以我想使用可用的处理能力来完成任何需要完成的工作,而不管源队列。
我设置了一系列 DefaultMessageListenerContainer
,每个都使用具有固定池大小的共享 TaskExecutor
。不过,我观察到的行为是,一个队列将消耗所有可用的槽 - 然后(即使第一个队列变空)槽对其他队列的 DefaultMessageListenerContainer
不可用使用。
这在 javadocs for DefaultMessageListenerContainer.setTaskExecutor()
中有详细说明:
A plain thread pool does not add much value, as this listener container will occupy a number of threads for its entire lifetime.
MessageListenerContainer
吗?最佳答案
我试图重现您的问题,但我做不到。可能是因为缺少您在此问题中显示的源代码。但这是我尝试过的。
import java.io.File;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.listener.adapter.MessageListenerAdapter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.FileSystemUtils;
@Configuration
@EnableAutoConfiguration
public class Application {
public class Receiver {
private String name;
public Receiver(String name) {
this.name = name;
}
/**
* When you receive a message, print it out, then shut down the application.
* Finally, clean up any ActiveMQ server stuff.
*/
public void receiveMessage(String message) {
System.out.println("Received <" + message + "> @ "+name);
//context.close();
FileSystemUtils.deleteRecursively(new File("activemq-data"));
}
}
static String mailboxDestination = "mailbox-destination";
static String mailboxDestination2= "mailbox-destination2";
@Bean
MessageListenerAdapter adapter1() {
MessageListenerAdapter messageListener
= new MessageListenerAdapter(new Receiver("MailBox1"));
messageListener.setDefaultListenerMethod("receiveMessage");
return messageListener;
}
@Bean
MessageListenerAdapter adapter2() {
MessageListenerAdapter messageListener
= new MessageListenerAdapter(new Receiver("MailBox2"));
messageListener.setDefaultListenerMethod("receiveMessage");
return messageListener;
}
@Bean
DefaultMessageListenerContainer container(ConnectionFactory connectionFactory) throws Exception {
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setMessageListener(adapter1());
container.setConnectionFactory(connectionFactory);
container.setDestinationName(mailboxDestination);
container.setTaskExecutor(taskExecutor());
return container;
}
@Bean
DefaultMessageListenerContainer container2(ConnectionFactory connectionFactory) throws Exception {
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setMessageListener(adapter2());
container.setConnectionFactory(connectionFactory);
container.setDestinationName(mailboxDestination2);
container.setTaskExecutor(taskExecutor());
return container;
}
@Bean
TaskExecutor taskExecutor() throws Exception {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(10);
taskExecutor.setMaxPoolSize(100);
taskExecutor.setQueueCapacity(1000);
taskExecutor.setThreadGroupName("MyThreads");
return taskExecutor;
}
public static void main(String[] args) {
// Clean out any ActiveMQ data from a previous run
FileSystemUtils.deleteRecursively(new File("activemq-data"));
// Launch the application
ConfigurableApplicationContext context = SpringApplication.run(Application.class, args);
// Send a message
final JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);
System.out.println("Sending a new message.");
new Thread() {
public void run() {
for(int i=0;i<100;i++) {
final String message = (i+1)+" ping!";
MessageCreator messageCreator = new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message);
}
};
jmsTemplate.send(mailboxDestination, messageCreator);
}
}
}.start();
new Thread() {
public void run() {
for(int i=0;i<100;i++) {
final String message = (i+1)+" ping!";
MessageCreator messageCreator = new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message);
}
};
jmsTemplate.send(mailboxDestination2, messageCreator);
}
}
}.start();
}
}
我得到的输出是;
Received <1 ping!> @ MailBox2
Received <1 ping!> @ MailBox1
Received <2 ping!> @ MailBox2
Received <2 ping!> @ MailBox1
Received <3 ping!> @ MailBox2
Received <3 ping!> @ MailBox1
Received <4 ping!> @ MailBox2
Received <4 ping!> @ MailBox1
Received <5 ping!> @ MailBox2
Received <5 ping!> @ MailBox1
Received <6 ping!> @ MailBox2
Received <6 ping!> @ MailBox1
Received <7 ping!> @ MailBox2
Received <7 ping!> @ MailBox1
Received <8 ping!> @ MailBox2
Received <8 ping!> @ MailBox1
Received <9 ping!> @ MailBox2
Received <9 ping!> @ MailBox1
Received <10 ping!> @ MailBox2
Received <10 ping!> @ MailBox1
Received <11 ping!> @ MailBox2
Received <11 ping!> @ MailBox1
Received <12 ping!> @ MailBox1
Received <12 ping!> @ MailBox2
Received <13 ping!> @ MailBox1
Received <13 ping!> @ MailBox2
Received <14 ping!> @ MailBox1
Received <14 ping!> @ MailBox2
Received <15 ping!> @ MailBox2
Received <15 ping!> @ MailBox1
Received <16 ping!> @ MailBox2
Received <16 ping!> @ MailBox1
Received <17 ping!> @ MailBox2
Received <17 ping!> @ MailBox1
Received <18 ping!> @ MailBox2
Received <18 ping!> @ MailBox1
Received <19 ping!> @ MailBox1
Received <19 ping!> @ MailBox2
Received <20 ping!> @ MailBox1
Received <20 ping!> @ MailBox2
Received <21 ping!> @ MailBox1
Received <21 ping!> @ MailBox2
Received <22 ping!> @ MailBox1
Received <22 ping!> @ MailBox2
Received <23 ping!> @ MailBox1
Received <23 ping!> @ MailBox2
Received <24 ping!> @ MailBox1
Received <24 ping!> @ MailBox2
Received <25 ping!> @ MailBox1
Received <25 ping!> @ MailBox2
Received <26 ping!> @ MailBox1
Received <26 ping!> @ MailBox2
Received <27 ping!> @ MailBox1
Received <27 ping!> @ MailBox2
Received <28 ping!> @ MailBox2
Received <28 ping!> @ MailBox1
Received <29 ping!> @ MailBox2
Received <29 ping!> @ MailBox1
Received <30 ping!> @ MailBox2
Received <30 ping!> @ MailBox1
Received <31 ping!> @ MailBox2
Received <31 ping!> @ MailBox1
Received <32 ping!> @ MailBox2
Received <33 ping!> @ MailBox2
Received <32 ping!> @ MailBox1
Received <34 ping!> @ MailBox2
Received <33 ping!> @ MailBox1
Received <34 ping!> @ MailBox1
Received <35 ping!> @ MailBox2
Received <35 ping!> @ MailBox1
Received <36 ping!> @ MailBox2
Received <36 ping!> @ MailBox1
Received <37 ping!> @ MailBox2
Received <37 ping!> @ MailBox1
Received <38 ping!> @ MailBox2
Received <38 ping!> @ MailBox1
Received <39 ping!> @ MailBox2
Received <39 ping!> @ MailBox1
Received <40 ping!> @ MailBox2
Received <40 ping!> @ MailBox1
Received <41 ping!> @ MailBox2
Received <42 ping!> @ MailBox2
Received <43 ping!> @ MailBox2
Received <44 ping!> @ MailBox2
Received <45 ping!> @ MailBox2
Received <46 ping!> @ MailBox2
Received <47 ping!> @ MailBox2
Received <48 ping!> @ MailBox2
Received <49 ping!> @ MailBox2
Received <50 ping!> @ MailBox2
Received <51 ping!> @ MailBox2
Received <52 ping!> @ MailBox2
Received <53 ping!> @ MailBox2
Received <54 ping!> @ MailBox2
Received <55 ping!> @ MailBox2
Received <56 ping!> @ MailBox2
Received <57 ping!> @ MailBox2
Received <58 ping!> @ MailBox2
Received <59 ping!> @ MailBox2
Received <60 ping!> @ MailBox2
Received <61 ping!> @ MailBox2
Received <62 ping!> @ MailBox2
Received <63 ping!> @ MailBox2
Received <64 ping!> @ MailBox2
Received <65 ping!> @ MailBox2
Received <66 ping!> @ MailBox2
Received <67 ping!> @ MailBox2
Received <68 ping!> @ MailBox2
Received <69 ping!> @ MailBox2
Received <70 ping!> @ MailBox2
Received <71 ping!> @ MailBox2
Received <72 ping!> @ MailBox2
Received <73 ping!> @ MailBox2
Received <74 ping!> @ MailBox2
Received <75 ping!> @ MailBox2
Received <76 ping!> @ MailBox2
Received <77 ping!> @ MailBox2
Received <78 ping!> @ MailBox2
Received <79 ping!> @ MailBox2
Received <80 ping!> @ MailBox2
Received <81 ping!> @ MailBox2
Received <82 ping!> @ MailBox2
Received <83 ping!> @ MailBox2
Received <84 ping!> @ MailBox2
Received <85 ping!> @ MailBox2
Received <86 ping!> @ MailBox2
Received <87 ping!> @ MailBox2
Received <88 ping!> @ MailBox2
Received <89 ping!> @ MailBox2
Received <90 ping!> @ MailBox2
Received <91 ping!> @ MailBox2
Received <92 ping!> @ MailBox2
Received <93 ping!> @ MailBox2
Received <94 ping!> @ MailBox2
Received <95 ping!> @ MailBox2
Received <96 ping!> @ MailBox2
Received <97 ping!> @ MailBox2
Received <98 ping!> @ MailBox2
Received <99 ping!> @ MailBox2
Received <100 ping!> @ MailBox2
Received <41 ping!> @ MailBox1
Received <42 ping!> @ MailBox1
Received <43 ping!> @ MailBox1
Received <44 ping!> @ MailBox1
Received <45 ping!> @ MailBox1
Received <46 ping!> @ MailBox1
Received <47 ping!> @ MailBox1
Received <48 ping!> @ MailBox1
Received <49 ping!> @ MailBox1
Received <50 ping!> @ MailBox1
Received <51 ping!> @ MailBox1
Received <52 ping!> @ MailBox1
Received <53 ping!> @ MailBox1
Received <54 ping!> @ MailBox1
Received <55 ping!> @ MailBox1
Received <56 ping!> @ MailBox1
Received <57 ping!> @ MailBox1
Received <58 ping!> @ MailBox1
Received <59 ping!> @ MailBox1
Received <60 ping!> @ MailBox1
Received <61 ping!> @ MailBox1
Received <62 ping!> @ MailBox1
Received <63 ping!> @ MailBox1
Received <64 ping!> @ MailBox1
Received <65 ping!> @ MailBox1
Received <66 ping!> @ MailBox1
Received <67 ping!> @ MailBox1
Received <68 ping!> @ MailBox1
Received <69 ping!> @ MailBox1
Received <70 ping!> @ MailBox1
Received <71 ping!> @ MailBox1
Received <72 ping!> @ MailBox1
Received <73 ping!> @ MailBox1
Received <74 ping!> @ MailBox1
Received <75 ping!> @ MailBox1
Received <76 ping!> @ MailBox1
Received <77 ping!> @ MailBox1
Received <78 ping!> @ MailBox1
Received <79 ping!> @ MailBox1
Received <80 ping!> @ MailBox1
Received <81 ping!> @ MailBox1
Received <82 ping!> @ MailBox1
Received <83 ping!> @ MailBox1
Received <84 ping!> @ MailBox1
Received <85 ping!> @ MailBox1
Received <86 ping!> @ MailBox1
Received <87 ping!> @ MailBox1
Received <88 ping!> @ MailBox1
Received <89 ping!> @ MailBox1
Received <90 ping!> @ MailBox1
Received <91 ping!> @ MailBox1
Received <92 ping!> @ MailBox1
Received <93 ping!> @ MailBox1
Received <94 ping!> @ MailBox1
Received <95 ping!> @ MailBox1
Received <96 ping!> @ MailBox1
Received <97 ping!> @ MailBox1
Received <98 ping!> @ MailBox1
Received <99 ping!> @ MailBox1
Received <100 ping!> @ MailBox1
如果我错了请纠正我,但使用此配置我没有发现任何问题。两个队列和一个任务执行器。
关于java - 多个 DefaultMessageListenerContainer 提供单个 TaskExecutor/ThreadPool(公平地),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25817242/
我希望这是一个简单的配置问题,但我似乎无法弄清楚它可能是什么。 设置 Spring-Boor 2.2.2.RELEASE 云启动器 云启动器-aws spring-jms spring-cloud-d
我有以下配置: // Some attributes 以及以下监听器: public class myListener implements MessageListener
我有一个 DefaultMessageListenerContainer 配置如下: DefaultMessageListenerContainer container = new DefaultMe
我希望这是一个简单的配置问题,但我似乎无法弄清楚它可能是什么。 设置 Spring-Boor 2.2.2.RELEASE 云启动器 云启动器-aws spring-jms spring-cloud-d
我正在将 Spring JMS 与以下上下文 XML 文件一起使用。 我的应用程序是命令行独立的,看起来像这样: public static void main(String[]
是否有可能在 onMessage 方法内部知道 MessageListener 正在监听哪个队列? 我的 Spring-config(其中一部分):
我有一个 DefaultMessageListenerContainer,它(在我看来)没有按比例放大。 Container 被定义为监听一个队列,其中有 100 条消息。 我希望容器可以达到任何长度
我正在努力模拟(使用 Mockito)DefaultMessageListenerContainer (org.springframework.jms.listener.DefaultMessageL
使用DefaultJmsListenerContainerFactory有什么好处在 DefaultMessageListenerContainer ? 如果我直接配置 DMLC,我会通过调用 isR
我已将 Spring DefaultMessageListenerContainer 配置为 ActiveMQ 消费者,使用队列中的消息。我们称之为“Test.Queue”我将此代码部署在 4 台不同
我是 Spring Framework 的新手,我的问题如下: 我想实例化 DefaultMessageListenerContainer以编程方式,我使用的代码是: DefaultMessageLi
对于我当前的项目,我需要使用来自许多目的地(从数百到 20 或 30k)的消息,所有目的地都是主题。目前(对于初始负载测试)所有消息都是在同一台服务器上本地创建的,在线程池中。 我当前的 spring
我有一个要求,我在一个队列中有消息,消息选择器的数量是可配置的。我需要并行处理这些消息的地方。 经过深思熟虑,我发现使用多个 DefaultMessageListenerContainer 会产生很好
如果我使用 DefaultMessageListenerContainer 的 Spring要接收 JMS 消息,即使我设置了 sessionAcknowledgeMode,我也不会重新发送 JMS
我开发了使用 Spring JMS 的项目来接收来自队列的消息。并部署Websphere应用服务器(WAS 7.5)集群环境。一旦部署到服务器中,它就工作正常。后来我更新了记录器信息并部署到服务器中。
我正在使用 spring 集成来监听 ibm mq,我想在 hibernate 模式下部署我的应用程序并在需要的时间启动它。因此,我使用了 DefaultMessageListenerContaine
我在 DefaultMessageListenerContainer 内使用 SimpleAsyncTaskExecutor,并且我想使用 JMX mbean 监视 Activity 线程计数。我创建
我有一个案例,我想在同一个“主”线程中运行 DefaultMessageListenerContainer。现在它使用 SimpleAsyncTaskExecutor 每次收到消息时都会生成新线程。
我已经为 jms 监听器配置了 spring 以使用来自 hornetQ 的消息,如下所示。
我正在使用 Spring JMS 连接到 Websphere MQ 服务器。我实现了 SessionAwareListener 接口(interface)来创建自定义监听器,为业务逻辑重用旧代码。 在
我是一名优秀的程序员,十分优秀!