gpt4 book ai didi

java - 如何解决paho mqtt客户端的异步连接问题?

转载 作者:行者123 更新时间:2023-11-29 08:32:48 26 4
gpt4 key购买 nike

背景

我一直在使用 MQTT 进行一个项目,并遇到了一个奇怪的问题。我正在使用paho作为我的 MQTT 客户端和 VerneMQ作为经纪人。

VerneMQ 代理服务已启动并正在运行,我可以通过 runnnig netstat 确认这一点,并且可以看到 127.0.0.1:1883 条目位于 LISTENING 模式。

这是我为客户提供的代码:

public class Producer implements MqttCallback {

private String brokerUri;
private String clientId;

public Producer(String brokerUri, String clientId){
this.brokerUri = brokerUri;
this.clientId = clientId;
}

public void doProduce(String topic, String payload){
MemoryPersistence memoryPersistence = new MemoryPersistence();

try {
MqttAsyncClient mqttAsyncClient = new MqttAsyncClient(brokerUri, clientId, memoryPersistence);
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setCleanSession(true);
mqttAsyncClient.setCallback(this);
mqttAsyncClient.connect(mqttConnectOptions);
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(payload.getBytes());
mqttAsyncClient.publish(topic, mqttMessage);
} catch (MqttException e) {
e.printStackTrace();
}
}

public void connectionLost(Throwable throwable) {

}


public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {

}


public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("Message delivered!");
}
}

以下是我的主类

public class Main {
public static void main(String[] args) {
Producer producer = new Producer("tcp://127.0.0.1:1883", "producer1");
producer.doProduce("dummyTopic", "dummyMessage");
}
}

问题

当我运行应用程序时,我在输出中看到 Client is not connect (32104) 异常。

如果我将 Producer 中的 mqttAsyncClient.connect(mqttConnectOptions); 行更改为 mqttAsyncClient.connect(mqttConnectOptions).waitForCompletion();类中,我可以成功连接到代理,并且可以在输出中看到 Message returned!

如果我没记错的话,waitForCompletion() 将阻止调用,直到收到响应。通过添加这一行,我有效地将 AsyncClient 连接更改为阻塞连接,这不是我想要的方法。

问题

如何解决此问题,以便 paho MQTT 客户端以非阻塞方式连接到代理?我一路上是不是错过了什么?

最佳答案

IMqttAsyncClient 的文档中对此进行了介绍

 IMqttToken token method(parms, Object userContext, IMqttActionListener callback)

In this form a callback is registered with the method. The callback will be notified when the action succeeds or fails. The callback is invoked on the thread managed by the MQTT client so it is important that processing is minimised in the callback. If not the operation of the MQTT client will be inhibited. For example to be notified (called back) when a connect completes:

IMqttToken conToken;
conToken = asyncClient.connect("some context", new MqttAsyncActionListener() {
public void onSuccess(IMqttToken asyncActionToken) {
log("Connected");
}

public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
log ("connect failed" +exception);
}
});

An optional context object can be passed into the method which will then be made available in the callback. The context is stored by the MQTT client) in the token which is then returned to the invoker. The token is provided to the callback methods where the context can then be accessed.

所以你的 try/catch block 应该如下所示:

try {
MqttAsyncClient mqttAsyncClient = new MqttAsyncClient(brokerUri, clientId, memoryPersistence);
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setCleanSession(true);
mqttAsyncClient.setCallback(this);
mqttAsyncClient.connect(mqttConnectOptions, null, new MqttAsyncActionListener() {
public void onSuccess(IMqttToken asyncActionToken) {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(payload.getBytes());
mqttAsyncClient.publish(topic, mqttMessage);
}

public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
exception.printStackTrace();
}
});

} catch (MqttException e) {
e.printStackTrace();
}

关于java - 如何解决paho mqtt客户端的异步连接问题?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46580780/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com