gpt4 book ai didi

java - 新创建的客户端是否知道 hornetq 中的旧消息?

转载 作者:搜寻专家 更新时间:2023-11-01 01:27:23 25 4
gpt4 key购买 nike

我在 hornetQ 中与一个消费者创建了 session ,然后我使用生产者在队列中添加了 4 条消息。在此之后我创建了新的消费者。

这个消费者会知道旧消息吗?

如果不是,是否可以在 XML 中配置它?

我创建了新的消费者,它无法获取以前的消息。我只是想确认这种行为是否正确?我没有在文档中找到任何帮助。

以下是代码片段:

TextMessage receivedMessage = (TextMessage)consumer.receive(); 
receivedMessage.acknowledge();
System.out.println("Got order: " + receivedMessage.getText());
//consumer.close();
MessageConsumer newConsumer = session.createConsumer(orderQueue);
receivedMessage = (TextMessage)newConsumer.receive();
receivedMessage.acknowledge();
System.out.println("Got order: " + receivedMessage.getText());


如果我取消注释 consumer.close() 行,它工作正常。
我的hornetq-jms.xml

<connection-factory name="NettyConnectionFactory">
<xa>true</xa>
<connectors>
<connector-ref connector-name="netty"/>
</connectors>
<entries>
<entry name="/XAConnectionFactory"/>
</entries>
<consumer-window-size>0</consumer-window-size>
</connection-factory>

<connection-factory name="NettyConnectionFactory">
<xa>false</xa>
<connectors>
<connector-ref connector-name="netty"/>
</connectors>
<entries>
<entry name="/ConnectionFactory"/>
</entries>
<consumer-window-size>0</consumer-window-size>
</connection-factory>

<connection-factory name="NettyThroughputConnectionFactory">
<xa>true</xa>
<connectors>
<connector-ref connector-name="netty-throughput"/>
</connectors>
<entries>
<entry name="/XAThroughputConnectionFactory"/>
</entries>
<consumer-window-size>0</consumer-window-size>
</connection-factory>

<connection-factory name="NettyThroughputConnectionFactory">
<xa>false</xa>
<connectors>
<connector-ref connector-name="netty-throughput"/>
</connectors>
<entries>
<entry name="/ThroughputConnectionFactory"/>
</entries>
<consumer-window-size>0</consumer-window-size>
</connection-factory>

连接工厂的代码片段

TransportConfiguration transportConfiguration = new
TransportConfiguration(NettyConnectorFactory.class.getName());
HornetQConnectionFactory cf =
HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,getTransportConfiguration());
Queue orderQueue = HornetQJMSClient.createQueue("MutationPipelineQueue");

getTransportConfiguration() 的代码:

private synchronized static TransportConfiguration getTransportConfiguration() {
HashMap<String, TransportConfiguration> transportConfigurationMap = new HashMap<String, TransportConfiguration>();
TransportConfiguration tc = transportConfigurationMap.get("machinename:5455");
if(tc == null){
Map<String, Object> connectionParams = new HashMap<String, Object>();
connectionParams.put(org.hornetq.core.remoting.impl.netty.TransportConstants.HOST_PROP_NAME,"machinename");
connectionParams.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME,Integer.valueOf("5455"));
tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams);
transportConfigurationMap.put("machinename:5455", tc);
}
return tc;

最佳答案

是的,它会知道您的旧消息。但是在这种情况下,您的旧消费者仍然处于打开状态,因此消费者将在其缓冲区中缓存消息,除非您关闭它,或者您更改 consumer-window-size = 0。

大多数消息系统会在消费者上提前缓存,因此下次您在消费者上调用 receive 时,消息将准备好接收。

但是,如果您的消费者速度很慢并且您没有那么多消息,则消息将位于客户端的缓冲区中,直到您关闭该消费者。

对于生产中的快速消费者,最好始终提前缓存,因为这将提高您的吞吐量,如果没有缓存,您的吞吐量将受到网络延迟的限制。

在 HornetQ 案例中,您可以通过设置 consumer-window-size=0 来应对缓慢的消费者。

http://docs.jboss.org/hornetq/2.3.0.beta3/docs/user-manual/html/flow-control.html#flow-control.consumer.window

如果您通过 JNDI 查找实例化连接工厂:

   <connection-factory name="ConnectionFactory">
<connectors>
<connector-ref connector-name="netty-connector"/>
</connectors>
<entries>
<entry name="ConnectionFactory"/>
</entries>

<!-- We set the consumer window size to 0, which means messages are not buffered at all
on the client side -->
<consumer-window-size>0</consumer-window-size>

</connection-factory>

或者在您直接实例化连接工厂的情况下,您必须在实例中设置 consumerWindowSize:

TransportConfiguration transportConfiguration = new
TransportConfiguration(NettyConnectorFactory.class.getName());
HornetQConnectionFactory cf =
HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,getTransportConfiguration());
cf.setConsumerWindowSize(0) // <<<<<< here

这是来自 HornetQ 发行版的运行示例,位于 examples/jms/no-consumer-buffering。它与您的代码片段完全相同,并且每次都有效:

 // Step 5. Create a JMS Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// Step 6. Create a JMS Message Producer
MessageProducer producer = session.createProducer(queue);

// Step 7. Create a JMS MessageConsumer

MessageConsumer consumer1 = session.createConsumer(queue);

// Step 8. Start the connection

connection.start();

// Step 9. Send 10 messages to the queue

final int numMessages = 10;

for (int i = 0; i < numMessages; i++)
{
TextMessage message = session.createTextMessage("This is text message: " + i);

producer.send(message);
}


System.out.println("Sent messages");

// Step 10. Create another consumer on the same queue

MessageConsumer consumer2 = session.createConsumer(queue);

// Step 11. Consume three messages from consumer2

for (int i = 0; i < 3; i++)
{
TextMessage message = (TextMessage)consumer2.receive(2000);

System.out.println("Consumed message from consumer2: " + message.getText());
}

正如您在此示例中所见,正在接收旧消息。

任何与此不同的都是您的系统配置错误。也许您没有设置正确的连接工厂?

顺便说一句:在 ActiveMQ 上,您可以管理预取限制以管理相同的行为:

http://activemq.apache.org/what-is-the-prefetch-limit-for.html

此问题与 JMS queue with multiple consumers 完全相同

至于追溯消息,这是 ActiveMQ 上的另一个仅适用于主题的功能,即使用旧消息创建的订阅。

关于java - 新创建的客户端是否知道 hornetq 中的旧消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15201590/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com