gpt4 book ai didi

java - MQTT从主题接收消息

转载 作者:行者123 更新时间:2023-12-02 11:58:41 25 4
gpt4 key购买 nike

我正在编写一个从 MQTT 代理接收消息的程序服务器从客户端获取ID并使其成为主题名称:例如topic1、topic2。然后,在订阅时,服务器传递主题的名称,然后从该主题读取消息。这是我的服务器:

public class AnalyticServer {

// The server socket.
private static ServerSocket serverSocket = null;
// The client socket.
private static Socket clientSocket = null;

// This server can accept up to maxClientsCount clients' connections.
private static final int maxClientsCount = 5;
private static final clientThread[] threads = new clientThread[maxClientsCount];

public static void main(String args[]) throws MqttException, InterruptedException {





// The default port number.
int portNumber = 4544;

//Open Server
try {
serverSocket = new ServerSocket(portNumber);
} catch (IOException e) {
System.out.println(e);
}
//


//When server is listening
System.out.println("Server is now listening at port 4544");
while (true) {
try {
//Make connection

clientSocket = serverSocket.accept();

System.out.println("Connected");



int i = 0;
//Find thread null to run the connection
for (i = 0; i < maxClientsCount; i++) {
if (threads[i] == null) {
(threads[i] = new clientThread(clientSocket, threads)).start();
break;
}
}
if (i == maxClientsCount) {
PrintStream os = new PrintStream(clientSocket.getOutputStream());
os.println("Server is now, please try again later");
os.close();
clientSocket.close();
}
} catch (IOException e) {
System.out.println(e);
}
}
}
}

//Thread control each Request
class clientThread extends Thread {


private Socket clientSocket = null;
private final clientThread[] threads;
private int maxClientsCount;

public clientThread(Socket clientSocket, clientThread[] threads) {
this.clientSocket = clientSocket;
this.threads = threads;
maxClientsCount = threads.length;
}

public void run() {
int maxClientsCount = this.maxClientsCount;
clientThread[] threads = this.threads;


try {
int identifier=0;
//get id
InputStream input = null;
input = clientSocket.getInputStream();


identifier=input.read();

String topic="topic".concat(String.valueOf(identifier));

//Subscribe
try {
System.out.println("subscribing");
Subscribe receive=new Subscribe(topic);


} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (URISyntaxException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// open image
FileInputStream imgPath = new FileInputStream("image.jpg");
BufferedImage bufferedImage = ImageIO.read(imgPath);

Thread.sleep(1200);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ImageIO.write( bufferedImage, "jpg", baos );
baos.flush();
byte[] imageInByte = baos.toByteArray();
baos.close();



//SendImage
DataOutputStream outToClient = new DataOutputStream(clientSocket.getOutputStream());
outToClient.write(imageInByte);
System.out.println(outToClient.size());

clientSocket.close();

} catch (IOException e) {
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

'这是我的订阅类(class)

public class Subscribe implements MqttCallback {

private final int qos = 1;
static String topic=null;

private MqttClient client;

String subText = "abc";
public Subscribe(String topic) throws MqttException, URISyntaxException {
this.topic=topic;

String host = "tcp://m14.cloudmqtt.com:19484";
String username = "***";
String password = "********";
String clientId = MqttClient.generateClientId();
MqttConnectOptions conOpt = new MqttConnectOptions();
conOpt.setCleanSession(true);
conOpt.setUserName(username);
conOpt.setPassword(password.toCharArray());



this.client = new MqttClient(host, clientId, new MemoryPersistence());

;
this.client.setCallback(this);

this.client.connect(conOpt);
this.client.subscribe(topic,1);
System.out.println("subscribe topic: " +this.topic);
}





/**
* @see MqttCallback#connectionLost(Throwable)
*/
public void connectionLost(Throwable cause) {
System.out.println("Connection lost because: " + cause);
System.exit(1);
}

/**
* @see MqttCallback#deliveryComplete(IMqttDeliveryToken)
*/
public void deliveryComplete(IMqttDeliveryToken token) {
}

/**
* @throws IOException
* @see MqttCallback#messageArrived(String, MqttMessage)
*/

public void messageArrived(String topic, MqttMessage message) throws MqttException, IOException {
System.out.println("1");
subText = message.getPayload().toString();

System.out.println("Received"+subText);

}
}

订阅构造函数中的主题是正确的,但是,this.client.setCallback(this)似乎没有调用messageArrived方法。所以我无法收到任何东西。

有人知道吗?非常感谢

最佳答案

您发布的有关订阅者的信息还不够多,但这里有一个简单的工作:

import java.io.IOException;
import java.sql.Timestamp;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;

public class TestSub implements MqttCallback
{
public static void main(String[] args)
{
String url = "tcp://iot.eclipse.org:1883";
String clientId = "TestSub_"+System.currentTimeMillis();
String topicName = "test/ABC/one";
int qos = 1;
boolean cleanSession = true;
String userName = "myUserId";
String password = "mypwd";

try
{
new TestSub(url, clientId, cleanSession, userName, password, topicName, qos);
}
catch (MqttException me)
{
System.out.println(me.getLocalizedMessage());
System.out.println(me.getCause());
me.printStackTrace();
}
}

public TestSub(String url, String clientId, boolean cleanSession, String userName, String password, String topicName, int qos) throws MqttException
{
String tmpDir = System.getProperty("java.io.tmpdir");
MqttDefaultFilePersistence dataStore = new MqttDefaultFilePersistence(tmpDir);
MqttClient client;
MqttConnectOptions conOpt;

try
{
conOpt = new MqttConnectOptions();
conOpt.setMqttVersion(MqttConnectOptions.MQTT_VERSION_DEFAULT);
conOpt.setCleanSession(cleanSession);
if (userName != null)
conOpt.setUserName(userName);

if (password != null)
conOpt.setPassword(password.toCharArray());

// Construct an MQTT blocking mode client
client = new MqttClient(url, clientId, dataStore);

// Set this wrapper as the callback handler
client.setCallback(this);

// Connect to the MQTT server
client.connect(conOpt);
System.out.println("Connected to " + url + " with client ID " + client.getClientId());

System.out.println("Subscribing to topic \"" + topicName + "\" qos " + qos);
client.subscribe(topicName, qos);

// Continue waiting for messages until the Enter is pressed
System.out.println("Press <Enter> to exit");
try
{
System.in.read();
}
catch (IOException e)
{
// If we can't read we'll just exit
}

// Disconnect the client from the server
client.disconnect();
System.out.println("Disconnected");

}
catch (MqttException e)
{
e.printStackTrace();
System.out.println("Unable to set up client: " + e.toString());
System.exit(1);
}
}

public void connectionLost(Throwable cause)
{
System.out.println("Connection lost! " + cause.getLocalizedMessage());
System.exit(1);
}

public void deliveryComplete(IMqttDeliveryToken token)
{
}

public void messageArrived(String topic, MqttMessage message)
throws MqttException
{
String time = new Timestamp(System.currentTimeMillis()).toString();
System.out.println("Time:\t" + time + " Topic:\t" + topic + " Message:\t" + new String(message.getPayload()));
}
}

关于java - MQTT从主题接收消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47418076/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com