gpt4 book ai didi

java - 我怎样才能正确地将数据分配给两个消费者?

转载 作者:太空宇宙 更新时间:2023-11-04 11:42:38 26 4
gpt4 key购买 nike

在我的应用程序中,我使用主动 mq,在消费者端,我运行两个实例(消费者)来处理请求。由于两个实例监听同一个队列,因此发生了一些冲突。如果多次接收相同的数据,一个请求处理多次,为了克服这个问题并与两个实例进行通信,我实现了 hazelcast 并且它运行良好,但有时数据未正确分配到两个实例中,如果我发送批量数据,则只有一个实例正在处理所有任务。

我在生产者端使用的代码。

public synchronized static void createMQRequestForPoster(Long zoneId, List<String> postJobs, int priority) throws JMSException {

Connection connection = null;
try {
connection = factory.createConnection();

connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue("customQueue");
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);

logger.info("List of jobs adding into poster queue: "+postJobs.size());
for(String str : postJobs) {

TextMessage message = session.createTextMessage();
JSONObject obj = new JSONObject();
obj.put("priority", priority);
obj.put("zoneId", zoneId);
obj.put("postJobs", str);
logger.debug(obj.toString());
message.setText(obj.toString());
message.setIntProperty(ActiveMQReqProcessor.REQ_TYPE, 0);
producer.send(message);
}
} catch (JMSException | JSONException e) {
logger.warn("Failed to add poster request to ActiveMq", e);
} finally {
if(connection != null)
connection.close();
}
}

我在消费者端使用的代码。

私有(private)静态无效activeMQPostProcessor() {

    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(AppCoding.NZ_JMS_URL);
Connection connection = null;
try {
connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("customQueue");


MessageConsumer consumer = session.createConsumer(queue);
MessageListener listener = new MessageListener() {

@Override
public void onMessage(Message message) {

if (message instanceof TextMessage) {

try {
TextMessage textMessage = (TextMessage) message;
logger.info("Received message " + textMessage.getText());
JSONObject jsonObj = new JSONObject(textMessage.getText());
HazelcastClusterInstance.getInstance().add(processOnPoster(jsonObj));
message.acknowledge();
} catch (JSONException e) {

} catch (JMSException e) {

}

logger.info("Adding Raw Message to Internal Queue");
}
}
};
consumer.setMessageListener(listener);
logger.info("Waiting for new posts from selector, scheduler.");

} catch (JMSException e) {
logger.info(e);
}
}

我只观察过这种情况。我该如何处理这个问题?

最佳答案

您很可能会看到预取消息的平衡不平衡。消费者连接并请求预取 block ,第一个 block 往往会在第二个 block 进入之前获得大批量,因此它似乎会占用消息。

您需要根据您的用例调整预取,请参阅prefetch documentation

关于java - 我怎样才能正确地将数据分配给两个消费者?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42640569/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com