- mongodb - 在 MongoDB mapreduce 中,如何展平值对象?
- javascript - 对象传播与 Object.assign
- html - 输入类型 ="submit"Vs 按钮标签它们可以互换吗?
- sql - 使用 MongoDB 而不是 MS SQL Server 的优缺点
我有一个 JMS 客户端,它正在生成消息并通过 JMS 队列发送给它的唯一消费者。
我想要的是不止一个消费者收到这些消息。我首先想到的是将队列转换为主题,以便当前和新的消费者可以订阅并将相同的消息传递给他们。
这显然将涉及在生产者和消费者方面修改当前客户端代码。
我还想看看其他选项,例如创建第二个队列,这样我就不必修改现有的消费者。我相信这种方法有一些优点,比如(如果我错了,请纠正我)平衡两个不同队列而不是一个队列之间的负载,这可能会对性能产生积极影响。
我想就您可能会看到的这些选项和缺点/优点获得建议。非常感谢任何反馈。
最佳答案
正如你所说,你有几个选择。
如果将其转换为主题以获得相同的效果,则需要使消费者成为持久消费者。如果您的消费者不活着,队列提供的一件事是持久性。这取决于您使用的 MQ 系统。
如果您想坚持使用队列,您将为每个消费者创建一个队列,并创建一个监听原始队列的调度程序。
Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1
-> Queue_Consumer_2 <- Consumer_2
-> Queue_Consumer_3 <- Consumer_3
主题的优点
主题的缺点
队列的优点
队列的缺点
在开发消息传递系统时,我更喜欢主题,因为它给了我最大的权力,但是鉴于您已经在使用队列,它需要您更改系统的工作方式来实现主题。
多消费者队列系统的设计与实现
Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1
-> Queue_Consumer_2 <- Consumer_2
-> Queue_Consumer_3 <- Consumer_3
来源
请记住,您还需要处理其他一些事情,例如问题异常处理、重新连接到连接以及在失去连接时排队等。这只是为了让您了解如何处理完成我所描述的。
在真实系统中,我可能不会在第一个异常时退出。我会允许系统继续尽其所能地运行并记录错误。在这段代码中,如果将消息放入单个消费者队列失败,整个调度程序将停止。
Dispatcher.java
/*
* To change this template, choose Tools | Templates
* and open the template in the editor.
*/
package stackoverflow_4615895;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
public class Dispatcher {
private static long QUEUE_WAIT_TIME = 1000;
private boolean mStop = false;
private QueueConnectionFactory mFactory;
private String mSourceQueueName;
private String[] mConsumerQueueNames;
/**
* Create a dispatcher
* @param factory
* The QueueConnectionFactory in which new connections, session, and consumers
* will be created. This is needed to ensure the connection is associated
* with the correct thread.
* @param source
*
* @param consumerQueues
*/
public Dispatcher(
QueueConnectionFactory factory,
String sourceQueue,
String[] consumerQueues) {
mFactory = factory;
mSourceQueueName = sourceQueue;
mConsumerQueueNames = consumerQueues;
}
public void start() {
Thread thread = new Thread(new Runnable() {
public void run() {
Dispatcher.this.run();
}
});
thread.setName("Queue Dispatcher");
thread.start();
}
public void stop() {
mStop = true;
}
private void run() {
QueueConnection connection = null;
MessageProducer producer = null;
MessageConsumer consumer = null;
QueueSession session = null;
try {
// Setup connection and queues for receiving the messages
connection = mFactory.createQueueConnection();
session = connection.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE);
Queue sourceQueue = session.createQueue(mSourceQueueName);
consumer = session.createConsumer(sourceQueue);
// Create a null producer allowing us to send messages
// to any queue.
producer = session.createProducer(null);
// Create the destination queues based on the consumer names we
// were given.
Queue[] destinationQueues = new Queue[mConsumerQueueNames.length];
for (int index = 0; index < mConsumerQueueNames.length; ++index) {
destinationQueues[index] = session.createQueue(mConsumerQueueNames[index]);
}
connection.start();
while (!mStop) {
// Only wait QUEUE_WAIT_TIME in order to give
// the dispatcher a chance to see if it should
// quit
Message m = consumer.receive(QUEUE_WAIT_TIME);
if (m == null) {
continue;
}
// Take the message we received and put
// it in each of the consumers destination
// queues for them to process
for (Queue q : destinationQueues) {
producer.send(q, m);
}
}
} catch (JMSException ex) {
// Do wonderful things here
} finally {
if (producer != null) {
try {
producer.close();
} catch (JMSException ex) {
}
}
if (consumer != null) {
try {
consumer.close();
} catch (JMSException ex) {
}
}
if (session != null) {
try {
session.close();
} catch (JMSException ex) {
}
}
if (connection != null) {
try {
connection.close();
} catch (JMSException ex) {
}
}
}
}
}
Main.java
QueueConnectionFactory factory = ...;
Dispatcher dispatcher =
new Dispatcher(
factory,
"Queue_Original",
new String[]{
"Consumer_Queue_1",
"Consumer_Queue_2",
"Consumer_Queue_3"});
dispatcher.start();
关于java - JMS - 从一个消费者到多个消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4615744/
我正在读这个 question和 corresponding answer并被答案第一行中的术语 JMS broker 弄糊涂了: MS (ActiveMQ is a JMS broker imple
我正在学习 API 中的 Reactive Streams,我对它与 JMS 之间的相似性感到震惊。在 JMS 中,我们也有异步处理、发布者和订阅者。我在进行这种等效时缺少什么观点? 最佳答案 Rea
假设生产者向 JMS 主题“新闻”发送一条消息。消费者 1 读取了消息,但消费者 2 处于离线状态,因此他还没有读取消息。 是否有任何内置(针对规范或实现)的方式来通知生产者消费者 1 已阅读他的消息
目前我正在开发一个 JMS 应用程序。但我使用普通的 JMS API 和属性文件进行配置。我的应用程序在 Weblogic 中运行并连接到我客户端的 MQ 系列服务器。 最近我知道我可以使用 Webl
我正在尝试使用 Solace 中可用的异步发送功能,但我打算使用 JMS 进行抽象,而不是直接使用 JCSMP 使用它。 JMS 2.0 支持异步发送以及其他新功能:http://www.oracle
我无法获得 javax.jms.ConnectionFactory注入(inject)我的独立 JMS 客户端。 我得到一个 java.lang.NullPointerException在 conne
保持 JMS 连接/ session /消费者始终打开是一种不好的做法吗? 代码草稿示例: // app startup code ConnectionFactory cf = (Connection
我有几个作业,每个作业都有多条消息排队。每个作业的消息随机交错。如果用户决定取消作业,我想从队列中删除属于该作业的所有消息。我已经能够使用 browse() 找到所有要删除的消息,但一直无法弄清楚如何
是否可以将主题配置为仅存储最后一条消息的副本并将其发送到新连接而不知道客户端标识符或其他信息? 更新: 从 Shashi 提供的信息中,我发现这两页使用 retroactive consumer 描述
目前正在使用 WebLogic 和分布式队列。我从文档中了解到,分布式队列允许您使用全局 JNDI 名称检索到集群中任何队列的连接。分布式队列为您提供的主要功能之一似乎是跨多个托管服务器的负载平衡连接
再见,我的基本要求是有一个可以发送消息的路由,并将其放在 JMS 队列中。 camel 上下文在 JavaEE 6 容器中运行,即 JBoss AS 7.1.1,因此它是 HornetQ for JM
我正在阅读 JMS 2.0 规范,其中提到(相关摘录下方)如果客户端尝试修改 Message 对象,则 JMS 提供程序可能会抛出异常。 我的问题是 JMS 提供者如何知道客户端是否试图修改 Mess
我的 spring 上下文文件中有以下设置。 "PowerEventQueue" “${
我正在尝试使用 JSP 连接到 ActiveMQ。但是,当我运行该程序时,它给了我以下类型的异常: NoClassDefFoundError: javax/jms/Destination . 我不确定
我刚看了CORBA和JMS,他们好像都是用来实现的代理架构/模式。 我对他们有几个问题 1.他们之间的区别我还不是很清楚,谁能解释一下? 2.CORBA 是否用于当今的 IT 解决方案?还是正在失去魅
我正在更新现有的 Mule 配置,任务是增强它以根据消息的某些属性将消息路由到不同的端点,因此最好对我手头的两个选项有一些利弊: 在消息上添加属性,使用“message-properties-tran
我有一个订阅 JMS 主题应用程序的 Java 应用程序,该应用程序偶尔会出现以下异常: javax.jms.JMSException: Connection has been terminated
我知道 Camel 的 JMS 组件用于接收消息,使用 Springs DefaultMessageListenerContainer。它可以配置为使用 CLIENT_ACKNOWLEDGE 模式来确
通常不鼓励使用从 JMS 提供者返回的消息 ID 作为相关 ID,将消息发布到队列中。人们如何为请求/响应架构生成相关 ID? 最佳答案 客户端可以使用唯一的 ID 标准,如 UUID生成新的 ID。
我有一个简单的代码可以将 2 条消息放入队列中。 1) 我用两台服务器设置了 connectionNameList。 2) 这两个服务器是独立的,但有相同的队列管理器和定义相同名称的队列,例如“QMg
我是一名优秀的程序员,十分优秀!