- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
我是 HornetQ 的新手,所以请多多包涵。首先让我告诉你我的要求:
我需要一个消息队列中间件,它可以在具有低延迟和持久性的不同进程之间传递大约 1k 大小的消息(即它应该在系统崩溃后仍然存在)。我会有多个进程写入相同的队列,并且类似地有多个进程从同一队列读取。
为此,我选择了 HornetQ,因为它在持久性消息传递方面的评级最高。
我目前使用 Hornetq v2.2.2Final 作为独立服务器。
我能够使用核心 api (ClientSession) 成功创建持久/非持久队列,并成功将消息发布到队列 (ClientProducer)。
同样,我能够使用核心 api (ClientConsumer) 从队列中读取消息。
此后问题就来了,当客户端读取消息后,消息仍然留在队列中,即队列中的消息数保持不变。也许我弄错了,但我的印象是,一旦消息被消费 (read + ack),它就会从队列中删除。但这并没有发生就我而言,相同的消息被一遍又一遍地阅读。
另外,我想告诉大家,我已经尝试过将非持久队列与非持久消息一起使用。但问题仍然存在。
我正在使用的生产者代码:
public class HQProducer implements Runnable {
private ClientProducer producer;
private boolean killme;
private ClientSession session;
private boolean durableMsg;
public HQProducer(String host, int port, String address, String queueName,
boolean deleteQ, boolean durable, boolean durableMsg, int pRate) {
this.durableMsg = durableMsg;
try {
HashMap map = new HashMap();
map.put("host", host);
map.put("port", port);
TransportConfiguration config = new TransportConfiguration(NettyConnectorFactory.class.getName(), map);
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(config);
ClientSessionFactory factory = locator.createSessionFactory();
session = factory.createSession();
if (queueExists(queueName)) {
if (deleteQ) {
System.out.println("Deleting existing queue :: " + queueName);
session.deleteQueue(queueName);
System.out.println("Creating queue :: " + queueName);
session.createQueue(address, queueName, true);
}
} else {
System.out.println("Creating new queue :: " + queueName);
session.createQueue(address, queueName, durable);
}
producer = session.createProducer(SimpleString.toSimpleString(address), pRate);
killme = false;
} catch (Exception ex) {
Logger.getLogger(HQTestProducer.class.getName()).log(Level.SEVERE, null, ex);
}
}
@Override
public void run() {
long time = System.currentTimeMillis();
int cnt = 0;
long timediff;
while (!killme) {
try {
ClientMessage message = session.createMessage(durableMsg);
message.getBodyBuffer().writeString("Hello world");
producer.send(message);
cnt++;
timediff = ((System.currentTimeMillis() - time) / 1000);
if (timediff >= 1) {
System.out.println("Producer tps :: " + cnt);
cnt = 0;
time = System.currentTimeMillis();
}
} catch (HornetQException ex) {
Logger.getLogger(HQProducer.class.getName()).log(Level.SEVERE, null, ex);
}
}
try {
session.close();
} catch (HornetQException ex) {
Logger.getLogger(HQProducer.class.getName()).log(Level.SEVERE, null, ex);
}
}
public void setKillMe(boolean killme) {
this.killme = killme;
}
private boolean queueExists(String qname) {
boolean res = false;
try {
//ClientSession.BindingQuery bq = session.bindingQuery(SimpleString.toSimpleString(qname));
QueueQuery queueQuery = session.queueQuery(SimpleString.toSimpleString(qname));
if (queueQuery.isExists()) {
res = true;
}
} catch (HornetQException ex) {
res = false;
}
return res;
}
}
消费者的代码也是:
public class HQConsumer implements Runnable {
private ClientSession session;
private ClientConsumer consumer;
private boolean killMe;
public HQConsumer(String host, int port, String queueName, boolean browseOnly) {
try {
HashMap map = new HashMap();
map.put("host", host);
map.put("port", port);
TransportConfiguration config = new TransportConfiguration(NettyConnectorFactory.class.getName(), map);
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(config);
ClientSessionFactory factory = locator.createSessionFactory();
session = factory.createSession();
session.start();
consumer = session.createConsumer(queueName, "",0,-1,browseOnly);
killMe = false;
} catch (Exception ex) {
Logger.getLogger(HQTestProducer.class.getName()).log(Level.SEVERE, null, ex);
}
}
@Override
public void run() {
long time = System.currentTimeMillis();
int cnt = 0;
long timediff;
while (!killMe) {
try {
ClientMessage msgReceived = consumer.receive();
msgReceived.acknowledge();
//System.out.println("message = " + msgReceived.getBodyBuffer().readString());
cnt++;
timediff = ((System.currentTimeMillis() - time) / 1000);
if (timediff >= 1) {
System.out.println("ConSumer tps :: " + cnt);
cnt = 0;
time = System.currentTimeMillis();
}
} catch (HornetQException ex) {
Logger.getLogger(HQConsumer.class.getName()).log(Level.SEVERE, null, ex);
}
}
try {
session.close();
} catch (HornetQException ex) {
Logger.getLogger(HQConsumer.class.getName()).log(Level.SEVERE, null, ex);
}
}
public void setKillMe(boolean killMe) {
this.killMe = killMe;
}
}
HornetQ 服务器配置::
<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
<paging-directory>${data.dir:../data}/paging</paging-directory>
<bindings-directory>${data.dir:../data}/bindings</bindings-directory>
<journal-directory>${data.dir:../data}/journal</journal-directory>
<journal-min-files>10</journal-min-files>
<large-messages-directory>${data.dir:../data}/large-messages</large-messages-directory>
<connectors>
<connector name="netty">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.port:5445}"/>
</connector>
<connector name="netty-throughput">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.batch.port:5455}"/>
<param key="batch-delay" value="50"/>
</connector>
</connectors>
<acceptors>
<acceptor name="netty">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.port:5445}"/>
</acceptor>
<acceptor name="netty-throughput">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.batch.port:5455}"/>
<param key="batch-delay" value="50"/>
<param key="direct-deliver" value="false"/>
</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="guest"/>
<permission type="deleteNonDurableQueue" roles="guest"/>
<permission type="createDurableQueue" roles="guest"/>
<permission type="deleteDurableQueue" roles="guest"/>
<permission type="consume" roles="guest"/>
<permission type="send" roles="guest"/>
</security-setting>
</security-settings>
<address-settings>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
</address-setting>
</address-settings>
</configuration>
最佳答案
使用 hornetq 核心 api,您必须明确确认一条消息。我看不到您的测试中发生了什么。
如果您没有确认,这就是您的消息被阻止的原因。我需要查看您的完整示例才能为您提供完整的答案。
另外:你应该定义你的createSession:createSession(true, true, 0)
核心 API 具有批量 ACK 的选项。您没有使用事务处理 session ,因此在您达到 serverLocator 上配置的 ackBatchSize 之前,您不会向服务器发送 ack。有了这个,只要您在消息中调用 acknowledge() ,任何 ack 都会被发送到服务器。
您当前使用的选项相当于具有特定 DUPS_SIZE 的 JMS DUPS_OK。
(在与您进行一些迭代后发布编辑我的初始答案)
关于java - 使用核心 api 消费后 HornetQ 消息仍保留在队列中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/6452505/
有没有办法在同一个虚拟机中启动多个 HornetQ 嵌入式代理?例如,在同一进程中运行且完全不相关的两个独立应用程序。 如果我尝试注册第二个 EmbeddedJms 实例,而一个实例已经在运行,这就是
我正在尝试用我自己的测试项目重新创建 HornetQ 示例。但是我遇到了类加载器问题。显然,我缺少文档中未指定的一些依赖项。 文档让我添加 hornetq-core-client.jar netty.
如何使用 HornetQ 以编程方式创建 JMS 主题和队列? 最佳答案 http://docs.jboss.org/hornetq/2.2.14.Final/user-manual/en/html/
我构建了一个 Spring JMS 解决方案,该解决方案是针对 JBoss 7.1.1 和外部 HornetQ 实现 2.2.1.4 构建的。此连接并成功运行。 但是我现在使用 EAP6,并尝试连接到
(在 11.11.11 编辑,底部编辑) 我有两个运行 HornetQ(2.2.5 AS7 版)服务器的 JBoss AS 6.1 服务器。 让我们称另一个为 CLIENT,另一个为 MASTER。
我正在尝试运行一本书(HornetQ Messaging Developers Guide)中的一个简单的 HornetQ 示例,但收到错误消息。我没有使用maven,因为我想坚持使用书中给出的示例。
我使用 SBT 0.13.1 . 当我添加 HornetQ 作为我的依赖项时: libraryDependencies += "org.hornetq" % "hornetq-server" % "2
我发现在我们的实际环境中经常发生以下异常: 2013-01-08 00:09:45,886 ERROR [org.jboss.aspects.tx.TxPolicy] (Thread-70534 (H
我在带有 hornetQ 的 ubuntu 上使用 jboss AS 6 Final 我使用管理面板在名为 Message Buffer Queue 的服务器上创建了一个新队列。 我收到以下错误: U
我正在使用 HornetQ 发送电子邮件。 文件附件使用对象存储系统进行带外传输(不作为消息的一部分)。这会增加一些开销,我想通过将小文件直接放入消息属性来避免这些开销。 我知道我可以发送巨大的消息体
我已阅读 HornetQ 常见问题解答,但找不到任何相关内容。我正在使用 HornetQ 2.2.5,它包含一个错误(是的,只有一个;)。根据这篇文章https://community.jboss.o
我已经配置了hornetq来处理大消息,但是如果客户端没有启动,大消息目录就会变得太大,消耗整个磁盘空间 请帮助弄清楚如何在邮件发送后删除或清理大型邮件文件夹? 也未在 jboss/server/de
我在独立模式下使用默认配置运行 hornetQ,我可以从本地系统连接到它,如果我想从另一个系统连接,必须更改哪些配置才能实现?! 最佳答案 您首先需要定义另一个系统的含义,您是指另一个 HornetQ
对于当前的项目,我尝试使用 JBoss 7.1 和 HornetQ (JMS) 设置以下场景,我认为这是一个相当常见的用例:有三个应用程序服务器。多个MDB应各自处理一个冗长计算过程的分解片段,任务应
我正在尝试独立运行 HornetQ 服务器。但我不知道该怎么做,我下载了 hornetq-2.0.0.GA 并使用 build.sh 启动服务器,但我希望通过 java 代码启动 HornetQ 服务
我想以某种方式延迟整个消息组的消息。 问题是,属于每个消息组的所有消息都必须按照它们发布的顺序进行处理。如果其中一条消息无法被使用 - 我们希望延迟它并延迟同一消息组中的其余消息。我不想阻止消费者 -
我使用 HornetQ 作为队列提供程序,因为它具有持久性功能。但是,在我重新启动应用程序后,队列中的所有消息都丢失了。可能是配置问题? 代码如下: // Step 1. Create the Con
我使用 EJB 来实现命令模式。 EJB 是执行业务逻辑的命令服务。我知道在 J2EE 中 EJB 管理事务以及事务超时。
我正在开发一个使用 HornetQ 作为消息传递服务器的应用程序。在 HornetQ 启动时,我总是收到与 HornetQ live lock 相关的错误。 我针对这个特定问题进行了研究,但无法得出相
我正在尝试将 ActiveMQ-CPP 与 HornetQ 一起使用。我正在使用 ActiveMQ-CPP 捆绑示例,但我很难使用它。生产者工作得很好,但消费者给我以下信息: * BEGIN SERV
我是一名优秀的程序员,十分优秀!