gpt4 book ai didi

java - 从 Java 读取来自 MQ 的所有消息

转载 作者:行者123 更新时间:2023-11-29 08:27:39 26 4
gpt4 key购买 nike

我开发了一个 Java 应用程序来读取来自 MQ 的消息。 Java 应用程序必须从 MQ 读取所有消息,将它们放入列表并返回列表。我正在使用 while 循环一条一条地读取消息,如果我捕获到 2033 异常并返回列表,则会中断。

我的问题是我在阅读单条消息后收到 2033 异常。例如,我已将大约 10 条消息推送到队列并运行我的应用程序,第一个循环读取第一条消息并将其放入列表中。但是在第二个循环中,它在获取第二条消息时抛出 2033 异常。然后我需要运行应用程序来读取第二条消息,同样的事情发生了。

由于我是 MQ 的新手,所以我找不到它发生的原因。我正在使用 Java8 和 IBM MQ 核心库。下面是我的代码..

package com.reciever.mq;


import com.ibm.mq.*;
import com.ibm.mq.constants.MQConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.*;


public class MQReceiver {

private static final Logger LOGGER = LoggerFactory.getLogger(MQReceiver.class);

private static int GET_OPTIONS_CONSTANT = MQConstants.MQGMO_WAIT |
MQConstants.MQGMO_PROPERTIES_COMPATIBILITY |
MQConstants.MQGMO_ALL_SEGMENTS_AVAILABLE |
MQConstants.MQGMO_COMPLETE_MSG |
MQConstants.MQGMO_ALL_MSGS_AVAILABLE |
MQConstants.MQGMO_SYNCPOINT;

public static MQQueueManager queueManager;

public static void main(String[] args) throws MQException {

Hashtable<String, Object> mqOptions = new Hashtable<>();

mqOptions.put(MQConstants.HOST_NAME_PROPERTY, "host.name"); //just a placeHolder
mqOptions.put(MQConstants.CHANNEL_PROPERTY, "channelName");
mqOptions.put(MQConstants.USER_ID_PROPERTY, "userName");
mqOptions.put(MQConstants.TRANSPORT_PROPERTY, MQConstants.TRANSPORT_MQSERIES);
mqOptions.put(MQConstants.PORT_PROPERTY, 1980);

queueManager = new MQQueueManager("queueManager", mqOptions);
int options = MQConstants.MQOO_INPUT_AS_Q_DEF | MQConstants.MQOO_OUTPUT
| MQConstants.MQOO_FAIL_IF_QUIESCING | MQConstants.MQOO_PASS_ALL_CONTEXT | MQConstants.MQOO_INQUIRE;

MQQueue mq = queueManager.accessQueue("queueNametoRead", options);
MQException.logExclude(2033);

MQMessage mqMessage = new MQMessage();
MQGetMessageOptions getOptions = new MQGetMessageOptions();
getOptions.options = GET_OPTIONS_CONSTANT;
getOptions.waitInterval = 1;

//byte[] buffer = null;

List<byte[]> msgList = new ArrayList<>();

System.out.println("Current depth : " + mq.getCurrentDepth());

boolean hasMsges = true;

while(hasMsges){

byte[] buffer = null;

try{

mq.get(mqMessage, getOptions);

buffer = new byte[mqMessage.getDataLength()];
mqMessage.readFully(buffer);

queueManager.commit();

System.out.println("Recieved Message....");
msgList.add(buffer);

} catch (MQException e) {
if((e.completionCode == MQConstants.MQCC_FAILED) &&
(e.reasonCode == MQConstants.MQRC_NO_MSG_AVAILABLE)){
System.out.println(" No more messages.. : " + e.getReason());

break;
}else if (e.getReason() != 2033) {
LOGGER.error("Error getting message from the queue: " + "", e);
}
} catch (IOException e) {
LOGGER.error("Error reading the message from the queue: " + "", e);
}
/* finally {
if(mq != null) mq.close();
if(queueManager != null) queueManager.disconnect();
}*/

}

System.out.println("Final List size : " + msgList.size());

}

}

下面是我运行程序后的系统输出

Current depth : 9
Recieved Message...
No more messages.. : 2033
Final List size : 1

从上面的输出来看,它打印的当前深度是9。但是它在第二次循环时爆发了。

最佳答案

消息 ID 和相关 ID 字段在从队列中获取消息时填充。如果您使用相同的 MQMessage 来获取下一条消息,那么您必须首先重置这两个字段。所以就在您调用 get 方法之前添加这两行。

 mqMessage.messageId     = MQConstants.MQMI_NONE;
mqMessage.correlationId = MQConstants.MQCI_NONE;

关于java - 从 Java 读取来自 MQ 的所有消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51239671/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com