gpt4 book ai didi

java - 我如何知道 ACK 对应于 MQTT 上的哪个发布消息?

转载 作者:行者123 更新时间:2023-11-29 04:36:28 26 4
gpt4 key购买 nike

我正在为 Mqtt paho 驱动程序苦苦挣扎......

每当收到我的发布时,我正在使用 IMqttDeliveryToken 从服务器获取确认。

为了将它与实际发布的消息进行比较,我在 MqttMessage 上设置了一个 ID,以便从 IMqttDeliveryToken 中检索它...但是它不起作用...IMqttDeliveryToken.getMessageId() 返回了一个不正确的 ID当我尝试在 QoS 不同于 0 的 IMqttDeliveryToken.getMessage() 之后获取 ID 时,它返回 NPE。

阅读 Javadoc 后,我了解到这是通常的行为:

Until the message has been delivered, the message being delivered will be returned. Once the message has been delivered null will be returned.

这让我想到另一个问题...... deliveryComplete() 方法真的是在 Broker 发送确认后调用的吗?

这是我的代码:

client.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable thrwbl) { }

@Override
public void messageArrived(String string, MqttMessage mm) throws Exception { }

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
try {
System.out.println("Message ID from getMessageId() method : " + token.getMessageId());
MqttMessage message = token.getMessage();
System.out.println("Message ID from getMessage() method : " + message.getId());
} catch (MqttException ex) {
System.out.println(ex);
} catch (Exception ex) {
System.out.println(ex);
}
}
});

MqttMessage message = new MqttMessage();
message.setId(76);
message.setPayload("pouet".getBytes());
message.setQos(0);

client.publish("TEST", message);

QoS 为 0 时:

Message ID from getMessageId() method : 1
Message ID from getMessage() method : 76

QoS 为 1 时:

Message ID from getMessageId() method : 1
java.lang.NullPointerException

最佳答案

如 git MqttMessage.java 中所述

/**
* This is only to be used internally to provide the MQTT id of a message
* received from the server. Has no effect when publishing messages.
* @param messageId
*/
public void setId(int messageId) {
this.messageId = messageId;
}

这是在发布消息时没有使用的地方。现在要了解为什么来自 getMessageId() 方法的消息 ID:1 发生了,请查看以下内容。

public IMqttDeliveryToken publish(String topic, MqttMessage message, Object userContext, IMqttActionListener callback) throws MqttException,
MqttPersistenceException {
final String methodName = "publish";
//@TRACE 111=< topic={0} message={1}userContext={1} callback={2}
log.fine(CLASS_NAME,methodName,"111", new Object[] {topic, userContext, callback});

//Checks if a topic is valid when publishing a message.
MqttTopic.validate(topic, false/*wildcards NOT allowed*/);

MqttDeliveryToken token = new MqttDeliveryToken(getClientId());
token.setActionCallback(callback);
token.setUserContext(userContext);
token.setMessage(message);
token.internalTok.setTopics(new String[] {topic});

MqttPublish pubMsg = new MqttPublish(topic, message);
comms.sendNoWait(pubMsg, token);

//@TRACE 112=<
log.fine(CLASS_NAME,methodName,"112");

return token;
}

MqttDeliveryToken 不在此处设置消息 ID。在发布时创建 MqttPublish 实例,该实例在内部将多级扩展到 MqttWireMessage.java并且该值默认设置为 0。

public MqttWireMessage(byte type) {
this.type = type;
// Use zero as the default message ID. Can't use -1, as that is serialized
// as 65535, which would be a valid ID.
this.msgId = 0;
}

当在 ClientState.java 中调用 Final Send 进行 mqtt 发布时,转发了 msgId = 0 的 MqttWireMessage 实例(内部 MqttPublish 是从上面的代码发送的),因此如果条件为真并且调用 getNextMessageId() 返回 1(因为它是第一条消息,否则它会返回后续值取决于最后的消息 ID)并设置为 token ,您在代码中在 deliveryComplete() 中跟踪该 token 。

public void send(MqttWireMessage message, MqttToken token) throws MqttException {
final String methodName = "send";
if (message.isMessageIdRequired() && (message.getMessageId() == 0)) {
message.setMessageId(getNextMessageId());
}
if (token != null ) {
try {
token.internalTok.setMessageID(message.getMessageId());
} catch (Exception e) {
}
}

/////......
}

关于java - 我如何知道 ACK 对应于 MQTT 上的哪个发布消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41243254/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com