gpt4 book ai didi

java - 通过多线程客户端使用 JMS

转载 作者:行者123 更新时间:2023-11-30 04:21:29 26 4
gpt4 key购买 nike

我正在尝试使用多个线程来消耗 jms 队列。我知道每个线程应该有一个单独的 JMS session ,并且我在代码中所做的事情如下所示。但我遇到了一个奇怪的异常

这是异常堆栈跟踪:

javax.jms.IllegalStateException: Forbidden call on a closed connection.
at org.objectweb.joram.client.jms.Connection.checkClosed(Connection.java:404)
at org.objectweb.joram.client.jms.Connection.createSession(Connection.java:530)
at MessageWorker.run(ReceiveJmsDemoMultiThreaded.java:96)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)

我需要你的帮助,因为这对我来说是一个阻碍问题

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class ReceiveJmsDemoMultiThreaded {

public static void main(String[] args) {
Context context = null;
ConnectionFactory factory = null;
Connection connection = null;
Destination destination = null;

try {
context = getInitialContext();
factory = (QueueConnectionFactory) context.lookup("JQCF");
destination = (Destination) context.lookup("sampleQueue");
connection = factory.createConnection();

final ExecutorService executor = Executors.newCachedThreadPool();
executor.submit(new MessageWorker(connection, destination) );

executor.submit(new MessageWorker(connection, destination) );

executor.submit(new MessageWorker(connection, destination) );

executor.submit(new MessageWorker(connection, destination) );

connection.start();

} catch (Exception e) {
e.printStackTrace();
} finally {
if (context != null) {
try {
context.close();
} catch (NamingException e) {
e.printStackTrace();
}
}

if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}

private static InitialContext getInitialContext() throws NamingException {
Properties prop = new Properties();
prop.put("java.naming.provider.url", "rmi://localhost:1099");
prop.put("java.naming.factory.initial",
"org.objectweb.carol.jndi.spi.MultiOrbInitialContextFactory");
return new InitialContext(prop);
}

}

class MessageWorker extends Thread {
Connection connection = null;
Destination dest = null;
Session session = null;
Destination destination = null;

public MessageWorker(Connection connection, Destination dest) {
this.connection = connection;
this.destination = dest;
}
@Override
public void run() {
try {
MessageConsumer receiver = null;
System.out.println("Starting Thread "+currentThread().getName());
while (true) {
try {
System.out.println("Waiting for next msg "+currentThread().getName());
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
receiver = session.createConsumer(destination);
Message msg = receiver.receive();
if (msg instanceof Message && msg != null) {
System.out.println("STARTING consuming "+msg.toString()+" by thread "+currentThread().getName() );
Thread.sleep(2000);//some work here
System.out.println("ENDING consuming "+msg.toString()+" by thread "+currentThread().getName() );
}
} catch (JMSException e) {

e.printStackTrace();
System.exit(1);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {

}
}
}

非常感谢

最佳答案

您看到此问题是因为,在主线程中,将作业提交到执行器服务后,使用以下方法关闭连接:

        connection.close();

因此,当线程尝试使用此共享连接(刚刚关闭)创建 session 时,它们会收到此异常。这里没有什么意外的。只是为了测试,您可以让主线程 hibernate 很长时间,直到所有线程都完成接收消息。这样,您就可以确认您没有收到此异常。

真正的解决方案可能是关闭 Executor 服务并让主线程 awaitTermination() 等待提交的作业完成。

关于java - 通过多线程客户端使用 JMS,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16937923/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com