gpt4 book ai didi

java - ActiveMQ是否支持消费者的最大消息处理时间

转载 作者:行者123 更新时间:2023-12-02 05:21:08 26 4
gpt4 key购买 nike

我有一个在 Java 中运行的 ActiveMQ 消费者脚本,我在 while(true) 循环中调用 consumer.receive()

我需要为每条处理的消息实现超时(例如:如果消息处理时间超过 15 秒,我必须接收下一条消息)。

我已经给出了 ACK 的客户端确认模式。

请查看我实现消费的consumeMessage方法。

期望的结果:

15 秒后,需要丢弃第一条消息(即,它不应调用 acknowledge())。需要处理下一条消息。

//package consumer;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

public class ActivmqConsumer implements ExceptionListener {

ActiveMQConnectionFactory connectionFactory = null;
Connection connection = null;
Session session = null;

public ActivmqConsumer() throws Exception{
String USERNAME = "admin";
String PASSWORD = "admin";
this.connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, "tcp://192.168.56.101:61616?jms.prefetchPolicy.all=1");
// Create a Connection
this.connection = connectionFactory.createConnection();
connection.start();
connection.setExceptionListener(this);
// Create a Session
this.session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
}

public void consumeMessage(String destinationName, EventProcesser eventprocess){
Destination destination = null;
MessageConsumer consumer = null;
try{
// Create the destination (Topic or Queue)
destination = session.createQueue(destinationName);

// Create a MessageConsumer from the Session to the Topic or Queue
consumer = session.createConsumer(destination);

// Wait for a message
while(true){
Message message = consumer.receive(2);
if(message==null){
continue;
}
else if(message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
System.out.println("Received: " + text);
eventprocess.processEvent(text);
message.acknowledge();
} else{
System.out.println("Received: " + message);
}
}
}catch(Exception ex){
ex.printStackTrace();
}finally{
try{
consumer.close();
}catch(Exception ex){
ex.printStackTrace();
}
}
}
}

最佳答案

ActiveMQ 中没有“最大消息处理时间”或等效功能。您需要自己监控处理过程。也许看看 this question/answer有关如何做到这一点的想法。另一种方法是使用 JTA 事务管理器并在事务中使用消息,超时时间为 15 秒。在 Java EE 容器中使用 MDB 是获得事务超时功能的简单方法。

关于java - ActiveMQ是否支持消费者的最大消息处理时间,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56260524/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com