gpt4 book ai didi

java - ActiveMQ OnMessage() 方法阻塞其他线程

转载 作者:行者123 更新时间:2023-11-30 08:32:28 29 4
gpt4 key购买 nike

我正在用 ActiveMQ 编写一个应用程序,通过异步的 onMessage() 方法从 ActiveMQ 接收消息。假设我从 ActiveMQ 收到 1000 条消息:我希望在 onMessage() 中把这些消息全部存入一个 ConcurrentLinkedQueue,再由若干线程从该 ConcurrentLinkedQueue 中取出。但我遇到的问题是,ConcurrentLinkedQueue 中一条消息也没有被添加或取出:onMessage() 收到的 textMessage 已经传给了一个接收 TextMessage 的 setter 方法,可是通过对应的 getter 方法却拿不到任何内容。为什么会这样?如何避免这种情况?

代码片段如下

/**
 * Entry point of the question's snippet: starts one Producer thread, submits
 * {@code count} Consumer tasks to a thread pool and then registers the JMS listener.
 * {@code queue}, {@code settext}, {@code count} and {@code consumer} are not declared
 * in this snippet; presumably they are static members or come from the elided
 * ActiveMQ configuration.
 */
public static void main(String[] args) throws InterruptedException, JMSException {

// Start the Producer on its own thread. This happens before the message
// listener is registered further down.
Thread producer = new Thread(new Producer(queue,settext));
producer.start();
// Thread pool for the consumers: core size 4, maximum size 10, 100 s keep-alive.
// NOTE(review): per the ThreadPoolExecutor documentation, an unbounded
// LinkedBlockingQueue means the pool never grows beyond its core size, so the
// maximum of 10 has no effect - confirm this is intended.
final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 10, 100, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
// Let idle core threads time out as well.
executor.allowCoreThreadTimeOut(true);

// Submit count Consumer tasks, all reading from the shared queue.
for (int i = 0; i <count; i++) {
executor.execute(new Consumer(queue));
}

**//INITIALIZE ACTIVEMQ CONFIGURATION HERE**

// From here on, delivered messages are passed to QueueMessageListener.onMessage().
consumer.setMessageListener(new QueueMessageListener());
// Stop accepting new tasks; tasks that were already submitted still run.
executor.shutdown();
}

private static class QueueMessageListener implements MessageListener {

    /**
     * Stores the delivered message in the shared {@code settext} holder.
     *
     * @param message the delivered message; it is cast to {@link TextMessage}
     */
    @Override
    public void onMessage(Message message) {
        final TextMessage received = (TextMessage) message;
        settext.setTextmessage(received);
    }
}
}

//Problem here unable to produce
/**
 * Copies the message currently held by {@code settext} into the shared queue.
 * The holder is inspected exactly once per run.
 */
class Producer implements Runnable {

    ConcurrentLinkedQueue<TextMessage> queue;
    Settext settext;

    Producer(ConcurrentLinkedQueue<TextMessage> queue2, Settext settext) {
        this.settext = settext;
        this.queue = queue2;
    }

    /**
     * Prints a start banner, enqueues the held message if there is one,
     * then sleeps for 200 ms and returns.
     */
    public void run() {
        System.out.println("Producer Started");
        try {
            final boolean holderIsEmpty = settext.getTextmessage() == null;
            if (!holderIsEmpty) {
                // Hand the held message over to the ConcurrentLinkedQueue.
                queue.add(settext.getTextmessage());
            }
            Thread.sleep(200);
        } catch (Exception failure) {
            failure.printStackTrace();
        }
    }
}

