gpt4 book ai didi

java - 如何杀死activemq中的消费者

转载 作者:行者123 更新时间:2023-11-30 09:30:27 24 4
gpt4 key购买 nike

我正试图摆脱某个队列中的所有“消费者数量”。每当我清除/删除队列时,如果我再次创建具有相同名称的队列,消费者的数量仍然存在。即使有 0 个待处理消息,仍然有 6 个消费者。

我的问题可能源于我的 java 代码,同时没有关闭 session 或连接。

我已尝试重新启动和重新安装服务器。

这是我的生产者代码:

 private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

public static String addElementToQueue(String queueName,String param1, String param2) throws JMSException, NamingException {
// Getting JMS connection from the server and starting it
ConnectionFactory connectionFactory =
new ActiveMQConnectionFactory(url);
Connection connection = connectionFactory.createConnection();

// JMS messages are sent and received using a Session. We will
// create here a non-transactional session object. If you want
// to use transactions you should set the first parameter to 'true'
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);

// Destination represents here our queue on the
// JMS server. You don't have to do anything special on the
// server to create it, it will be created automatically.
Destination destination = session.createQueue(queueName);

// MessageProducer is used for sending messages (as opposed
// to MessageConsumer which is used for receiving them)
MessageProducer producer = session.createProducer(destination);

String queueMessage = param1+ "-" + param2;

TextMessage message = session.createTextMessage(queueMessage);

// Here we are sending the message!
producer.send(message);

connection.close();
session.close(); // added after problem came up
producer.close(); // added after problem came up

return commandID;
}

这是我的消费者代码:

 // URL of the JMS server
private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

public static Pair consumeNextElement(String queueName) throws JMSException {
// Getting JMS connection from the server
ConnectionFactory connectionFactory
= new ActiveMQConnectionFactory(url);
Connection connection = connectionFactory.createConnection();
connection.start();

// Creating session for seding messages
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);

// Getting the queue
Destination destination = session.createQueue(queueName);

// MessageConsumer is used for receiving (consuming) messages
MessageConsumer consumer = session.createConsumer(destination);


// Here we receive the message.
// By default this call is blocking, which means it will wait
// for a message to arrive on the queue.
Message message = consumer.receive();

// There are many types of Message and TextMessage
// is just one of them. Producer sent us a TextMessage
// so we must cast to it to get access to its .getText()
// method.

String[] parts = ((TextMessage)message).getText().split("-");
Pair retVal = new Pair(parts[0], parts[1]);

connection.close();
session.close(); // added after problem came up
consumer.close(); // added after problem came up

return retVal;
}

有什么想法吗?

谢谢。

最佳答案

消费者的数量就是队列中监听器的数量。清除队列应该只删除排队的消息——那些收听的消费者不会受到影响。

消费者维持/重新建立连接的能力可能取决于用于连接的传输,以及settings for the transport可能允许对连接属性进行一些调整。

坦率地说,我对这些没有太多经验,但你可以调查一下 Advisory Messages作为一种帮助调试连接的方法。除了报告消费者数量之外,JMX 界面或 Web 控制台似乎没有任何帮助。

关于java - 如何杀死activemq中的消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13262942/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com