- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我试图弄清楚如何使用相同的 try catch block 在 HiveMQ 客户端中接收多条消息,甚至使用不同的客户端。我按照这个例子:
上面的示例适用于一个客户端和一个发布和订阅,但如果可能的话,我希望在 try catch 的同一 block 中执行多个这些操作。
package com.main;
import java.util.UUID;
import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient.Mqtt5Publishes;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import java.util.logging.Logger;
import java.util.NoSuchElementException;
import java.util.logging.Level;
import java.util.concurrent.TimeUnit;
public class Main {
private static final Logger LOGGER = Logger.getLogger(Main.class.getName()); // Creates a logger instance
public static void main(String[] args) {
Mqtt5BlockingClient client1 = Mqtt5Client.builder()
.identifier(UUID.randomUUID().toString()) // the unique identifier of the MQTT client. The ID is randomly generated between
.serverHost("localhost") // the host name or IP address of the MQTT server. Kept it 0.0.0.0 for testing. localhost is default if not specified.
.serverPort(1883) // specifies the port of the server
.buildBlocking(); // creates the client builder
client1.connect(); // connects the client
System.out.println("Client1 Connected");
Mqtt5BlockingClient client2 = Mqtt5Client.builder()
.identifier(UUID.randomUUID().toString()) // the unique identifier of the MQTT client. The ID is randomly generated between
.serverHost("localhost") // the host name or IP address of the MQTT server. Kept it 0.0.0.0 for testing. localhost is default if not specified.
.serverPort(1883) // specifies the port of the server
.buildBlocking(); // creates the client builder
client2.connect(); // connects the client
System.out.println("Client2 Connected");
String testmessage = "How is it going!";
byte[] messagebytesend = testmessage.getBytes(); // stores a message as a byte array to be used in the payload
try {
Mqtt5Publishes publishes = client1.publishes(MqttGlobalPublishFilter.ALL); // creates a "publishes" instance thats used to queue incoming messages
// .ALL - filters all incoming Publish messages
client1.subscribeWith() // creates a subscription
.topicFilter("test/something1/topic") // filters to receive messages only on this topic (# = Multilevel wild card, + = single level wild card)
.qos(MqttQos.AT_LEAST_ONCE) // Sets the QoS to 2 (At least once)
.send();
System.out.println("The client1 has subscribed");
client1.publishWith() // publishes the message to the subscribed topic
.topic("test/something1/topic") // publishes to the specified topic
.qos(MqttQos.AT_LEAST_ONCE)
.payload(messagebytesend) // the contents of the message
.send();
System.out.println("The client1 has published");
Mqtt5Publish receivedMessage = publishes.receive(5,TimeUnit.SECONDS).get(); // receives the message using the "publishes" instance waiting up to 5 seconds // .get() returns the object if available or throws a NoSuchElementException
byte[] tempdata = receivedMessage.getPayloadAsBytes(); // converts the "Optional" type message to a byte array
System.out.println();
String getdata = new String(tempdata); // converts the byte array to a String
System.out.println(getdata);
client2.subscribeWith() // creates a subscription
.topicFilter("test/something2/topic") // filters to receive messages only on this topic (# = Multilevel wild card, + = single level wild card)
.qos(MqttQos.AT_LEAST_ONCE) // Sets the QoS to 2 (At least once)
.send();
System.out.println("The client2 has subscribed");
client2.publishWith() // publishes the message to the subscribed topic
.topic("test/something2/topic") // publishes to the specified topic
.qos(MqttQos.AT_LEAST_ONCE)
.payload("The second message :P".getBytes()) // the contents of the message
.send();
System.out.println("The client2 has published");
System.out.println();
// VV Why isn't the publish instance below receiving the second message? Do i need another try catch? VV
receivedMessage = publishes.receive(5,TimeUnit.SECONDS).get(); // receives the message using the "publishes" instance waiting up to 5 seconds // .get() returns the object if available or throws a NoSuchElementException
byte[] tempdata2 = receivedMessage.getPayloadAsBytes(); // converts the "Optional" type message to a byte array
System.out.println();
getdata = new String(tempdata2); // converts the byte array to a String
System.out.println(getdata);
}
catch (InterruptedException e) { // Catches interruptions in the thread
LOGGER.log(Level.SEVERE, "The thread was interrupted while waiting for a message to be received", e);
}
catch (NoSuchElementException e){
System.out.println("There are no received messages"); // Handles when a publish instance has no messages
}
client1.disconnect();
System.out.println("Client1 Disconnected");
client2.disconnect();
System.out.println("Client2 Disconnected");
}
}
我得到的输出:
客户端1已连接
客户端2已连接
client1已订阅
client1已发布
进展如何!
client2已订阅
client2已发布
没有收到消息
客户端1已断开连接
客户端2已断开连接
我想要的输出:
客户端1已连接
客户端2已连接
client1已订阅
client1已发布
进展如何!
client2已订阅
client2已发布
第二条消息:P
客户端1已断开连接
客户端2已断开连接
最佳答案
我运行了你的代码并发现了这个警告日志:
2019-06-11 20:32:22,774 WARN - No publish flow registered for MqttStatefulPublish{stateless=MqttPublish{topic=test/something2/topic, payload=21byte, qos=AT_LEAST_ONCE, retain=false}, packetIdentifier=51, dup=false, topicAlias=0, subscriptionIdentifiers=[1]}.
您似乎忘记为第二个客户端设置发布过滤器。事实上,在等待第二条消息(针对 client2)的代码中,您检查了 client1 的消息流。所以你只需要为client2添加一个发布过滤器:
Mqtt5Publishes publishesClient2 = client2.publishes(MqttGlobalPublishFilter.ALL);
然后等待 client2 的消息:
// VV Why isn't the publish instance below receiving the second message? Do i need another try catch? VV
receivedMessage = publishesClient2.receive(5,TimeUnit.SECONDS).get();
结果:
之前:
Client1 Connected
Client2 Connected
The client1 has subscribed
The client1 has published
How is it going!
The client2 has subscribed
The client2 has published
2019-06-11 20:46:36,537 WARN - No publish flow registered for MqttStatefulPublish{stateless=MqttPublish{topic=test/something2/topic, payload=21byte, qos=AT_LEAST_ONCE, retain=false}, packetIdentifier=51, dup=false, topicAlias=0, subscriptionIdentifiers=[1]}.
There are no received messages
Client1 Disconnected
Client2 Disconnected
之后:
Client1 Connected
Client2 Connected
The client1 has subscribed
The client1 has published
How is it going!
The client2 has subscribed
The client2 has published
The second message :P
Client1 Disconnected
Client2 Disconnected
编辑:我希望这是您正在寻找的解决方案,因为所需的输出与我修复后得到的输出不同。因为 NoSuchElementException 不再抛出/捕获。因此,在第二条消息丢失后,“没有收到的消息”。
编辑响应评论:用于收集具有异步风格的 client2 的发布消息的片段(只需将 try block 中的代码替换为以下代码):
// The list where we put our received publish messages
final List<Mqtt5Publish> incomingMessagesClient2 = new LinkedList<>();
// With the async flavour we can add a consumer for the incoming publish messages
client2.toAsync().publishes(MqttGlobalPublishFilter.ALL, mqtt5Publish ->
incomingMessagesClient2.add(mqtt5Publish));
client1.publishes(MqttGlobalPublishFilter.ALL); // creates a "publishes" instance thats used to queue incoming messages
client2.subscribeWith() // creates a subscription
.topicFilter("test/something1/topic") // filters to receive messages only on this topic (# = Multilevel wild card, + = single level wild card)
.qos(MqttQos.AT_LEAST_ONCE) // Sets the QoS to 2 (At least once)
.send();
System.out.println("The client2 has subscribed");
client1.publishWith() // publishes the message to the subscribed topic
.topic("test/something1/topic") // publishes to the specified topic
.qos(MqttQos.AT_LEAST_ONCE)
.payload(messagebytesend) // the contents of the message
.send();
System.out.println("The client1 has published");
client1.publishWith() // publishes the message to the subscribed topic
.topic("test/something1/topic") // publishes to the specified topic
.qos(MqttQos.AT_LEAST_ONCE)
.payload("The second message :P".getBytes()) // the contents of the message
.send();
System.out.println("The client1 has published");
System.out.println();
TimeUnit.SECONDS.sleep(5);
incomingMessagesClient2.forEach(mqtt5Publish -> System.out.println(new String(mqtt5Publish.getPayloadAsBytes())));
最诚挚的问候,
来自 HiveMQ 团队的 Michael
关于java - 如何在HiveMQ Client中接收多条消息? (MQTT),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56533751/
我想将 HiveMQ 客户端和 HiveMQ 社区版(代理的实现)合并到一个项目中。我尝试将 HiveMQ 客户端添加为 Hive MQ 社区版(代理)中的 build.gradle 文件的依赖项。它
我正在编写一个主类,它将创建一些客户端并测试它们的订阅和发布。我想显示客户端连接的信息,例如连接的数据和时间、用于连接的 clientId、clientIP,无论它们是否正常连接。我不熟悉使用 Log
我正在使用 HiveMQ Java 客户端连接到 HiveMQ 代理。 Blocking Client Subscriber 不使用任何消息。使用 MQTTBox 发布和订阅工作正常。这是代码。我正在
我正在为我的组织内部运行的专有服务器连接 RESTful API。我们正在使用 HiveMQ,并且我根据 HiveMQ 网站上非常有用的文档创建了一个简单的插件。 我还回顾了缓存和非阻塞哲学,所以我明
我将 HiveMQ 服务器配置为识别 TLS 并创建了 TLS 通信。我想打印出正在使用的密码套件。我已经使用了 getSslConfig() 但我最终得到了它作为输出: 可选[com.hivemq.
我正在尝试使用 HiveMQ 通过简单的身份验证将客户端连接到服务器。在 HiveMQ Client 上,我创建了一个客户端并使用 connectWith() 指定我想通过简单例份验证进行连接。当我输
我正在尝试使用以下指南在我的 Amazon EC2 实例 (ubuntu/images/hvm-ssd/ubuntu-xenial-16.04-amd64-server-20170414) 上设置 H
尝试结合 HiveMQ 的两个特性:共享订阅和持久 session 。 如果创建了一个非常简单的消息生成器。和一个非常简单的消费者。当运行多个消费者时,所有消费者都会收到所有消息。 将消费者的 cle
我正在运行 hivemq mqtt 代理社区版,并希望添加 prometheus 扩展进行监控。 两者均来自 hivemq marketplace 预编译和 github project page .
有没有办法使用某种 setter 方法为客户端分配名称并使用另一种方法检索它?到目前为止,我只是创建了一些方法来打印语句和转换,因此我一直将客户端名称作为字符串手动传递给静态方法,例如 PubSubU
我使用的是 HiveMQ 客户端版本 1.0.1,但我决定更新到最近发布的版本 1.1。我完全从头开始,将项目作为 Gradle 项目导入并尝试构建。只有在忽略一些失败的测试后,构建才能工作。我在 3
我按照教程从从 here 下载的 paho 图形用户界面向 hiveMQ(通过执行我从他们网站下载的 run.bat 文件运行)发送消息,它工作正常并显示客户端连接和发送消息的日志,工作正常,现在我已
我一直在尝试在我的 Android 应用程序中使用 hivemq 实现 mqtt。尽管我使用了他们文档中的相同规范和配置,但我仍然无法建立成功的连接。 我之前能够将 paho 用于 mqtt,但如果应
问题已解决:检查帖子底部 我无法连接到 websockets。端口 1883 工作正常。这是 MQTT.fx 的输出: 2017-01-21 07:46:26,293 INFO --- Broker
因此,我有一个在我的本地计算机上运行的 hivemq 服务器,我正试图将它连接到我可以使用 C# over TLS 中的 M2Mqtt 库连接到服务器的位置。我按照本指南为 C# 创建了证书。 htt
我创建了一个客户端来使用安全连接和加密负载进行测试,因此我想使用默认的 SSL 配置。我尝试这样做,但我得到了一个 ConnectionClosedException 并且服务器立即关闭。我应该在服务
我尝试先设置创建客户端以测试 MQTT 是否正常工作,然后我将实现 connect() 方法。我下载了最新版本的 HiveMQ(一个用 Java 完成的开源 MQTT 实现),在将项目作为 Gradl
我正在尝试为 CentOS 上的 HiveMQ 3.4.2 的 Web UI 启用 ssl 连接。我已经像这样更改了 config.xml 文件 ... true
我想使用 TLS 1.3 与 HiveMQ 进行安全通信。我已经配置了 HiveMQ 社区版服务器 config.xml 文件以指定使用 TLS 1.3 密码套件,并将其指向包含 256 位椭圆曲线
我是一名优秀的程序员,十分优秀!