gpt4 book ai didi

java - HornetQ : Core API Producer and Asynchronous Consumer problems

转载 作者:行者123 更新时间:2023-11-30 11:37:20 25 4
gpt4 key购买 nike

您好,我是 HornetQ 的新手。我已经编写了一个测试包,它有一个带有生产者和异步消费者的嵌入式 HornetQ 服务器。希望我已经正确实现了。现在问题如下……

  1. 当我通过 producer 向队列发送消息时,它返回成功,但是当尝试消费队列中的消息时,似乎没有消息被消费。就好像消费者没有处于活动状态。

  2. 当我尝试使用 setSendAcknowledgementHandler 方法时出现错误:

    java.lang.IllegalStateException: You can't set confirmationHandler on a connection with confirmation-window-size < 0. Look at the documentation for more information.
    at org.hornetq.core.protocol.core.impl.ChannelImpl.setCommandConfirmationHandler(ChannelImpl.java:330)
    at org.hornetq.core.client.impl.ClientSessionImpl.setSendAcknowledgementHandler(ClientSessionImpl.java:943)
    at org.hornetq.core.client.impl.DelegatingSession.setSendAcknowledgementHandler(DelegatingSession.java:493)
    at componentsII.QueueProducer.SendMessage(QueueProducer.java:9)
    at componentsII.StartClients.main(StartClients.java:11)

检查下面的类。

连接和 session

/**
 * Static holder for the single HornetQ core-API client connection used by
 * the producer and the consumer in this example.
 *
 * <p>Typical life cycle: {@link #setMaps(String[], Object[])} to supply the
 * connector parameters, {@link #startSession()} to lazily create and start
 * the shared session, then {@link #stopSession()} to release everything.
 * After {@code stopSession()} the holder can be started again.
 *
 * <p>The life-cycle methods synchronize on the class. Code that reads
 * {@link #sharedSession} directly is not covered by that lock.
 */
public class QueueConnection {

    private static org.hornetq.api.core.TransportConfiguration transport;
    // Kept so the locator can be closed in stopSession(); the original code dropped it.
    private static org.hornetq.api.core.client.ServerLocator sharedLocator;
    private static org.hornetq.api.core.client.ClientSessionFactory sharedfactory;
    /** The shared client session; {@code null} until {@link #startSession()} succeeds. */
    public static org.hornetq.api.core.client.ClientSession sharedSession;
    private static java.util.HashMap<String, Object> maps;
    private static boolean started = false;

    /**
     * Builds the connector parameter map from two parallel arrays.
     *
     * @param key   parameter names
     * @param value parameter values, {@code value[i]} belonging to {@code key[i]}
     *              If either array is {@code null} or the lengths differ, the
     *              parameter map is reset to {@code null}.
     */
    public static synchronized void setMaps(String[] key, Object[] value) {
        if (key == null || value == null || key.length != value.length) {
            maps = null;
            return;
        }
        java.util.HashMap<String, Object> params = new java.util.HashMap<String, Object>();
        for (int i = 0; i < key.length; i++) {
            params.put(key[i], value[i]);
        }
        maps = params;
    }

