- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
您好,我是 HornetQ 的新手。我已经编写了一个测试包,它有一个带有生产者和异步消费者的嵌入式 HornetQ 服务器。希望我已经正确实现了。现在问题如下……
当我通过 prodcuer 向队列发送消息时,它返回成功,但是当尝试使用队列中的消息时,似乎没有消息被使用。就好像消费者是不活跃的。
当我尝试使用 setSendAcknowledgementHandler
方法时出现错误:
java.lang.IllegalStateException: You can't set confirmationHandler on a connection with confirmation-window-size < 0. Look at the documentation for more information.
at org.hornetq.core.protocol.core.impl.ChannelImpl.setCommandConfirmationHandler(ChannelImpl.java:330)
at org.hornetq.core.client.impl.ClientSessionImpl.setSendAcknowledgementHandler(ClientSessionImpl.java:943)
at org.hornetq.core.client.impl.DelegatingSession.setSendAcknowledgementHandler(DelegatingSession.java:493)
at componentsII.QueueProducer.SendMessage(QueueProducer.java:9)
at componentsII.StartClients.main(StartClients.java:11)
检查下面的类。
连接和 session
public class QueueConnection {
private static org.hornetq.api.core.TransportConfiguration transport;
private static org.hornetq.api.core.client.ClientSessionFactory sharedfactory;
public static org.hornetq.api.core.client.ClientSession sharedSession;
private static java.util.HashMap<String,Object> maps;
private static boolean started= false;
public static void setMaps(String []key, Object[] value){
maps= new java.util.HashMap<String, Object>() ;
if(key.length!=value.length){
maps=null;
}else{
for(int x=0;x<value.length;x++){
maps.put(key[x], value[x]);
}
}
}
private static org.hornetq.api.core.client.ClientSession session(){
try{
if(sharedSession==null){
sharedSession=factory().createSession(true,true,0);
}
}catch(org.hornetq.api.core.HornetQException e){
e.printStackTrace();
}catch(Exception e){
e.printStackTrace();
}
return sharedSession;
}
public static synchronized int size(String queueName){
int count = 0;
try {
org.hornetq.api.core.client.ClientSession.QueueQuery result;
result = sharedSession.queueQuery(new org.hornetq.api.core.SimpleString(queueName));
count = (int) result.getMessageCount();
} catch (org.hornetq.api.core.HornetQException e) {
e.printStackTrace();
}
return count;
}
public static void startSession(){
try{
if (session() != null || started!=true){
session().start();
started=true;
System.out.println("Client Session started");
}else{
System.out.println("Client Session already started");
}
}catch(org.hornetq.api.core.HornetQException e){
e.printStackTrace();
}catch(Exception e){
e.printStackTrace();
}
}
public static void stopSession(){
try{
if (session() != null ){
session().stop();
factory().close();
sharedSession=null;
sharedfactory=null;
System.out.println("Client Session stopped");
}
}catch(org.hornetq.api.core.HornetQException e){
e.printStackTrace();
}catch(Exception e){
e.printStackTrace();
}
}
private static org.hornetq.api.core.TransportConfiguration TransportConfigs(){
transport=new org.hornetq.api.core.TransportConfiguration(org.hornetq.core.remoting.impl.netty.NettyConnectorFactory.class.getName(),maps);
return transport;
}
private static org.hornetq.api.core.client.ClientSessionFactory factory(){
try{
if(sharedfactory==null){
org.hornetq.api.core.client.ServerLocator locator=org.hornetq.api.core.client.HornetQClient.createServerLocator(true,TransportConfigs());
locator.setAckBatchSize(0);
locator.setReconnectAttempts(3);
locator.setConfirmationWindowSize(2);
sharedfactory=locator.createSessionFactory();
}
}catch(Exception e){
e.printStackTrace();
}
return sharedfactory;
}
}
消费者
public class QueueConsumer {
public static void Recieve(String queuename){
try {
org.hornetq.api.core.client.ClientConsumer consumer = QueueConnection.sharedSession.createConsumer(queuename);
consumer.setMessageHandler(new msghandler());
} catch (org.hornetq.api.core.HornetQException e) {
e.printStackTrace();
} catch(Exception e){
e.printStackTrace();
}
}
private static class msghandler implements org.hornetq.api.core.client.MessageHandler {
@Override
public void onMessage(org.hornetq.api.core.client.ClientMessage msg) {
System.out.println("Message consumed ~"+msg.getStringProperty("myMsg"));
}
}
}
制作人
public class QueueProducer {
public static void SendMessage(String queuename,String msg){
try{
QueueConnection.sharedSession.setSendAcknowledgementHandler(new acknowledegeHandler());
org.hornetq.api.core.client.ClientProducer producer= QueueConnection.sharedSession.createProducer(queuename);
org.hornetq.api.core.client.ClientMessage message = QueueConnection.sharedSession.createMessage(org.hornetq.api.core.client.ClientMessage.TEXT_TYPE,true);
message.putStringProperty("myMsg", msg);
producer.send(queuename,message);
System.out.println("Message Sent to "+queuename);
}catch(org.hornetq.api.core.HornetQException e){
e.printStackTrace();
}catch(Exception e){
e.printStackTrace();
}
}
private static class acknowledegeHandler implements org.hornetq.api.core.client.SendAcknowledgementHandler{
@Override
public void sendAcknowledged( org.hornetq.api.core.Message msg) {
System.out.println("Received acknowledgement for message ~: "+msg.getStringProperty("myMsg"));
}
}
}
初始化客户端
public class StartClients {
private static void initialize(){
String keys[]={org.hornetq.core.remoting.impl.netty.TransportConstants.HOST_PROP_NAME,org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME};
Object values[]={"localhost",5664};
QueueConnection.setMaps(keys,values);
QueueConnection.startSession();
}
public static void main(String []args){
new Thread(){@Override
public void run(){
System.out.println("Producer Thread");
StartClients.initialize();
String QueueName= "queues.Queue1";
for(int a=1;a<10;a++){
QueueProducer.SendMessage(QueueName, "Message "+a+" to the embedded HornetQ Server");
}
System.out.println("Queue "+QueueName+" has "+QueueConnection.size(QueueName)+" messages on send");
QueueConnection.stopSession();
}}.start();
new Thread(){@Override
public void run(){
try{
sleep(5000);
System.out.println();
System.out.println("Consumer Thread");
StartClients.initialize();
String QueueName= "queues.Queue1";
QueueConsumer.Recieve(QueueName);
System.out.println("Queue "+QueueName+" has "+QueueConnection.size(QueueName)+" messages after consumption");
QueueConnection.stopSession();
}catch(java.lang.InterruptedException e){
e.printStackTrace();
}catch(Exception e){
e.printStackTrace();
}
}}.start();
}
}
嵌入式服务器
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.CoreQueueConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.server.JournalType;
import org.hornetq.core.server.embedded.EmbeddedHornetQ;
public class QueueServer {
public static void StartServer(){
try {
//Connection configurations
Map<String, Object> params = new HashMap<String, Object>();
params.put(TransportConstants.HOST_PROP_NAME, "localhost");
params.put(TransportConstants.PORT_PROP_NAME, 5664);
params.put(TransportConstants.USE_NIO_PROP_NAME, true);
params.put(TransportConstants.TCP_NODELAY_PROPNAME, true);
params.put(TransportConstants.NIO_REMOTING_THREADS_PROPNAME,3);
Map<String, Object> params2 = new HashMap<String, Object>();
params2.put(TransportConstants.HOST_PROP_NAME, "localhost");
params2.put(TransportConstants.PORT_PROP_NAME, 5665);
params2.put(TransportConstants.USE_NIO_PROP_NAME, true);
params2.put(TransportConstants.TCP_NODELAY_PROPNAME, true);
params2.put(TransportConstants.NIO_REMOTING_THREADS_PROPNAME,3);
Map<String, Object> params3 = new HashMap<String, Object>();
params3.put(TransportConstants.HOST_PROP_NAME, "localhost");
params3.put(TransportConstants.PORT_PROP_NAME, 5666);
params3.put(TransportConstants.USE_NIO_PROP_NAME, true);
params3.put(TransportConstants.TCP_NODELAY_PROPNAME, true);
params3.put(TransportConstants.NIO_REMOTING_THREADS_PROPNAME,3);
Map<String, Object> params4 = new HashMap<String, Object>();
params4.put(TransportConstants.HOST_PROP_NAME, "localhost");
params4.put(TransportConstants.PORT_PROP_NAME, 5667);
params4.put(TransportConstants.USE_NIO_PROP_NAME, true);
params4.put(TransportConstants.TCP_NODELAY_PROPNAME, true);
params4.put(TransportConstants.NIO_REMOTING_THREADS_PROPNAME,3);
//Server configurations
Configuration config= new ConfigurationImpl();
HashSet<TransportConfiguration>transports= new HashSet <TransportConfiguration>();
transports.add(new TransportConfiguration(NettyAcceptorFactory.class.getName(),params));
transports.add(new TransportConfiguration(NettyAcceptorFactory.class.getName(),params2));
transports.add(new TransportConfiguration(NettyAcceptorFactory.class.getName(),params3));
transports.add(new TransportConfiguration(NettyAcceptorFactory.class.getName(),params4));
//Queues Configurations
List<CoreQueueConfiguration> queueConfigs = new ArrayList<CoreQueueConfiguration>();
String queueName="queues.Queue";
for(int x=1;x<5;x++){
queueConfigs.add(new CoreQueueConfiguration(queueName.concat(String.valueOf(x)),queueName.concat(String.valueOf(x)), null, true));
}
//Set Configurations
config.setAcceptorConfigurations(transports);
config.setQueueConfigurations(queueConfigs);
config.setJournalType(JournalType.NIO);
config.setSecurityEnabled(false);
//Starting server
EmbeddedHornetQ embedded = new EmbeddedHornetQ();
embedded.setConfiguration(config);
embedded.start();
Thread.sleep(6000000);
embedded.stop();
} catch (Exception ex) {
Logger.getLogger(QueueServer.class.getName()).log(Level.SEVERE, null, ex);
}
}
public static void main(String [] args){
StartServer();
}
}
我哪里出错了?
最佳答案
需要在ClientSessionFactory中定义确认窗口大小相关的结算方法。
你可以在这里查看文档
http://docs.jboss.org/hornetq/2.2.5.Final/user-manual/en/html/client-reconnection.html
如果还要检查
关于java - 大黄蜂Q : Core API Producer and Asynchronous Consumer problems,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14094101/
我有一个生产者/消费者场景,我不希望一个生产者交付产品,也不希望多个消费者消费这些产品。然而,常见的情况是交付的产品仅由一个消费者消费,而其他消费者永远看不到该特定产品。我不想完成的是每个消费者消费一
我正在设计一个系统,其中将有 n 个生产者和 m 个消费者,其中 n 和 m 是数字,n != m。 我想这样设计系统, 任何生产者在生产时不得阻止其他生产者 任何消费者都不应在消费时阻止其他消费者
关于 REST Web 服务。 @Produces("application/json") 和 @Produces(MediaType.APPICATION_JSON) 两者的工作方式相同,但第二个需
我正在尝试使用 Kafka: import java.util.Properties; import org.apache.kafka.clients.producer.Producer; impor
当我使用 Producer.flush() 时,它可以工作,但根据 kafka confluent issue 性能较差,但按照建议,我使用 Producer.poll(0) 但不会向主题生成任何消息
我正在针对 Python 的 confluent-kafka 使用 native java 实现测试 Apache Kafka Producer,以查看哪个具有最大吞吐量。 我正在使用 docker-
我看到 @products 注释允许我传递单个字符串和字符串列表。所以我只是想知道这是如何在java中完成的,如果我需要使用允许以下行为的方法来实现它,我该怎么做?或者这个注释是特定的,所以我们不能在
我正在开发一个迁移学习应用程序,我正在其中针对我的数据流重新训练 MobileNetV2。 我正在使用 retrain.py 重新训练模型来自tensorflow-hub并且没有做任何修改。 当我从终
在 Cloud Foundry 中,我能够向非 ssl url(“kafkaURL:9092”)生成消息。但它不适用于 ssl url(“kafkaURL:9093”)。 Kafka 服务器版本 0.
我正在使用 kafka 向消费者发送消息。但是由于某种原因,当我使用 Producer.send(record, new MyProducerCallback()); 向主题发送记录时,该主题的使用者
我正在编写一个演示应用程序来创建一个 Kafka Producer。我创建了一个主题并在 Kafka 上运行了一个生产者和消费者,它似乎正在工作。我正在编写一个 spring 应用程序来创建一个生产者
我在我的项目中使用 spring boot v2.2.4 和 Apache Kafka。 下面是我的pom.xml文件: org.springframewo
我正在尝试使用 java 程序制作 Kafka 生产者。但是当我运行程序时我收到了一些警告,没有任何错误但是生产者没有发送数据并且警告如下所示。 [kafka-producer-network-thr
我正在尝试加载一个简单的文本文件而不是 Kafka 中的标准输入。下载 Kafka 后,我执行了以下步骤: 启动动物园管理员: bin/zookeeper-server-start.sh config
我有一个类,它生成一个 ElasticSearch 客户端以与 @Inject 一起使用 @Produces @ApplicationScoped public Client createClient
对于一个新项目,我们在客户端使用 jQuery 组件,其中之一是 blueImp 文件 uploader 。我们愉快地编写代码,在 Chrome 和 Firefox 中一切都运行良好……直到有人尝试在
我有一些开发要做,我尝试看看是否有可以使用的设计模式。问题很简单: 我有一个启动许多线程的主线程。主线程必须等待每个线程完成然后再做其他事情。现有的代码有点难看。我有一个 while 循环来检查线程组
我正在使用驱动对象模型工具 CodeFluentEntities以便将模型部署到数据库引擎。 我正在考虑使用 localStorage 数据库引擎(如 IndexedDB 或 Web SQL)来为没有
我无法停止 ActiveMQ Producer。 场景是:我为内存使用和临时存储设置了较低的值。
我正在尝试结合使用 CDI (weld-se 2) 和 JavaFX,并且我想使用自定义创建的注释来注释我的 Controller 类,以便使用我的工厂方法管理此类创建。我想应该如下所示,但这段代码不
我是一名优秀的程序员,十分优秀!