gpt4 book ai didi

java - 如何使用java客户端并发发布MQTT消息?

转载 作者:行者123 更新时间:2023-12-02 09:55:28 24 4
gpt4 key购买 nike

我正在尝试使用 5 个 java 客户端同时发布 MQTT 消息,以便每个 java 客户端同时向 MQTT 代理(HIVEMQ)发布 1000 条关于特定主题的消息

我打开了多个线程,每个线程创建一个 mqtt 客户端并使用 ssl 连接到代理,并尝试同时发布 1000 条消息,消息正在发送,但所有连接都没有成功连接到代理,并且我不断收到异常

Client is not connected (32104)
at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:31)
at org.eclipse.paho.client.mqttv3.internal.ClientComms.sendNoWait(ClientComms.java:199)
at org.eclipse.paho.client.mqttv3.MqttAsyncClient.publish(MqttAsyncClient.java:1355)
at org.eclipse.paho.client.mqttv3.MqttClient.publish(MqttClient.java:583)
at org.eclipse.paho.client.mqttv3.MqttClient.publish(MqttClient.java:575)
at com.test.MqttPublishSample.publishMessages(MqttPublishSample.java:122)
at com.test.MqttPublishSample.lambda$start$0(MqttPublishSample.java:74)
at java.base/java.lang.Thread.run(Thread.java:834)
public class MqttPublishSample {

public static void main(String... args) throws InterruptedException {

new MqttPublishSample().start();

}

public void start() throws InterruptedException {


for(int i=0;i<5;i++){

new Thread(()->{
MqttClient client = null;
try {
client = obtainConnection();//code to obtain connection using MqttClient
publishMessages(client);//code to publish message using simple for loop

} catch (MqttException e) {
e.printStackTrace();
}

}).start();
}
}
public MqttClient obtainConnection() throws MqttException {
String clientId = "sslTestClient"+ThreadLocalRandom.current().nextInt(0,5);
MqttClient client = null;
try {
client = new MqttClient("ssl://localhost:8883", clientId, new MemoryPersistence());
} catch (MqttException e) {
e.printStackTrace();
}

MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setUserName("user1");
mqttConnectOptions.setPassword("pass1".toCharArray());
try {
mqttConnectOptions.setSocketFactory(getTruststoreFactory());
} catch (Exception e) {
e.printStackTrace();
}

System.out.println("connecting...");
client.connect(mqttConnectOptions);
return client;
}

我希望所有客户端都能成功连接到代理并发布消息,无一异常(exception)

最佳答案

您可能在线程上使用相同的 clientID,因此,服务器将断开重复的连接。当您使用 LocalThreadRandom 时,有可能发生冲突(足够大,因为只有 5 个选择)。您可以使用由generateClientId()提供的唯一标识符,或者在线程之间共享一个方法来跟踪它们。

关于java - 如何使用java客户端并发发布MQTT消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56038884/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com