gpt4 book ai didi

java - Moquette 中的 Paho 客户端不会使用离线消息

转载 作者:行者123 更新时间:2023-12-02 04:55:11 25 4
gpt4 key购买 nike

我在通过 eclipse Paho 客户端在 Moquette 服务器中使用离线 MQTT 消息时遇到问题。

以下是我所遵循的步骤。

  1. 创建并启动了 Moquette MQTT 代理。
  2. 使用 eclipse Paho 客户端创建了一个简单的 MQTT 消费者应用程序。
  3. 设置消费者使用主题“devices/reported/#”的数据,QOS:1,CleanSession:False
  4. 创建了一个简单的 MQTT 数据发布器,以使用 Eclipse Paho 将数据发布到 MQTT 代理。
  5. 使用 MQTT 数据发布程序将消息发布到:“devices/reported/client_1”主题,QoS 为:1

上述步骤成功,没有任何问题。

然后我停止了我的消费者应用程序,并将 MQTT 数据发送到具有相同主题的代理。使用我的发布者应用程序 - 服务器能够接收这些消息,但此时没有任何消费者使用此消息,因为我已经停止了我的消费者。然后我再次启动我的消费者应用程序。它已成功连接到代理,但是,它没有收到我在消费者关闭时发送给代理的任何消息。

我是否需要对 Moquette 服务器进行任何特定配置才能保留数据(使用干净 session : false)?或者我错过了什么?

请在下面找到我的示例代码,

Moquette 服务器初始化

package com.gbids.mqtt.moquette.main;

import com.gbids.mqtt.moquette.server.PublishInterceptor;
import io.moquette.interception.InterceptHandler;
import io.moquette.server.Server;
import io.moquette.server.config.IConfig;
import io.moquette.server.config.MemoryConfig;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class ServerLauncher {

public static void main(String[] args) throws IOException {
Properties props = new Properties();
final IConfig configs = new MemoryConfig(props);

final Server mqttBroker = new Server();
final List<? extends InterceptHandler> userHandlers = Arrays.asList(new PublishInterceptor());
mqttBroker.startServer(configs, userHandlers);

System.out.println("moquette mqtt broker started, press ctrl-c to shutdown..");
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.out.println("stopping moquette mqtt broker..");
mqttBroker.stopServer();
System.out.println("moquette mqtt broker stopped");
}
});
}
}

MQTT 消费者

package com.gbids.mqtt.moquette.main;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class ConsumerLauncher implements MqttCallback {

private static final String topicPrefix = "devices/reported";
private static final String broker = "tcp://0.0.0.0:1883";
private static final String clientIdPrefix = "consumer";

public static void main(String[] args) throws MqttException {
final String clientId = "consumer_1";
MqttClient sampleClient = new MqttClient(broker, clientId, new MemoryPersistence());
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(false);
sampleClient.connect(connOpts);
sampleClient.subscribe(topicPrefix + "/#", 1);
sampleClient.setCallback(new ConsumerLauncher());
}

public void connectionLost(Throwable throwable) {
System.out.println("Consumer connection lost : " + throwable.getMessage());
}

public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
System.out.println("Message arrived from topic : " + s + " | Message : " + new String(mqttMessage.getPayload()) + " | Message ID : " +mqttMessage.getId());
}

public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("Delivery completed from : " + clientIdPrefix + "_1");
}
}

MQTT 发布者

package com.gbids.mqtt.moquette.main;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class ClientLauncher {

private static final String content = "{\"randomData\": 25}";
private static final String willContent = "Client disconnected unexpectedly";
private static final String broker = "tcp://0.0.0.0:1883";
private static final String clientIdPrefix = "client";

public static void main(String[] args) throws Exception{
sendDataWithQOSOne();
System.exit(0);
}

private static void sendDataWithQOSOne(){
try {
final String clientId = "client_1";
MqttClient sampleClient = new MqttClient(broker, clientId, new MemoryPersistence());
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(false); // for publisher - this is not needed I think
sampleClient.connect(connOpts);
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(1);
final String topic = "devices/reported/" + clientId;
sampleClient.publish(topic, message);
System.out.println("Message published from : " + clientId + " with payload of : " + content);
sampleClient.disconnect();
} catch (MqttException me) {
me.printStackTrace();
}
}
}

最佳答案

在您的情况下,您需要在 ClientLauncher 中创建 MqttMessage 时将 retained 标志设置为 true (出版商)。默认值为 false,如 documentation 中所示。 .

...    
message.setRetained(true)
...

设置此标志可以使消息保留在代理上并发送到新连接的客户端。请注意,代理仅保留主题的最后一条消息。无法为特定主题保留多条消息。

关于java - Moquette 中的 Paho 客户端不会使用离线消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49151167/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com