gpt4 book ai didi

java - 如何手动从 JMS 队列中仅获取特定类型的消息并保留所有其他消息?

转载 作者:行者123 更新时间:2023-12-01 20:21:19 24 4
gpt4 key购买 nike

我有一个队列,其中包含消息,它们都是围绕不同消息类型包装的ObjectMessage,这些消息类型都扩展了MyCustomMessage,例如MyClientMessageMyInternalMessage。我想做以下事情:

  1. 客户端登录系统
  2. 手动查找 message.getObject() 将返回 MyClientMessage 的任何消息
  3. 如果消息属于该类型,但 clientId 不是登录用户的 clientId,则将其放回消息队列

我该怎么做?

Connection connection = null;
Session session = null;
try {
connection = factory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

//Get a consumer
MessageConsumer consumer = session.createConsumer( myQueue );

//Start the connection
connection.start();

//Try to read a message
Message response = consumer.receive(QUALITY_OF_SERVICE_THRESHOLD_MS);

//Is anything like this possible?
ObjectMessage<T> objMessage = ( ObjectMessage<T> ) response;

//How check object is right type? Only this?
if ( !objMessage.isBodyAssignableTo( typeClass ) )
{
//put back in queue? Or does it stay there until acknowledge?

}

//It's the right type
else
{
objMessage.acknowledge();
}


} finally {
if (connection != null) {
connection.close();
}
}

最佳答案

您可以使用“JMS 消息选择器”(最好是阅读 JMS 规范第 3.8 节,或者您也可以阅读一些描述 here ,或者阅读 this 了解理论)。基本上,JMS 消息选择器使 JMS 提供者能够过滤并仅发送 JMS 使用者感兴趣的消息,而不发送 JMS 提供者收到的所有消息。

所以,事情是这样的:

  • 消息生产者将生成具有某些特定属性的消息以启用消息选择。
  • 消息使用者将创建一个指定消息选择标准的使用者。
  • 在向该消费者传递消息之前,JMS 提供程序将检查是否满足消费者指定的消息选择标准,如果不满足,则不会将该特定消息传递给该特定消费者。

在生产者端,您可以指定字符串属性,如下所示,这只是一个示例,您可以添加对您有意义的字符串属性。

        Message hellowWorldText = session.createTextMessage("Hello World! " + new Date());
hellowWorldText.setStringProperty("StockSector", "Technology");

在消费者端,创建消费者时您可以指定消息选择标准:

        String selector = new String("(StockSector = 'Technology')");
MessageConsumer consumer = session.createConsumer(queue, selector);

请注意,您可以指定多个消息选择属性/条件,因此根据您的需要,您可以添加任意多个条件,您可以将它们分组为单个条件,也可以添加单独的条件。

下面是完整的工作代码示例,您只需要确保生产者和消费者的选择器匹配,因此在生产者中您不能使用当前日期/时间戳等内容作为消息选择属性,因为在消费者端您无法指定一样。

JmsProducerQueueClient:

import java.util.Date;
import java.util.Hashtable;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class JmsProducerQueueClient {
public static void main(String[] args) throws NamingException, JMSException {
Connection connection = null;
try {
Context context = getInitialContext();
ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("ConnectionFactory2");
connection = connectionFactory.createConnection();
Session session = connection.createSession(false, QueueSession.AUTO_ACKNOWLEDGE);
Queue queue = (Queue) context.lookup("Queue0");
connection.start();
MessageProducer producer = session.createProducer(queue);
Message hellowWorldText = session.createTextMessage("Hello World! " + new Date());
hellowWorldText.setStringProperty("StockSector", "Finance");
producer.send(hellowWorldText);
} finally {
if (connection != null) {
connection.close();
}
}

}

@SuppressWarnings({ "rawtypes", "unchecked" })
public static Context getInitialContext() throws NamingException {
Hashtable env = new Hashtable();
env.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
env.put(Context.PROVIDER_URL, "t3://localhost:8208");
Context context = new InitialContext(env);
return context;
}
}

JmsConsumerQueueClient:

import java.util.Hashtable;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class JmsConsumerQueueClient {
public static void main(String[] args) throws NamingException, JMSException {
Connection connection = null;
try {
Context context = getInitialContext();
ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("ConnectionFactory1");
connection = connectionFactory.createConnection();
Session session = connection.createSession(false, QueueSession.AUTO_ACKNOWLEDGE);
Queue queue = (Queue) context.lookup("Queue0");
String selector = new String("(StockSector = 'Technology')");
MessageConsumer consumer = session.createConsumer(queue, selector);
connection.start();
TextMessage hellowWorldText = (TextMessage) consumer.receive();

System.out.println("> " + hellowWorldText + " | " + hellowWorldText.getText());

} finally {
if (connection != null) {
connection.close();
}
}

}

@SuppressWarnings({ "unchecked", "rawtypes" })
public static Context getInitialContext() throws NamingException {
Hashtable env = new Hashtable();
env.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
env.put(Context.PROVIDER_URL, "t3://localhost:7001");
Context context = new InitialContext(env);
return context;
}
}

关于java - 如何手动从 JMS 队列中仅获取特定类型的消息并保留所有其他消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44636645/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com