gpt4 book ai didi

java - 应用程序停止时 Spring JMS 手动创建 MessageListenerContainer 泄漏

转载 作者:太空宇宙 更新时间:2023-11-04 13:37:27 25 4
gpt4 key购买 nike

我有一个 Spring 应用程序,它必须使用来自某些 JMS 队列的消息。队列的数量必须是可配置的,因此我们必须通过读取配置文件来手动创建消费者。因此我可以拥有类型 1 的 x 队列和类型 2 的 y 队列,并且所有连接详细信息都在此配置文件中指定。

我想说这是一个相当复杂的代码,我需要指出以下事实:我手动创建了 spring DefaultMessageListenerContainer 并在其上调用启动和停止,事务管理器分布在 JMS 和 JDBC 资源之间。此外,应用程序在 WebLogic 上运行,JMS 队列也在 WebLogic 中。流程是应用程序从队列中读取消息,尝试将消息放入数据库,但如果数据库关闭,事务(JMS 和 JDBC 之间共享)将回滚,因此消息会放回到队列中 - 这是数据库关闭时的故障转移机制。

我遇到的问题是,当我在执行故障转移机制时停止应用程序时,有一些 JMS 使用者线程未停止。这样我就会泄漏线程并使系统过载。

所以我的问题是如何确保当应用程序停止时,它会停止所有消费者线程?在消息监听器容器上调用 stop 似乎不起作用。

下面是一些代码片段:

配置:

[
{
"factoryInitial": "weblogic.jndi.WLInitialContextFactory",
"providerUrl": "t3://localhost:7001",
"securityPrincipal": "user",
"securityCredentials": "password",
"connectionFactory": "jms/QCF",
"channels": {
"type1": "jms/queue1"
}
}
]

java:

public class JmsConfig {

private Map<String, List<DefaultMessageListenerContainer>> channels = new HashMap<>();
private Map<String, MessageListener> messageConsumers;
private PlatformTransactionManager transactionManager;

public JmsConfig(Map<String, MessageListener> messageConsumers, PlatformTransactionManager transactionManager) throws Exception {
this.messageConsumers = messageConsumers;
this.transactionManager = transactionManager;
List<JmsServerConfiguration> serverConfigurationList = readJsonFile();
for (JmsServerConfiguration jmsServerConfiguration : serverConfigurationList) {
Properties environment = createEnvironment(jmsServerConfiguration);
JndiTemplate jndiTemplate = new JndiTemplate();
jndiTemplate.setEnvironment(environment);
ConnectionFactory connectionFactory = createConnectionFactory(jndiTemplate, jmsServerConfiguration);
populateMessageListenerContainers(jmsServerConfiguration, jndiTemplate, connectionFactory);
}
}

@PreDestroy
public void stopListenerContainers() {
for (Map.Entry<String, List<DefaultMessageListenerContainer>> channel : channels.entrySet()) {
for (DefaultMessageListenerContainer listenerContainer : channel.getValue()) {
listenerContainer.stop();
}
}
}

private void populateMessageListenerContainers(
JmsServerConfiguration jmsServerConfiguration,
JndiTemplate jndiTemplate, ConnectionFactory connectionFactory) throws Exception {
Set<Map.Entry<String, String>> channelsEntry = jmsServerConfiguration.getChannels().entrySet();
for (Map.Entry<String, String> channel : channelsEntry) {
Destination destination = createDestination(jndiTemplate, channel.getValue());
DefaultMessageListenerContainer listenerContainer =
createListenerContainer(connectionFactory, destination, messageConsumers.get(channel.getKey()));
if (!channels.containsKey(channel.getKey())) {
channels.put(channel.getKey(),
new ArrayList<DefaultMessageListenerContainer>());
}
channels.get(channel.getKey()).add(listenerContainer);
}
}

private Properties createEnvironment(JmsServerConfiguration jmsServerConfiguration) {
Properties properties = new Properties();
properties.setProperty("java.naming.factory.initial", jmsServerConfiguration.getFactoryInitial());
properties.setProperty("java.naming.provider.url", jmsServerConfiguration.getProviderUrl());
properties.setProperty("java.naming.security.principal", jmsServerConfiguration.getSecurityPrincipal());
properties.setProperty("java.naming.security.credentials", jmsServerConfiguration.getSecurityCredentials());
return properties;
}

private ConnectionFactory createConnectionFactory(JndiTemplate jndiTemplate,
JmsServerConfiguration jmsServerConfiguration) throws Exception {
JndiObjectFactoryBean connectionFactory = new JndiObjectFactoryBean();
connectionFactory.setJndiTemplate(jndiTemplate);
connectionFactory.setJndiName(jmsServerConfiguration.getConnectionFactory());
connectionFactory.setExpectedType(ConnectionFactory.class);
connectionFactory.afterPropertiesSet();
return (ConnectionFactory) connectionFactory.getObject();
}

private Destination createDestination(JndiTemplate jndiTemplate, String jndiName) throws Exception {
JndiObjectFactoryBean destinationFactory = new JndiObjectFactoryBean();
destinationFactory.setJndiTemplate(jndiTemplate);
destinationFactory.setJndiName(jndiName);
destinationFactory.setExpectedType(Destination.class);
destinationFactory.afterPropertiesSet();
return (Destination) destinationFactory.getObject();
}

private DefaultMessageListenerContainer createListenerContainer(
ConnectionFactory connectionFactory, Destination destination,
MessageListener messageListener) {
DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();
listenerContainer.setConcurrentConsumers(3);
listenerContainer.setConnectionFactory(connectionFactory);
listenerContainer.setDestination(destination);
listenerContainer.setMessageListener(messageListener);
listenerContainer.setTransactionManager(transactionManager);
listenerContainer.setSessionTransacted(true);
listenerContainer.afterPropertiesSet();
listenerContainer.start();
return listenerContainer;
}
}

最佳答案

所以通过调用listenerContainer.shutdown()解决了这个问题;而不是停止()。

关于java - 应用程序停止时 Spring JMS 手动创建 MessageListenerContainer 泄漏,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31568630/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com