gpt4 book ai didi

java - 当崩溃的核心客户端重新连接时,将重新传送已确认的消息

转载 作者:行者123 更新时间:2023-12-02 10:34:43 25 4
gpt4 key购买 nike

我已经设置了一个在本地运行的独立 HornetQ 实例。出于测试目的,我使用 HornetQ core API 创建了一个消费者,它将每 500 毫秒接收一条消息。

当我的客户端连接并读取队列中的所有消息时,我在消费者端遇到了奇怪的行为,如果我强制关闭它(没有正确关闭 session /连接),那么下次我再次启动这个消费者时,它将读取队列中的旧消息。这是我的消费者示例:

//HornetQ 消费者代码

   public void readMessage() {
ClientSession session = null;
try {
if (sf != null) {
session = sf.createSession(true, true);

ClientConsumer messageConsumer = session.createConsumer(JMS_QUEUE_NAME);
session.start();

while (true) {
ClientMessage messageReceived = messageConsumer.receive(1000);
if (messageReceived != null && messageReceived.getStringProperty(MESSAGE_PROPERTY_NAME) != null) {
System.out.println("Received JMS TextMessage:" + messageReceived.getStringProperty(MESSAGE_PROPERTY_NAME));
messageReceived.acknowledge();
}

Thread.sleep(500);
}
}
} catch (Exception e) {
LOGGER.error("Error while adding message by producer.", e);
} finally {
try {
session.close();
} catch (HornetQException e) {
LOGGER.error("Error while closing producer session,", e);
}
}
}

有人可以告诉我为什么它会这样工作,以及我应该在客户端/服务器端使用什么样的配置,以便如果消费者读取消息,它将从队列中删除该消息?

最佳答案

确认完成后,您不会提交 session ,并且您不会在启用自动提交确认的情况下创建 session 。因此,您应该执行以下操作之一:

  • 在一次或多次调用 acknowledge() 后显式调用 session.commit()
  • 或者通过使用 sf.createSession(true,true)sf.createSession(false,true) ( boolean 值)创建 session 来启用隐式自动提交确认第二个是控制确认的自动提交)。

请记住,当您启用确认的自动提交时,有一个内部缓冲区需要在确认刷新到代理之前达到特定的大小。像这样的批量确认可以极大地提高某些大容量用例的性能。默认情况下,您需要确认 1,048,576 字节的消息才能刷新缓冲区并将确认发送到代理。您可以通过在 ServerLocator 实例上调用 setAckBatchSize 或使用不同的 createSession 方法(例如 sf. createSession(true, true, myAckBatchSize))。

如果确认缓冲区没有刷新并且您的客户端崩溃,那么当客户端返回时,相应的消息仍将在队列中。如果缓冲区尚未达到其阈值,当消费者正常关闭时,它仍然会被刷新。

关于java - 当崩溃的核心客户端重新连接时,将重新传送已确认的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53376762/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com