gpt4 book ai didi

java - ActiveMQ 一些消费者如果在生产者之后到达,则不会拾取任务

转载 作者:行者123 更新时间:2023-12-01 14:22:18 25 4
gpt4 key购买 nike

我刚刚开始使用 ActiveMQ,我似乎遇到了一个奇怪的问题。 (来源如下)

有两种情况

  1. 消费者连接到代理,等待队列上的任务。生产者稍后到达,删除任务列表,并且它们正确地由不同的消费者接管并执行。这工作得很好,我也模拟了它。

  2. 生产者首先连接,删除任务列表。此时没有消费者连接。现在,当假设 3 个消费者 - C1、C2 和 C3 连接到代理(按顺序)时,我看到只有 C1 接手并执行生产者丢弃的任务。 C2和C3保持空闲。为什么会出现这种情况?

我还注意到关于第二种情况的另一件事 - 如果生产者继续在队列中删除任务,C2 和 C3 会拾取任务,但如果生产者之前删除了任务(如上所述),则 C2并且 C3 不执行任何操作。

生产者代码

package com.activemq.apps;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

import com.commons.helpers.Maths;

public class Publisher implements MessageListener {

private static String _URL;
private static String _TOPIC_PUBLISH;
private static String _TOPIC_CONSUME;

public Publisher (String URL, String TOPIC) {

_URL = URL;
_TOPIC_PUBLISH = TOPIC + "_REQUESTS";
_TOPIC_CONSUME = TOPIC + "_RESPONSES";

}

public void initialize() {

try
{

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(_URL);
Connection connection = connectionFactory.createConnection();
connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Destination destinationProducer = session.createQueue(_TOPIC_PUBLISH);
Destination destinationConsumers = session.createQueue(_TOPIC_CONSUME);

MessageProducer producer = session.createProducer(destinationProducer);
MessageConsumer consumer = session.createConsumer(destinationConsumers);

consumer.setMessageListener(this);

int count = 0;

System.out.println("Sending requests");

while (true)
{
int randomSleepTime = Maths.rand(1000, 5000);

String messageToSend = count + "_" + randomSleepTime;

TextMessage message = session.createTextMessage(messageToSend);

producer.send(message);

System.out.println("Job #" + count + " | " + (randomSleepTime/1000) + "s");

if (count++%10 == 0)
Thread.sleep(10000);

}
}

catch (JMSException ex)
{
ex.printStackTrace();
}

catch (InterruptedException e) {
e.printStackTrace();
}

}

@Override
public void onMessage(Message message) {

if (message instanceof TextMessage)
{
TextMessage msg = (TextMessage) message;

try {

String response = msg.getText();
String[] responseSplit = response.split("_");

String clientId = responseSplit[1];
String count = responseSplit[0];

System.out.println("Got response from " + clientId + " Job #" + count);
}

catch (JMSException e) {
e.printStackTrace();
}
}

}

}

消费者代码

package com.activemq.apps;

import java.util.UUID;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer implements MessageListener {

private static String _URL;
private static String _TOPIC_PUBLISH;
private static String _TOPIC_CONSUME;
private static String _CLIENTID;

private MessageProducer producer;
private Session session;

public Consumer (String URL, String TOPIC) {

_URL = URL;
_TOPIC_PUBLISH = TOPIC + "_RESPONSES";
_TOPIC_CONSUME = TOPIC + "_REQUESTS";

}

public void initialize() {

try
{

_CLIENTID = UUID.randomUUID().toString();

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(_URL);
Connection connection = connectionFactory.createConnection();
connection.start();

session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Destination destinationProducer = session.createQueue(_TOPIC_PUBLISH);
Destination destinationConsumers = session.createQueue(_TOPIC_CONSUME);

producer = session.createProducer(destinationProducer);
MessageConsumer consumer = session.createConsumer(destinationConsumers);

consumer.setMessageListener(this);

System.out.println("Client: " + _CLIENTID + "\nWaiting to pick up tasks");
}

catch (JMSException ex)
{
ex.printStackTrace();
}

}

@Override
public void onMessage(Message message) {

if (message instanceof TextMessage)
{
TextMessage msg = (TextMessage) message;

try
{

String[] messageSplits = msg.getText().split("_");

String count = messageSplits[0];
String timeString = messageSplits[1];

int sleepFor = Integer.parseInt(timeString);

System.out.println("Job #" + count + " | Sleeping for " + (sleepFor/1000) + "s");

Thread.sleep(sleepFor);

TextMessage sendToProducer = session.createTextMessage(count + "_" + _CLIENTID);

producer.send(sendToProducer);
}

catch (JMSException e) {
e.printStackTrace();
}

catch (InterruptedException e) {
e.printStackTrace();
}
}

}

}

最佳答案

你提到过

Now when lets say 3 consumers - C1, C2 and C3 connect to the broker (in that order)

由于 C1 首先连接,它在连接后立即开始获取队列上的所有消息。这是预期的。所以我在这里没有看到任何问题。 C2、C3 并没有闲着,但 C1 在 C2、C3 之前就已经掌握了消息。

我不确定生产者发送了多少条消息。我认为消息数量会减少。要了解您的期望,请尝试从生产者发送许多消息,例如数千或数百万,然后启动消费者。消息的数量是主观的,取决于内存、网络和其他资源。您可能会看到您所期待的内容。

关于java - ActiveMQ 一些消费者如果在生产者之后到达,则不会拾取任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17440070/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com