gpt4 book ai didi

java - 将 ActiveMQ 与多个消费者实例结合使用

转载 作者:行者123 更新时间:2023-12-02 10:43:00 24 4
gpt4 key购买 nike

在我们的架构中,我们为每个应用程序提供 2 个或更多容器高可用性目的。我们正在使用 activeMQ,我想实现以下行为。

  1. 将消息推送至队列。
  2. 此消息将仅由*一个*容器(第一个将会读取它)。
  3. 消息处理成功后,消费者将更新此消息消息可以被确认和忽略
  4. 我尝试使用带有提交的事务并使用 CLIENT_ACKNOWLEDGE 但是在这两种情况下,两个消费者都收到了消息并对其进行了处理。

我希望只有一个消费者根据可用性来处理每条消息。我们的实现是用 Java 实现的。

请分享一下实现方法。

这是我的代码示例

 final Connection consumerConnection = connectionFactory.createConnection();
consumerConnection.start();
// Create a session.
final Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

final Destination consumerDestination = consumerSession.createQueue(queueName);


// Create a message consumer from the session to the queue.
final MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);
// Begin to wait for messages.
Queue queue = consumerSession.createQueue(queueName);
QueueBrowser queueBrowser = consumerSession.createBrowser(queue);
Enumeration msgs = queueBrowser.getEnumeration();
while (msgs.hasMoreElements()) {
//do your things here

ActiveMQTextMessage message = (ActiveMQTextMessage) msgs.nextElement();

if (message == null)
continue;

//handle message
System.out.println("Message received in : " + message);
try {
String text = message.getText();
JSONObject messageJson = new JSONObject(text);
consumer.receive(1000);

String responseString = handleMessage(messageJson);

message.acknowledge();

谢谢摩西

最佳答案

这里的问题是您正在确认来自 QueueBrowser 的消息。实例不会产生任何影响,因为浏览器仅用于浏览消息而不是消费它们。

...
QueueBrowser queueBrowser = consumerSession.createBrowser(queue);
Enumeration msgs = queueBrowser.getEnumeration();
while (msgs.hasMoreElements()) {
...
ActiveMQTextMessage message = (ActiveMQTextMessage) msgs.nextElement();
...
try {
...
message.acknowledge(); // this does nothing

实际上正在创建一个真正的消费和调用 consumer.receive(1000) ,但您要丢弃 Message实例 receive()返回。这是Message您必须确认该实例才能实际使用队列中的消息。

...
// Create a message consumer from the session to the queue.
final MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);
...
while (msgs.hasMoreElements()) {
...
try {
...
Message actualMessage = consumer.receive(1000); // acknowledge this!
...

另一个重要说明...队列浏览器接收的消息不能保证是队列的静态快照。因此,假设您调用 consumer.receive(1000) 是危险的。实际上是来自队列浏览器的相同消息。在我看来,你应该重新设计这个逻辑,使其仅适用于 MessageConsumer并删除 QueueBrowser .

关于java - 将 ActiveMQ 与多个消费者实例结合使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52796618/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com