gpt4 book ai didi

java - 使用核心 api 消费后 HornetQ 消息仍保留在队列中

转载 作者:塔克拉玛干 更新时间:2023-11-03 04:06:27 24 4
gpt4 key购买 nike

我是 HornetQ 的新手,所以请多多包涵。首先让我告诉你我的要求:

我需要一个消息队列中间件,它可以在具有低延迟和持久性的不同进程之间传递大约 1k 大小的消息(即它应该在系统崩溃后仍然存在)。我会有多个进程写入相同的队列,并且类似地有多个进程从同一队列读取。

为此,我选择了 HornetQ,因为它在持久性消息传递方面的评级最高。

我目前使用 Hornetq v2.2.2Final 作为独立服务器
我能够使用核心 api (ClientSession) 成功创建持久/非持久队列,并成功将消息发布到队列 (ClientProducer)
同样,我能够使用核心 api (ClientConsumer) 从队列中读取消息。

此后问题就来了,当客户端读取消息后,消息仍然留在队列中,即队列中的消息数保持不变。也许我弄错了,但我的印象是,一旦消息被消费 (read + ack),它就会从队列中删除。但这并没有发生就我而言,相同的消息被一遍又一遍地阅读。

另外,我想告诉大家,我已经尝试过将非持久队列与非持久消息一起使用。但问题仍然存在

我正在使用的生产者代码:

public class HQProducer implements Runnable {

private ClientProducer producer;
private boolean killme;
private ClientSession session;
private boolean durableMsg;

public HQProducer(String host, int port, String address, String queueName,
boolean deleteQ, boolean durable, boolean durableMsg, int pRate) {
this.durableMsg = durableMsg;
try {
HashMap map = new HashMap();
map.put("host", host);
map.put("port", port);

TransportConfiguration config = new TransportConfiguration(NettyConnectorFactory.class.getName(), map);

ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(config);

ClientSessionFactory factory = locator.createSessionFactory();

session = factory.createSession();

if (queueExists(queueName)) {
if (deleteQ) {
System.out.println("Deleting existing queue :: " + queueName);
session.deleteQueue(queueName);
System.out.println("Creating queue :: " + queueName);
session.createQueue(address, queueName, true);
}
} else {
System.out.println("Creating new queue :: " + queueName);
session.createQueue(address, queueName, durable);
}
producer = session.createProducer(SimpleString.toSimpleString(address), pRate);

killme = false;
} catch (Exception ex) {
Logger.getLogger(HQTestProducer.class.getName()).log(Level.SEVERE, null, ex);
}
}

@Override
public void run() {
long time = System.currentTimeMillis();
int cnt = 0;
long timediff;
while (!killme) {
try {
ClientMessage message = session.createMessage(durableMsg);

message.getBodyBuffer().writeString("Hello world");

producer.send(message);
cnt++;
timediff = ((System.currentTimeMillis() - time) / 1000);
if (timediff >= 1) {
System.out.println("Producer tps :: " + cnt);
cnt = 0;
time = System.currentTimeMillis();
}
} catch (HornetQException ex) {
Logger.getLogger(HQProducer.class.getName()).log(Level.SEVERE, null, ex);
}
}
try {
session.close();
} catch (HornetQException ex) {
Logger.getLogger(HQProducer.class.getName()).log(Level.SEVERE, null, ex);
}
}

public void setKillMe(boolean killme) {
this.killme = killme;
}

private boolean queueExists(String qname) {
boolean res = false;
try {
//ClientSession.BindingQuery bq = session.bindingQuery(SimpleString.toSimpleString(qname));
QueueQuery queueQuery = session.queueQuery(SimpleString.toSimpleString(qname));
if (queueQuery.isExists()) {
res = true;
}
} catch (HornetQException ex) {
res = false;
}
return res;
}
}

消费者的代码也是:

public class HQConsumer implements Runnable {

private ClientSession session;
private ClientConsumer consumer;
private boolean killMe;

public HQConsumer(String host, int port, String queueName, boolean browseOnly) {
try {
HashMap map = new HashMap();
map.put("host", host);
map.put("port", port);

TransportConfiguration config = new TransportConfiguration(NettyConnectorFactory.class.getName(), map);

ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(config);

ClientSessionFactory factory = locator.createSessionFactory();

session = factory.createSession();

session.start();

consumer = session.createConsumer(queueName, "",0,-1,browseOnly);

killMe = false;
} catch (Exception ex) {
Logger.getLogger(HQTestProducer.class.getName()).log(Level.SEVERE, null, ex);
}
}

@Override
public void run() {
long time = System.currentTimeMillis();
int cnt = 0;
long timediff;
while (!killMe) {
try {
ClientMessage msgReceived = consumer.receive();
msgReceived.acknowledge();
//System.out.println("message = " + msgReceived.getBodyBuffer().readString());
cnt++;
timediff = ((System.currentTimeMillis() - time) / 1000);
if (timediff >= 1) {
System.out.println("ConSumer tps :: " + cnt);
cnt = 0;
time = System.currentTimeMillis();
}
} catch (HornetQException ex) {
Logger.getLogger(HQConsumer.class.getName()).log(Level.SEVERE, null, ex);
}
}
try {
session.close();
} catch (HornetQException ex) {
Logger.getLogger(HQConsumer.class.getName()).log(Level.SEVERE, null, ex);
}
}

public void setKillMe(boolean killMe) {
this.killMe = killMe;
}
}

HornetQ 服务器配置::

<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">

<paging-directory>${data.dir:../data}/paging</paging-directory>

<bindings-directory>${data.dir:../data}/bindings</bindings-directory>

<journal-directory>${data.dir:../data}/journal</journal-directory>

<journal-min-files>10</journal-min-files>

<large-messages-directory>${data.dir:../data}/large-messages</large-messages-directory>

<connectors>
<connector name="netty">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.port:5445}"/>
</connector>

<connector name="netty-throughput">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.batch.port:5455}"/>
<param key="batch-delay" value="50"/>
</connector>
</connectors>

<acceptors>
<acceptor name="netty">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.port:5445}"/>
</acceptor>

<acceptor name="netty-throughput">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.batch.port:5455}"/>
<param key="batch-delay" value="50"/>
<param key="direct-deliver" value="false"/>
</acceptor>
</acceptors>

<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="guest"/>
<permission type="deleteNonDurableQueue" roles="guest"/>
<permission type="createDurableQueue" roles="guest"/>
<permission type="deleteDurableQueue" roles="guest"/>
<permission type="consume" roles="guest"/>
<permission type="send" roles="guest"/>
</security-setting>
</security-settings>

<address-settings>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
</address-setting>
</address-settings>

</configuration>

最佳答案

使用 hornetq 核心 api,您必须明确确认一条消息。我看不到您的测试中发生了什么。

如果您没有确认,这就是您的消息被阻止的原因。我需要查看您的完整示例才能为您提供完整的答案。

另外:你应该定义你的createSession:createSession(true, true, 0)

核心 API 具有批量 ACK 的选项。您没有使用事务处理 session ,因此在您达到 serverLocator 上配置的 ackBatchSize 之前,您不会向服务器发送 ack。有了这个,只要您在消息中调用 acknowledge() ,任何 ack 都会被发送到服务器。

您当前使用的选项相当于具有特定 DUPS_SIZE 的 JMS DUPS_OK。

(在与您进行一些迭代后发布编辑我的初始答案)

关于java - 使用核心 api 消费后 HornetQ 消息仍保留在队列中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/6452505/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com