gpt4 book ai didi

java - ActiveMQ中的并发消息消费

转载 作者:行者123 更新时间:2023-11-30 08:13:06 27 4
gpt4 key购买 nike

我使用ActiveMQServer作为代理。

服务器.java

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

import org.apache.activemq.api.core.TransportConfiguration;
import org.apache.activemq.core.config.Configuration;
import org.apache.activemq.core.config.impl.ConfigurationImpl;
import org.apache.activemq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.apache.activemq.core.server.ActiveMQServer;
import org.apache.activemq.core.server.ActiveMQServers;

public class Server {
public static void main(final String arg[]) throws Exception
{
try
{
// Step 1. Create the Configuration, and set the properties accordingly
Configuration configuration = new ConfigurationImpl();
//we only need this for the server lock file
configuration.setJournalDirectory("target/data/journal");
configuration.setPersistenceEnabled(false); // http://activemq.apache.org/what-is-the-difference-between-persistent-and-non-persistent-delivery.html
configuration.setSecurityEnabled(false); // http://activemq.apache.org/security.html
/**
* this map with configuration values is not necessary (it configures the default values).
* If you want to modify it to run the example in two different hosts, remember to also
* modify the client's Connector at {@link EmbeddedRemoteExample}.
*/
Map<String, Object> map = new HashMap<String, Object>();
map.put("host", "localhost");
map.put("port", 61616);

// https://access.redhat.com/documentation/en-US/JBoss_Enterprise_Application_Platform/5/html/HornetQ_User_Guide/ch14s04.html
TransportConfiguration transpConf = new TransportConfiguration(NettyAcceptorFactory.class.getName(),map);

HashSet<TransportConfiguration> setTransp = new HashSet<TransportConfiguration>();
setTransp.add(transpConf);

configuration.setAcceptorConfigurations(setTransp); // https://github.com/apache/activemq-6/blob/master/activemq-server/src/main/java/org/apache/activemq/spi/core/remoting/Acceptor.java

// Step 2. Create and start the server
ActiveMQServer server = ActiveMQServers.newActiveMQServer(configuration);
server.start();
}
catch (Exception e)
{
e.printStackTrace();
throw e;
}
}
}

我有一个 Client.java 类,我在其中创建队列、消息生产者和消息消费者。

Client.java

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.apache.activemq.api.core.ActiveMQException;
import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.api.core.TransportConfiguration;
import org.apache.activemq.api.core.client.ActiveMQClient;
import org.apache.activemq.api.core.client.ClientConsumer;
import org.apache.activemq.api.core.client.ClientMessage;
import org.apache.activemq.api.core.client.ClientProducer;
import org.apache.activemq.api.core.client.ClientSession;
import org.apache.activemq.api.core.client.ClientSessionFactory;
import org.apache.activemq.api.core.client.MessageHandler;
import org.apache.activemq.api.core.client.ServerLocator;
import org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory;

public class Client
{

private static final String queueName = "queue.exampleQueue";
private static final String propName = "myprop";

ClientSessionFactory sf = null;
ClientSession session = null;
ClientProducer producer = null;
ClientMessage message = null;
ClientConsumer consumer = null;
String name;

public Client(String name){
this.name = name;
}

public void initializeComponents(){

try
{
// Step 3. As we are not using a JNDI environment we instantiate the objects directly

/**
* this map with configuration values is not necessary (it configures the default values).
* If you modify it to run the example in two different hosts, remember to also modify the
* server's Acceptor at {@link EmbeddedServer}
*/
Map<String,Object> map = new HashMap<String,Object>();
map.put("host", "localhost");
map.put("port", 61616);
// -------------------------------------------------------

ServerLocator serverLocator = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName(), map));
sf = serverLocator.createSessionFactory();

// Step 4. Create a core queue
ClientSession coreSession = sf.createSession(false, false, false);

if (!coreSession.queueQuery(new SimpleString(queueName)).isExists())
coreSession.createTemporaryQueue(queueName, queueName);

coreSession.close();

// Step 5. Create the session, and producer
session = sf.createSession();

producer = session.createProducer(queueName); //

// Step 7. Create the message consumer and start the connection
consumer = session.createConsumer(queueName);
session.start();

// Step 8. Receive the message.
consumer.setMessageHandler(new MessageHandler(){

public void onMessage(ClientMessage message)
{
System.out.println("client " + name + " received message " + message.getStringProperty(propName));
}

});

}
catch (Exception e)
{
e.printStackTrace();
}

}

public void sendMessage(String messageText){

// Step 6. Create and send a message
message = session.createMessage(false);
message.putStringProperty(propName, messageText);

try {
System.out.println("Producer is going to send a message");
producer.send(message);
} catch (ActiveMQException e) {
e.printStackTrace();
}
}

public void cleanUpConnection(){
if (sf != null)
{
sf.close();
}
}

}

在这个 main 中,我创建了一个队列、两个生产者和两个消费者。

       public static void main(final String[] args)
{
Client cl1 = new Client("cl1");
cl1.initializeComponents();

Client cl2 = new Client("cl2");
cl2.initializeComponents();

for (int i = 0; i < 10; i++){
try {

Date date = new Date();
String formattedDate = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss").format(date);

cl1.sendMessage(formattedDate + " number of iteration " + i);
Thread.sleep(2000);

} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}

cl1.cleanUpConnection();
cl2.cleanUpConnection();

}

这是 main 的输出:

Producer is going to send a message
client cl1 received message 05/06/2015 16:56:22 number of iteration 0
Producer is going to send a message
client cl2 received message 05/06/2015 16:56:24 number of iteration 1
Producer is going to send a message
client cl1 received message 05/06/2015 16:56:26 number of iteration 2
Producer is going to send a message
client cl2 received message 05/06/2015 16:56:28 number of iteration 3
Producer is going to send a message
client cl1 received message 05/06/2015 16:56:30 number of iteration 4
Producer is going to send a message
client cl2 received message 05/06/2015 16:56:32 number of iteration 5
Producer is going to send a message
client cl1 received message 05/06/2015 16:56:34 number of iteration 6
Producer is going to send a message
client cl2 received message 05/06/2015 16:56:36 number of iteration 7
Producer is going to send a message
client cl1 received message 05/06/2015 16:56:38 number of iteration 8
Producer is going to send a message
client cl2 received message 05/06/2015 16:56:40 number of iteration 9
client cl2 received message 05/06/2015 16:56:22 number of iteration 0
client cl2 received message 05/06/2015 16:56:26 number of iteration 2
client cl2 received message 05/06/2015 16:56:30 number of iteration 4
client cl2 received message 05/06/2015 16:56:34 number of iteration 6
client cl2 received message 05/06/2015 16:56:38 number of iteration 8

我想问一下,如何让所有消费者并发消息消费。

我的意思是:

client cl1 received ... message of iteration 0
client cl2 received ... message of iteration 0
client cl1 received ... message of iteration 1
client cl2 received ... message of iteration 1

我找到了prefetch limit设置,但不知道如何使用 ActiveMQConnectionFactoryActiveMQConnection 类,而不在 client.java 类中进行重构。是否有其他选项可以使所有消费者并发消息消费?

最佳答案

每次迭代您仅发送一条消息,因此只有一个客户端会收到消息。如果您希望每个客户端接收相同的消息,请使用主题而不是队列。

关于java - ActiveMQ中的并发消息消费,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30080780/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com