    /**
     * Returns the shared session, creating it on first use.
     *
     * @return the shared session, or {@code null} if it could not be created
     */
    private static synchronized org.hornetq.api.core.client.ClientSession session() {
        if (sharedSession != null) {
            return sharedSession;
        }
        try {
            org.hornetq.api.core.client.ClientSessionFactory sessionFactory = factory();
            if (sessionFactory != null) {
                // NOTE(review): arguments are presumably autoCommitSends, autoCommitAcks,
                // ackBatchSize -- confirm against the ClientSessionFactory Javadoc.
                sharedSession = sessionFactory.createSession(true, true, 0);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return sharedSession;
    }

    /**
     * Queries the server for the number of messages on a queue.
     *
     * @param queueName name of the queue to inspect
     * @return the reported message count, or 0 if there is no session or the
     *         query fails
     */
    public static synchronized int size(String queueName) {
        int count = 0;
        if (sharedSession == null) {
            return count;
        }
        try {
            org.hornetq.api.core.client.ClientSession.QueueQuery result =
                    sharedSession.queueQuery(new org.hornetq.api.core.SimpleString(queueName));
            count = (int) result.getMessageCount();
        } catch (org.hornetq.api.core.HornetQException e) {
            e.printStackTrace();
        }
        return count;
    }

    /**
     * Creates the shared session if necessary and starts it once.
     *
     * <p>The original guard was {@code session() != null || started != true},
     * which could never reach the "already started" branch and dereferenced a
     * {@code null} session when creation had failed.
     */
    public static synchronized void startSession() {
        try {
            org.hornetq.api.core.client.ClientSession current = session();
            if (current == null) {
                System.err.println("Client Session could not be created");
                return;
            }
            if (started) {
                System.out.println("Client Session already started");
                return;
            }
            current.start();
            started = true;
            System.out.println("Client Session started");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Stops and closes the shared session, then closes the session factory and
     * the server locator. All cached references and the started flag are
     * cleared even if one of the close calls fails, so a later
     * {@link #startSession()} begins from a clean state.
     */
    public static synchronized void stopSession() {
        org.hornetq.api.core.client.ClientSession current = sharedSession;
        try {
            try {
                // Use the field directly: session() would create a new session
                // only to stop it again.
                if (current != null) {
                    current.stop();
                    current.close();
                }
            } finally {
                if (sharedfactory != null) {
                    sharedfactory.close();
                }
                if (sharedLocator != null) {
                    sharedLocator.close();
                }
            }
            if (current != null) {
                System.out.println("Client Session stopped");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            sharedSession = null;
            sharedfactory = null;
            sharedLocator = null;
            started = false;
        }
    }

    /**
     * Builds the Netty connector configuration from the current parameter map.
     *
     * @return the freshly created transport configuration
     */
    private static org.hornetq.api.core.TransportConfiguration TransportConfigs() {
        transport = new org.hornetq.api.core.TransportConfiguration(
                org.hornetq.core.remoting.impl.netty.NettyConnectorFactory.class.getName(), maps);
        return transport;
    }

    /**
     * Returns the shared session factory, creating it (and its server
     * locator) on first use.
     *
     * @return the shared factory, or {@code null} if it could not be created
     */
    private static synchronized org.hornetq.api.core.client.ClientSessionFactory factory() {
        if (sharedfactory != null) {
            return sharedfactory;
        }
        org.hornetq.api.core.client.ServerLocator locator = null;
        try {
            locator = org.hornetq.api.core.client.HornetQClient.createServerLocator(true, TransportConfigs());
            locator.setAckBatchSize(0);
            locator.setReconnectAttempts(3);
            // HornetQ rejects setSendAcknowledgementHandler with an
            // IllegalStateException when confirmation-window-size is < 0.
            // NOTE(review): the value 2 looks very small if the unit is
            // bytes -- confirm against the HornetQ manual.
            locator.setConfirmationWindowSize(2);
            sharedfactory = locator.createSessionFactory();
            sharedLocator = locator;
        } catch (Exception e) {
            e.printStackTrace();
            // Do not leak a locator whose factory could not be created.
            if (locator != null) {
                locator.close();
            }
        }
        return sharedfactory;
    }
}

消费者

public class QueueConsumer {


public static void Recieve(String queuename){
try {
org.hornetq.api.core.client.ClientConsumer consumer = QueueConnection.sharedSession.createConsumer(queuename);
consumer.setMessageHandler(new msghandler());
} catch (org.hornetq.api.core.HornetQException e) {
e.printStackTrace();
} catch(Exception e){
e.printStackTrace();
}
}

private static class msghandler implements org.hornetq.api.core.client.MessageHandler {
@Override
public void onMessage(org.hornetq.api.core.client.ClientMessage msg) {
System.out.println("Message consumed ~"+msg.getStringProperty("myMsg"));

}
}
}

生产者

public class QueueProducer {

public static void SendMessage(String queuename,String msg){
try{
QueueConnection.sharedSession.setSendAcknowledgementHandler(new acknowledegeHandler());
org.hornetq.api.core.client.ClientProducer producer= QueueConnection.sharedSession.createProducer(queuename);
org.hornetq.api.core.client.ClientMessage message = QueueConnection.sharedSession.createMessage(org.hornetq.api.core.client.ClientMessage.TEXT_TYPE,true);
message.putStringProperty("myMsg", msg);
producer.send(queuename,message);
System.out.println("Message Sent to "+queuename);

}catch(org.hornetq.api.core.HornetQException e){
e.printStackTrace();
}catch(Exception e){
e.printStackTrace();
}
}

private static class acknowledegeHandler implements org.hornetq.api.core.client.SendAcknowledgementHandler{
@Override
public void sendAcknowledged( org.hornetq.api.core.Message msg) {
System.out.println("Received acknowledgement for message ~: "+msg.getStringProperty("myMsg"));
}
}
}

初始化客户端

/**
 * Entry point of the client side of the example. Runs a producer thread that
 * sends nine messages to {@code queues.Queue1}, and a consumer thread that
 * starts five seconds later and reads them back asynchronously.
 */
public class StartClients {

    /** Upper bound on how long the consumer waits for the queue to empty. */
    private static final long DRAIN_TIMEOUT_MS = 5000L;
    /** Pause between two queue-size checks while waiting. */
    private static final long DRAIN_POLL_MS = 100L;

    /** Configures the connector for localhost:5664 and starts the shared session. */
    private static void initialize() {
        String[] keys = {
            org.hornetq.core.remoting.impl.netty.TransportConstants.HOST_PROP_NAME,
            org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME
        };
        Object[] values = {"localhost", 5664};

        QueueConnection.setMaps(keys, values);
        QueueConnection.startSession();
    }

    /**
     * Blocks until the queue reports no messages or the timeout elapses.
     *
     * <p>The consumer's handler runs asynchronously, so the session must stay
     * open for a while after the handler is registered. The original code
     * stopped the session immediately, before any message could be delivered.
     *
     * @param queueName queue to watch
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private static void awaitDrained(String queueName) throws InterruptedException {
        long deadline = System.currentTimeMillis() + DRAIN_TIMEOUT_MS;
        while (QueueConnection.size(queueName) > 0 && System.currentTimeMillis() < deadline) {
            Thread.sleep(DRAIN_POLL_MS);
        }
    }

    public static void main(String[] args) {

        new Thread() {
            @Override
            public void run() {
                System.out.println("Producer Thread");
                StartClients.initialize();
                String QueueName = "queues.Queue1";
                try {
                    for (int a = 1; a < 10; a++) {
                        QueueProducer.SendMessage(QueueName, "Message " + a + " to the embedded HornetQ Server");
                    }
                    System.out.println("Queue " + QueueName + " has " + QueueConnection.size(QueueName) + " messages on send");
                } finally {
                    // Always release the connection, even if sending fails unexpectedly.
                    QueueConnection.stopSession();
                }
            }
        }.start();

        new Thread() {
            @Override
            public void run() {
                boolean initialized = false;
                try {
                    // Crude hand-off: give the producer thread time to finish and
                    // release the shared connection before it is reused here.
                    Thread.sleep(5000);
                    System.out.println();
                    System.out.println("Consumer Thread");
                    StartClients.initialize();
                    initialized = true;
                    String QueueName = "queues.Queue1";
                    QueueConsumer.Recieve(QueueName);
                    awaitDrained(QueueName);
                    System.out.println("Queue " + QueueName + " has " + QueueConnection.size(QueueName) + " messages after consumption");
                } catch (java.lang.InterruptedException e) {
                    // Preserve the interrupt for whoever owns this thread.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if (initialized) {
                        QueueConnection.stopSession();
                    }
                }
            }
        }.start();

    }
}

嵌入式服务器

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.CoreQueueConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.server.JournalType;
import org.hornetq.core.server.embedded.EmbeddedHornetQ;

/**
 * Embedded HornetQ server for the example. Exposes four Netty acceptors on
 * localhost (ports 5664-5667) and pre-defines four durable queues named
 * {@code queues.Queue1} to {@code queues.Queue4}.
 */
public class QueueServer {

    private static final String HOST = "localhost";
    private static final int FIRST_PORT = 5664;
    private static final int LAST_PORT = 5667;
    /** How long the server stays up before it stops itself, in milliseconds. */
    private static final long RUN_TIME_MS = 6000000L;

    /**
     * Builds the acceptor parameters shared by every acceptor; only the port
     * differs between them.
     *
     * @param port TCP port the acceptor listens on
     * @return a new parameter map for one Netty acceptor
     */
    private static Map<String, Object> acceptorParams(int port) {
        Map<String, Object> params = new HashMap<String, Object>();
        params.put(TransportConstants.HOST_PROP_NAME, HOST);
        params.put(TransportConstants.PORT_PROP_NAME, port);
        params.put(TransportConstants.USE_NIO_PROP_NAME, true);
        params.put(TransportConstants.TCP_NODELAY_PROPNAME, true);
        params.put(TransportConstants.NIO_REMOTING_THREADS_PROPNAME, 3);
        return params;
    }

    /**
     * Configures and starts the embedded server, keeps it running for
     * {@code RUN_TIME_MS} milliseconds, then stops it. The server is stopped
     * even when the wait is interrupted. Any failure is logged at SEVERE
     * level rather than propagated.
     */
    public static void StartServer() {

        try {

            // Connection configurations: one acceptor per port.
            HashSet<TransportConfiguration> transports = new HashSet<TransportConfiguration>();
            for (int port = FIRST_PORT; port <= LAST_PORT; port++) {
                transports.add(new TransportConfiguration(NettyAcceptorFactory.class.getName(), acceptorParams(port)));
            }

            // Queue configurations: address and queue name are identical,
            // no filter, durable.
            List<CoreQueueConfiguration> queueConfigs = new ArrayList<CoreQueueConfiguration>();
            String queueName = "queues.Queue";
            for (int x = 1; x < 5; x++) {
                String name = queueName.concat(String.valueOf(x));
                queueConfigs.add(new CoreQueueConfiguration(name, name, null, true));
            }

            // Server configurations
            Configuration config = new ConfigurationImpl();
            config.setAcceptorConfigurations(transports);
            config.setQueueConfigurations(queueConfigs);
            config.setJournalType(JournalType.NIO);
            config.setSecurityEnabled(false);

            // Starting server
            EmbeddedHornetQ embedded = new EmbeddedHornetQ();
            embedded.setConfiguration(config);
            embedded.start();
            try {
                Thread.sleep(RUN_TIME_MS);
            } finally {
                // Previously skipped when the sleep was interrupted, which
                // left the server running.
                embedded.stop();
            }

        } catch (InterruptedException ex) {
            // Preserve the interrupt for the caller after logging it.
            Thread.currentThread().interrupt();
            Logger.getLogger(QueueServer.class.getName()).log(Level.SEVERE, null, ex);
        } catch (Exception ex) {
            Logger.getLogger(QueueServer.class.getName()).log(Level.SEVERE, null, ex);
        }

    }

    public static void main(String[] args) {
        StartServer();
    }
}

我哪里出错了?

最佳答案

需要通过 ClientSessionFactory 相关的 setter 方法设置确认窗口大小(confirmation-window-size)。

你可以在这里查看文档

http://docs.jboss.org/hornetq/2.2.5.Final/user-manual/en/html/client-reconnection.html

另外还可以参考

HornetQ Messaging developers' guide

关于java - HornetQ : Core API Producer and Asynchronous Consumer problems,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14094101/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com