gpt4 book ai didi

java - 一个代理 url,其中包含 mqtt 中的许多主题

转载 作者:行者123 更新时间:2023-11-29 05:20:55 25 4
gpt4 key购买 nike

我有两个主题和一个代理 URL。我需要使用一个代理 URL 发布到这两个主题。

我是用一个代理 URL 和一个主题完成的。然后我尝试处理两个主题并为每个主题编写两个订阅者类,但是当我运行两个订阅者类时,一个将显示连接丢失。推荐一些好的例子来做到这一点。

MQTTPublisher.java

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttDefaultFilePersistence;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;

public class MQTTPublisher {


static final String BROKER_URL = "tcp://localhost:1883";// public mosquitto
// server
static final String TOPIC = "iotm/ej";// Change according to your
// application
static final String TOPIC1 = "iotm/stream1";

public static void main(String args[]) {

try {
// Creating new default persistence for mqtt client
MqttClientPersistence persistence = new MqttDefaultFilePersistence(
"/tmp");

// mqtt client with specific url and client id
MqttClient client1 = new MqttClient(BROKER_URL, "Publisher-ID",
persistence);

client.connect();

MqttTopic myTopic = client1.getTopic(TOPIC);
MqttTopic myTopic1 = client1.getTopic(TOPIC1);
String msg = "AMMA!DEVI!dURGA";
System.out.println("Enter the message to publish,Type quit to exit\n");
BufferedReader br = new BufferedReader(new
InputStreamReader(System.in));
msg = br.readLine();
while (!msg.equals("quit")) {
myTopic.publish(new MqttMessage(msg.getBytes()));
System.out.println("Message published on" + TOPIC);
myTopic1.publish(new MqttMessage(msg.getBytes()));

System.out.println("Message published on" + TOPIC1);
msg = br.readLine();
}
myTopic.publish(new MqttMessage(msg.getBytes()));

myTopic1.publish(new MqttMessage(msg.getBytes()));

// client.disconnect();
} catch (MqttException e) {
e.printStackTrace();
} catch (Exception e) {
System.err.println(e.getMessage());
}
}
}

MQTTSubscriber.java

import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttDefaultFilePersistence;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;

public class MQTTSubscriber {

static final String BROKER_URL = "tcp://localhost:1883";// public
// mosquitto server
static final String TOPIC = "iotm/ej"; // Change according to your



public static void main(String args[]) {

try {
// Creating new default persistence for mqtt client
MqttDefaultFilePersistence persistence = new MqttDefaultFilePersistence(
"/tmp");

// mqtt client with specific url and a random client id
MqttClient client1 = new MqttClient(BROKER_URL, "Subscriber-ID",
persistence);
client1.connect();
System.out.println("Subscribing to topic '" + TOPIC + "' from "
+ client1.getServerURI());
// Subscribing to specific topic
client1.subscribe(TOPIC);

// It will trigger when a new message is arrived
MqttCallback callback = new MqttCallback() {
@Override
public void messageArrived(MqttTopic arg0, MqttMessage arg1)
throws Exception {
System.out.println("Message:"
+ new String(arg1.getPayload()));

}

@Override
public void deliveryComplete(MqttDeliveryToken arg0) {

}

@Override
public void connectionLost(Throwable arg0) {
System.out.println("Connection lost");
}
};
// Continue waiting for messages until the Enter is pressed
client1.setCallback(callback);
/*
* System.out.println("Press <Enter> to exit"); try {
* System.in.read(); } catch (IOException e) { // If we can't read
* we'll just exit }
*/
// client.disconnect();
// System.out.println("Client Disconnected");

} catch (MqttException e) {
e.printStackTrace();
} catch (Exception e) {
System.err.println(e.getMessage());
}
}

}

MQTTSubscriber2.java

import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttDefaultFilePersistence;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;

public class MQTTSubscriber2 {


static final String BROKER_URL = "tcp://localhost:1883";// public
// mosquitto server
static final String TOPIC = "iotm/stream1";



public static void main(String args[]) {

try {
// Creating new default persistence for mqtt client
MqttDefaultFilePersistence persistence = new MqttDefaultFilePersistence(
"/tmp");

// mqtt client with specific url and a random client id
MqttClient client = new MqttClient(BROKER_URL, "Subscriber-ID",
persistence);
client.connect();
System.out.println("Subscribing to topic '" + TOPIC + "' from "
+ client.getServerURI());
// Subscribing to specific topic
client.subscribe(TOPIC);

// It will trigger when a new message is arrived
MqttCallback callback = new MqttCallback() {
@Override
public void messageArrived(MqttTopic arg0, MqttMessage arg1)
throws Exception {
System.out.println("Message:"
+ new String(arg1.getPayload()));

}

@Override
public void deliveryComplete(MqttDeliveryToken arg0) {

}

@Override
public void connectionLost(Throwable arg0) {
System.out.println("Connection lost");
}
};
// Continue waiting for messages until the Enter is pressed
client.setCallback(callback);
/*
* System.out.println("Press <Enter> to exit"); try {
* System.in.read(); } catch (IOException e) { // If we can't read
* we'll just exit }
*/
// client.disconnect();
// System.out.println("Client Disconnected");

} catch (MqttException e) {
e.printStackTrace();
} catch (Exception e) {
System.err.println(e.getMessage());
}
}

最佳答案

如果您正在运行 2 个独立的订阅者代码实例,那么它们都需要不同的客户端 ID。如果您使用相同的方式运行 2,那么当第二个连接时,第一个将与代理断开连接。

关于java - 一个代理 url,其中包含 mqtt 中的许多主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24796051/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com