- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在编写一个从 MQTT 代理接收消息的程序服务器从客户端获取ID并使其成为主题名称:例如topic1、topic2。然后,在订阅时,服务器传递主题的名称,然后从该主题读取消息。这是我的服务器:
public class AnalyticServer {
// The server socket.
private static ServerSocket serverSocket = null;
// The client socket.
private static Socket clientSocket = null;
// This server can accept up to maxClientsCount clients' connections.
private static final int maxClientsCount = 5;
private static final clientThread[] threads = new clientThread[maxClientsCount];
public static void main(String args[]) throws MqttException, InterruptedException {
// The default port number.
int portNumber = 4544;
//Open Server
try {
serverSocket = new ServerSocket(portNumber);
} catch (IOException e) {
System.out.println(e);
}
//
//When server is listening
System.out.println("Server is now listening at port 4544");
while (true) {
try {
//Make connection
clientSocket = serverSocket.accept();
System.out.println("Connected");
int i = 0;
//Find thread null to run the connection
for (i = 0; i < maxClientsCount; i++) {
if (threads[i] == null) {
(threads[i] = new clientThread(clientSocket, threads)).start();
break;
}
}
if (i == maxClientsCount) {
PrintStream os = new PrintStream(clientSocket.getOutputStream());
os.println("Server is now, please try again later");
os.close();
clientSocket.close();
}
} catch (IOException e) {
System.out.println(e);
}
}
}
}
//Thread control each Request
class clientThread extends Thread {
private Socket clientSocket = null;
private final clientThread[] threads;
private int maxClientsCount;
public clientThread(Socket clientSocket, clientThread[] threads) {
this.clientSocket = clientSocket;
this.threads = threads;
maxClientsCount = threads.length;
}
public void run() {
int maxClientsCount = this.maxClientsCount;
clientThread[] threads = this.threads;
try {
int identifier=0;
//get id
InputStream input = null;
input = clientSocket.getInputStream();
identifier=input.read();
String topic="topic".concat(String.valueOf(identifier));
//Subscribe
try {
System.out.println("subscribing");
Subscribe receive=new Subscribe(topic);
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (URISyntaxException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// open image
FileInputStream imgPath = new FileInputStream("image.jpg");
BufferedImage bufferedImage = ImageIO.read(imgPath);
Thread.sleep(1200);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ImageIO.write( bufferedImage, "jpg", baos );
baos.flush();
byte[] imageInByte = baos.toByteArray();
baos.close();
//SendImage
DataOutputStream outToClient = new DataOutputStream(clientSocket.getOutputStream());
outToClient.write(imageInByte);
System.out.println(outToClient.size());
clientSocket.close();
} catch (IOException e) {
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
'这是我的订阅类(class)
public class Subscribe implements MqttCallback {
private final int qos = 1;
static String topic=null;
private MqttClient client;
String subText = "abc";
public Subscribe(String topic) throws MqttException, URISyntaxException {
this.topic=topic;
String host = "tcp://m14.cloudmqtt.com:19484";
String username = "***";
String password = "********";
String clientId = MqttClient.generateClientId();
MqttConnectOptions conOpt = new MqttConnectOptions();
conOpt.setCleanSession(true);
conOpt.setUserName(username);
conOpt.setPassword(password.toCharArray());
this.client = new MqttClient(host, clientId, new MemoryPersistence());
;
this.client.setCallback(this);
this.client.connect(conOpt);
this.client.subscribe(topic,1);
System.out.println("subscribe topic: " +this.topic);
}
/**
* @see MqttCallback#connectionLost(Throwable)
*/
public void connectionLost(Throwable cause) {
System.out.println("Connection lost because: " + cause);
System.exit(1);
}
/**
* @see MqttCallback#deliveryComplete(IMqttDeliveryToken)
*/
public void deliveryComplete(IMqttDeliveryToken token) {
}
/**
* @throws IOException
* @see MqttCallback#messageArrived(String, MqttMessage)
*/
public void messageArrived(String topic, MqttMessage message) throws MqttException, IOException {
System.out.println("1");
subText = message.getPayload().toString();
System.out.println("Received"+subText);
}
}
订阅构造函数中的主题是正确的,但是,this.client.setCallback(this)似乎没有调用messageArrived方法。所以我无法收到任何东西。
有人知道吗?非常感谢
最佳答案
您发布的有关订阅者的信息还不够多,但这里有一个简单的工作:
import java.io.IOException;
import java.sql.Timestamp;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
public class TestSub implements MqttCallback
{
public static void main(String[] args)
{
String url = "tcp://iot.eclipse.org:1883";
String clientId = "TestSub_"+System.currentTimeMillis();
String topicName = "test/ABC/one";
int qos = 1;
boolean cleanSession = true;
String userName = "myUserId";
String password = "mypwd";
try
{
new TestSub(url, clientId, cleanSession, userName, password, topicName, qos);
}
catch (MqttException me)
{
System.out.println(me.getLocalizedMessage());
System.out.println(me.getCause());
me.printStackTrace();
}
}
public TestSub(String url, String clientId, boolean cleanSession, String userName, String password, String topicName, int qos) throws MqttException
{
String tmpDir = System.getProperty("java.io.tmpdir");
MqttDefaultFilePersistence dataStore = new MqttDefaultFilePersistence(tmpDir);
MqttClient client;
MqttConnectOptions conOpt;
try
{
conOpt = new MqttConnectOptions();
conOpt.setMqttVersion(MqttConnectOptions.MQTT_VERSION_DEFAULT);
conOpt.setCleanSession(cleanSession);
if (userName != null)
conOpt.setUserName(userName);
if (password != null)
conOpt.setPassword(password.toCharArray());
// Construct an MQTT blocking mode client
client = new MqttClient(url, clientId, dataStore);
// Set this wrapper as the callback handler
client.setCallback(this);
// Connect to the MQTT server
client.connect(conOpt);
System.out.println("Connected to " + url + " with client ID " + client.getClientId());
System.out.println("Subscribing to topic \"" + topicName + "\" qos " + qos);
client.subscribe(topicName, qos);
// Continue waiting for messages until the Enter is pressed
System.out.println("Press <Enter> to exit");
try
{
System.in.read();
}
catch (IOException e)
{
// If we can't read we'll just exit
}
// Disconnect the client from the server
client.disconnect();
System.out.println("Disconnected");
}
catch (MqttException e)
{
e.printStackTrace();
System.out.println("Unable to set up client: " + e.toString());
System.exit(1);
}
}
public void connectionLost(Throwable cause)
{
System.out.println("Connection lost! " + cause.getLocalizedMessage());
System.exit(1);
}
public void deliveryComplete(IMqttDeliveryToken token)
{
}
public void messageArrived(String topic, MqttMessage message)
throws MqttException
{
String time = new Timestamp(System.currentTimeMillis()).toString();
System.out.println("Time:\t" + time + " Topic:\t" + topic + " Message:\t" + new String(message.getPayload()));
}
}
关于java - MQTT从主题接收消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47418076/
这个问题已经有答案了: Cannot create AlertDialog: AppCompat error (2 个回答) 已关闭 6 年前。 当我在列表项中调用警报对话框时,我的应用程序崩溃了。我
我在 Angular 应用程序中安装了 Material UI,现在我收到错误,没有导出的成员 Observable 错误, 我删除了节点模块并重新安装,问题仍然存在 ERROR in node_mo
我有一个架构,其中有两个独立的应用程序。原始来源是一个sql数据库。 App1 监听 CDC 表以跟踪对该数据库中表的更改、规范化和序列化这些更改。它获取这些序列化消息并将它们发送到 Kafka 主题
这个问题在这里已经有了答案: Material Design, AppCompat, and Backwards Compatibility (1 个回答) 关闭 6 年前。 我收到如下错误信息:
我喜欢新 Office 套件和 Visual Studio 上的窗口镶边: 当然,我仍在为 Windows 7 开发应用程序,但我想知道是否有一种快速且简单的方法(阅读:WPF 样式或 Windows
我正在使用 HoloEverywhere-1.6.8。 我有一个基于 Holo.Theme 的自定义主题。 ... 我遇到了下面的崩溃,但它只出现在以下设备上: Galaxy Tab 10.1 P
我正在尝试为 Angular 的 DevExtreme 小部件加载主题。我采用了不同的方法: 在 angular.json 中设置样式但不会产生任何影响: "projects": { "my-proj
我想定义一个 android 样式,它扩展了一个在不同的应用程序/包中定义的样式,而不是作为库导入。 从对android资源的xml引用的定义here : @[:]/ 似乎可以在定义资源的地方指定一个
我正在尝试测试一种制作主题的方法,但我使用的方法并没有给我预期的结果。这是我的设置: drawable/dummy.xml 值/mythemes.xml @style
通过 telnet 使用 IMAP,我希望能够从特定的给定电子邮件中提取主题。现在我知道 fetch 命令负责从电子邮件中获取数据。 我的问题是,如何在不使用对 BODY[HEADER.FIELDS
我刚刚开始使用 RStudio 中的一些新的 knitr 功能。 我已经尝试在 R Markdown 设置对话框中选择几个不同的主题,但这些似乎对我生成的文档的样式没有任何明显的影响。应该,还是我错过
在我的应用程序中,我有多种主题样式(您可以将它们视为不同的、单独的 CSS 样式文件)。我想开始使用 CSS 模块,但我什至不知道如何 import我的第一个文件。 让我们假设以下(简单)目录结构:
有没有一种方法可以在一个 Azure 主题订阅上拥有多个监听客户端,并且它们都接收所有消息?我的理解是订阅的唯一实现是发布的消息仅传递到该订阅上的一个客户端,因为它就像一个队列。 可以使用同一订阅将这
我有一台 super 光滑的显示器,所以白天我可以比深色主题上的代码更好地看到自己的倒影。因此,我认为如果我可以在 vimrc 中有一个简单的 if 开关来根据一天中的时间设置深色主题或浅色主题,那就
我希望在我的 Symfony2 项目中提供基本的主题支持,因此我希望为每个主题提供单独的静态文件(css、js、img)。 我尝试添加 assetic: read_from: %kernel
有没有一种方法可以在一个 Azure 主题订阅上拥有多个监听客户端,并且它们都接收所有消息?我的理解是订阅的唯一实现是发布的消息仅传递到该订阅上的一个客户端,因为它就像一个队列。 可以使用同一订阅将这
在 NES 上有多个处于 WAITING 状态的“Discovery Worker”和“Keep Alive”线程是预期的行为吗? "DiscoveryWorker-10" Id=62 WAITING
我正在尝试找到最适合加载图像的颜色并将其应用到背景中。适应图像并使 UI 感觉更自然。 到目前为止我已经找到了 2 个方案: 1> 平均像素(下面的代码): final Color acclimati
我知道每个请求都由一个 servlet 线程提供服务,但是对于一个用户 session ,两个请求是否可以由两个不同的线程提供服务? 如果上述情况真的发生,那么第一个请求服务线程存储的线程局部变量被第
我无法理解操作栏外观与主题化之间的交互模式。我的应用设置为使用默认主题,我认为它是深色的: 通过应用范围内的样式从应用中删除操作栏会导致主要 Activity 的黑色背景: 没有 and
我是一名优秀的程序员,十分优秀!