//Problem here unable to consume
class Consumer implements Runnable {
ConcurrentLinkedQueue<TextMessage> queue;

public Consumer(ConcurrentLinkedQueue<TextMessage> queue2) {
this.queue = queue2;
}
public void run() {
TextMessage str;
System.out.println("Consumer Started");
while ((str = queue.poll()) != null) {
System.out.println("Removed: " + str);

}
try {
Thread.currentThread().sleep(500);
} catch (Exception ex) {
ex.printStackTrace();
}
//}
}

最佳答案

我不清楚你为什么要这样做,但你的设计存在问题,请看下面代码中的注释 1-5。注意 QueueMessageListener 是异步执行的:在另一个线程(即 Producer)取到 TextMessage 并把它加入队列之前,监听器可能已经再次调用 settext.setTextmessage((TextMessage) message) 把它覆盖掉了。因此 V2 可能更好,不过最好的解决方案也许是使用 org.springframework.jms.listener.DefaultMessageListenerContainer:

/**
 * Annotated copy of the question's entry point; the numbered notes (1-4) continue
 * in Producer.run() and Consumer.run().
 * {@code queue}, {@code settext}, {@code count} and {@code consumer} are not declared
 * in this snippet; presumably they are static members or come from the elided
 * ActiveMQ configuration.
 */
public static void main(String[] args) throws InterruptedException, JMSException {

// Start the Producer on its own thread.
// 1- At this point settext.getTextmessage() is presumably still null, because the
//    listener is only registered further down; see note 2 in Producer.run().
Thread producer = new Thread(new Producer(queue,settext));
producer.start();
// Thread pool for the consumers: core size 4, maximum size 10, 100 s keep-alive.
final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 10, 100, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
// Let idle core threads time out as well.
executor.allowCoreThreadTimeOut(true);

// 3- The consumers are started here (continue with note 4 in Consumer.run()).
//    Each Consumer prints a single message, so at most count messages are consumed.
for (int i = 0; i <count; i++) {
executor.execute(new Consumer(queue));
}

**//INITIALIZE ACTIVEMQ CONFIGURATION HERE**

// From here on, delivered messages are passed to QueueMessageListener.onMessage().
consumer.setMessageListener(new QueueMessageListener());
// Stop accepting new tasks; tasks that were already submitted still run.
executor.shutdown();
}

private static class QueueMessageListener implements MessageListener {

    /**
     * Stores the delivered message in the shared {@code settext} holder.
     *
     * @param message the delivered message; it is cast to {@link TextMessage}
     */
    @Override
    public void onMessage(Message message) {
        final TextMessage received = (TextMessage) message;
        settext.setTextmessage(received);
        // Once this method returns, an AUTO_ACKNOWLEDGE session treats the message
        // as delivered, so it may be lost if the later asynchronous processing fails.
    }
}
}

//Problem here unable to produce
/**
 * Waits until {@code settext} holds a message and moves that one message into
 * the shared queue. Only a single message is forwarded per run.
 */
class Producer implements Runnable {

    ConcurrentLinkedQueue<TextMessage> queue;
    Settext settext;

    Producer(ConcurrentLinkedQueue<TextMessage> queue2, Settext settext) {
        this.queue = queue2;
        this.settext = settext;
    }

    /**
     * Polls the holder every 500 ms until a message is present, enqueues it and
     * returns. Returns early, with the interrupt flag restored, if the thread is
     * interrupted while waiting.
     */
    @Override
    public void run() {
        System.out.println("Producer Started");
        try {
            // 2- When this thread starts, settext.getTextmessage() is still null, so
            //    without this wait loop the thread would just sleep and finish.
            // The value is read once per iteration and that same reference is
            // enqueued, so the listener cannot swap it between the check and the add.
            TextMessage pending;
            while ((pending = this.settext.getTextmessage()) == null) {
                Thread.sleep(500);
            }
            // Add to ConcurrentLinkedQueue
            queue.add(pending);
        } catch (InterruptedException ex) {
            // Stop waiting and keep the interrupt visible to the thread's owner.
            Thread.currentThread().interrupt();
        }
    }
}

//Problem here unable to consume
class Consumer implements Runnable {
ConcurrentLinkedQueue<TextMessage> queue;

public Consumer(ConcurrentLinkedQueue<TextMessage> queue2) {
this.queue = queue2;
}
public void run() {
TextMessage str;
System.out.println("Consumer Started");
// 4- queue.poll() == null at this level, while loop finished, thread will sleep and finish
// you have to add this
while ((str = queue.poll()) == null) {
try {
Thread.currentThread().sleep(500);
} catch (Exception ex) {
ex.printStackTrace();
}
}
System.out.println("Removed: " + str);
//}
}

V2:

