gpt4 book ai didi

java - 检测 ActiveMQ 主题的使用者的变化

转载 作者:行者123 更新时间:2023-11-30 02:35:03 25 4
gpt4 key购买 nike

我有一个 Tomcat 集群,它们使用来自 ActiveMQ 主题的消息。现在,如果集群中的其中一个 tomcat 出现故障,那么我猜测消费者数量将减少 1。

现在,我想使用该主题的一些回调或监听器来检测该更改。这可行吗?

类似:Region.getDestinations(ActiveMQDestination) 可以工作吗?

最佳答案

ActiveMQ 的通告消息(Advisory Message)正是您所需要的。

使用下面的代码,每收到一条消息,就意味着有一个消费者已启动或停止。

文档: http://activemq.apache.org/advisory-message.html

示例:

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.RemoveInfo;

/**
 * Example client that subscribes to the ActiveMQ consumer advisory topic of
 * queue {@code "Q"} and prints every advisory it receives.
 *
 * <p>An advisory whose data structure is a {@link ConsumerInfo} is reported as
 * a consumer start; one whose data structure is a {@link RemoveInfo} is
 * reported as a consumer stop.
 */
public class AdvisorySupportConsumerAdvisoryTopic {

    /**
     * Connects to the broker, registers the advisory listener and then blocks
     * until the main thread is interrupted, so that the asynchronous listener
     * actually gets a chance to receive advisories.
     *
     * @param args unused
     * @throws JMSException declared for interface compatibility; failures are
     *         caught and printed inside the method
     */
    public static void main(String[] args) throws JMSException {
        Connection conn = null;
        try {
            ConnectionFactory cf = new ActiveMQConnectionFactory("auto://localhost:5671");
            conn = cf.createConnection("admin", "admin");
            ActiveMQSession session = (ActiveMQSession) conn.createSession(false,
                    ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
            Queue q = session.createQueue("Q");
            Destination advisoryDestination =
                    org.apache.activemq.advisory.AdvisorySupport.getConsumerAdvisoryTopic(q);
            MessageConsumer consumer = session.createConsumer(advisoryDestination);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message msg) {
                    if (msg instanceof ActiveMQMessage) {
                        try {
                            ActiveMQMessage aMsg = (ActiveMQMessage) msg;
                            System.out.println(aMsg.getStringProperty("consumerCount"));
                            // NOTE(review): "producerCount" is presumably only set on
                            // producer advisories, so this likely prints null here - confirm.
                            System.out.println(aMsg.getStringProperty("producerCount"));
                            if (aMsg.getDataStructure() instanceof ConsumerInfo) {
                                // Consumer start
                                ConsumerInfo consumerInfo = (ConsumerInfo) aMsg.getDataStructure();
                                System.out.println(consumerInfo);
                            } else if (aMsg.getDataStructure() instanceof RemoveInfo) {
                                // Consumer stop
                                RemoveInfo removeInfo = (RemoveInfo) aMsg.getDataStructure();
                                System.out.println(removeInfo);
                            }
                            // The session uses INDIVIDUAL_ACKNOWLEDGE, so each message
                            // must be acknowledged explicitly or it stays pending.
                            aMsg.acknowledge();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            // Start delivery only after the listener is attached.
            conn.start();
            // Keep the connection open: without this the finally block below
            // would close it immediately and no advisory would ever be delivered.
            Thread.currentThread().join();
        } catch (InterruptedException e) {
            // Interrupted while waiting: restore the flag and fall through to cleanup.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (Exception e) {
                    // Nothing more can be done at shutdown, but do not hide the failure.
                    e.printStackTrace();
                }
            }
        }
    }
}

使用下面的代码,每收到一条消息,就意味着有一个连接已启动或停止。

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.RemoveInfo;

/**
 * Example client that subscribes to the ActiveMQ connection advisory topic and
 * prints every advisory it receives.
 *
 * <p>An advisory whose data structure is a {@link ConnectionInfo} is reported
 * as a connection start; one whose data structure is a {@link RemoveInfo} is
 * reported as a connection stop.
 */
public class AdvisorySupportConnectionAdvisoryTopic {

    /**
     * Connects to the broker, registers the advisory listener and then blocks
     * until the main thread is interrupted, so that the asynchronous listener
     * actually gets a chance to receive advisories.
     *
     * @param args unused
     * @throws JMSException declared for interface compatibility; failures are
     *         caught and printed inside the method
     */
    public static void main(String[] args) throws JMSException {
        Connection conn = null;
        try {
            ConnectionFactory cf = new ActiveMQConnectionFactory("auto://localhost:5671");
            conn = cf.createConnection("admin", "admin");
            ActiveMQSession session = (ActiveMQSession) conn.createSession(false,
                    ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
            Destination advisoryDestination =
                    org.apache.activemq.advisory.AdvisorySupport.getConnectionAdvisoryTopic();
            MessageConsumer consumer = session.createConsumer(advisoryDestination);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message msg) {
                    if (msg instanceof ActiveMQMessage) {
                        try {
                            ActiveMQMessage aMsg = (ActiveMQMessage) msg;
                            if (aMsg.getDataStructure() instanceof ConnectionInfo) {
                                // Connection start
                                ConnectionInfo connectionInfo = (ConnectionInfo) aMsg.getDataStructure();
                                System.out.println(connectionInfo);
                            } else if (aMsg.getDataStructure() instanceof RemoveInfo) {
                                // Connection stop
                                RemoveInfo removeInfo = (RemoveInfo) aMsg.getDataStructure();
                                System.out.println(removeInfo);
                            }
                            // The session uses INDIVIDUAL_ACKNOWLEDGE, so each message
                            // must be acknowledged explicitly or it stays pending.
                            aMsg.acknowledge();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            // Start delivery only after the listener is attached.
            conn.start();
            // Keep the connection open: without this the finally block below
            // would close it immediately and no advisory would ever be delivered.
            Thread.currentThread().join();
        } catch (InterruptedException e) {
            // Interrupted while waiting: restore the flag and fall through to cleanup.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (Exception e) {
                    // Nothing more can be done at shutdown, but do not hide the failure.
                    e.printStackTrace();
                }
            }
        }
    }
}

关于java - 检测 ActiveMQ 主题的使用者的变化,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43319224/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com