    /**
     * V2 entry point: no Producer thread; the listener writes to the queue directly
     * and {@code count} Consumer tasks read from it.
     * {@code queue}, {@code count} and {@code consumer} are not declared in this
     * snippet; presumably they are static members or come from the elided
     * ActiveMQ configuration.
     */
    public static void main(String[] args) throws InterruptedException, JMSException {

// Thread pool for the consumers: core size 4, maximum size 10, 100 s keep-alive.
// NOTE(review): per the ThreadPoolExecutor documentation, an unbounded
// LinkedBlockingQueue means the pool never grows beyond its core size, so at most
// 4 consumers run at the same time - confirm this is intended.
final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 10, 100, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
// Let idle core threads time out as well.
executor.allowCoreThreadTimeOut(true);

// Submit count Consumer tasks, all reading from the shared queue.
for (int i = 0; i <count; i++) {
executor.execute(new Consumer(queue));
}

**//INITIALIZE ACTIVEMQ CONFIGURATION HERE**

// From here on, delivered messages are passed to QueueMessageListener.onMessage().
consumer.setMessageListener(new QueueMessageListener());
// Stop accepting new tasks; tasks that were already submitted still run.
executor.shutdown();
}

private static class QueueMessageListener implements MessageListener {

    /**
     * Appends the delivered message to the shared {@code queue}.
     *
     * @param message the delivered message; it is cast to {@link TextMessage}
     */
    @Override
    public void onMessage(Message message) {
        final TextMessage received = (TextMessage) message;
        queue.add(received);
    }
}
}

//Problem here unable to consume
class Consumer implements Runnable {
ConcurrentLinkedQueue<TextMessage> queue;

public Consumer(ConcurrentLinkedQueue<TextMessage> queue2) {
this.queue = queue2;
}
public void run() {
TextMessage str;
System.out.println("Consumer Started");
while ((str = queue.poll()) == null) {
try {
Thread.currentThread().sleep(500);
} catch (Exception ex) {
ex.printStackTrace();
}
}
System.out.println("Removed: " + str);
//}
}

V3:

public static void main(String[] args) throws InterruptedException, JMSException {

//Create a Consumer with coresize 4 and Max size 10
final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 10, 100, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
executor.allowCoreThreadTimeOut(true);

**//INITIALIZE ACTIVEMQ CONFIGURATION HERE**

consumer.setMessageListener(new QueueMessageListener());
executor.shutdown();
}

private static class QueueMessageListener implements MessageListener {

@Override
public void onMessage(Message message) {
executor.execute(new Consumer((TextMessage) message));
}
}
}

//Problem here unable to consume
/**
 * One-shot task that prints the single message it was created with.
 */
class Consumer implements Runnable {
    TextMessage textMessage;

    public Consumer(TextMessage textMessage) {
        this.textMessage = textMessage;
    }

    /** Prints the wrapped message. */
    @Override
    public void run() {
        // The original printed an undeclared variable `str`; the message to print
        // is the one stored in the textMessage field.
        System.out.println("Removed: " + textMessage);
    }
}

V4:

    public static void main(String[] args) throws InterruptedException, JMSException {

new Consumer(queue).start();

**//INITIALIZE ACTIVEMQ CONFIGURATION HERE**

consumer.setMessageListener(new QueueMessageListener());
executor.shutdown();
}

private static class QueueMessageListener implements MessageListener {

    /**
     * Appends the delivered message to the shared {@code queue}.
     *
     * @param message the delivered message; it is cast to {@link TextMessage}
     */
    @Override
    public void onMessage(Message message) {
        final TextMessage received = (TextMessage) message;
        queue.add(received);
    }
}

//Problem here unable to consume
/**
 * Long-running consumer: keeps removing messages from the shared queue and
 * printing them until its thread is interrupted.
 */
class Consumer implements Runnable {
    ConcurrentLinkedQueue<TextMessage> queue;

    public Consumer(ConcurrentLinkedQueue<TextMessage> queue2) {
        this.queue = queue2;
    }

    /**
     * Prints every message taken from the queue. Sleeps 500 ms only when the
     * queue is empty, so a backlog is drained without artificial delay.
     * Returns, with the interrupt flag restored, when the thread is interrupted.
     */
    @Override
    public void run() {
        System.out.println("Consumer Started");
        try {
            while (true) {
                TextMessage str = queue.poll();
                if (str == null) {
                    // Nothing queued yet: back off before polling again.
                    Thread.sleep(500);
                    continue;
                }
                System.out.println("Removed: " + str);
            }
        } catch (InterruptedException ex) {
            // Interruption is the only way to stop this loop; keep the flag set
            // for the thread's owner instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }
}

关于java - ActiveMQ OnMessage() 方法阻塞其他线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40123173/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com