- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个在 Android 中运行的 MQTT 客户端,以及一个在服务器中运行的 MQTT 代理。我的问题是我们将在哪里使用应用程序,我们有一些连接断开,所以我的网络应用程序需要知道客户端的当前状态。
所以我们现在正在做的是:1 - 服务器向客户端发送一个随机数(每个客户端将收到不同的随机数)2 - android 客户端接收号码并发送到网络服务3 - Web 服务写入 SQL 数据库4 - 服务器等待 4 秒到 android 客户端的响应,如果服务器发送的随机数 == 到数据库中的数字,则客户端已连接。
但现在的问题是,多用户发送随机数时,唯一写入数据库的是最后一个,因此这是一个巨大的设计错误。
为了修复,唯一好的解决方案是从 MQTT 客户端获得直接响应,并且每个客户端必须是唯一的,但我不知道是否可行,或者是否是最好的方法。
画一些图来更好地理解: Flow
这是我的安卓代码:
public class MQTTService extends Service implements MqttCallback {
public static final String DEBUG_TAG = "MqttService"; // Debug TAG
private static final String MQTT_THREAD_NAME = "MqttService[" + DEBUG_TAG + "]"; // Handler
// Thread
// ID
private String MQTT_BROKER = ""; // Broker URL
// or IP
// Address
private static final int MQTT_PORT = 1883; // Broker Port
public static final int MQTT_QOS_0 = 0; // QOS Level 0 ( Delivery Once no
// confirmation )
public static final int MQTT_QOS_1 = 1; // QOS Level 1 ( Delivery at least
// Once with confirmation )
public static final int MQTT_QOS_2 = 2; // QOS Level 2 ( Delivery only once
// with confirmation with handshake
// )
private static final int MQTT_KEEP_ALIVE = 30000; // KeepAlive Interval in
// MS
private static final String MQTT_KEEP_ALIVE_TOPIC_FORMAT = "/users/%s/keepalive"; // Topic
// format
// for
// KeepAlives
private static final byte[] MQTT_KEEP_ALIVE_MESSAGE = { 0 }; // Keep Alive
// message
// to send
private static final int MQTT_KEEP_ALIVE_QOS = MQTT_QOS_2; // Default
// Keepalive QOS
private static final boolean MQTT_CLEAN_SESSION = false; // Start a clean
// session?
private static final String MQTT_URL_FORMAT = "tcp://%s:%d"; // URL Format
// normally
// don't
// change
public static final String ACTION_START = DEBUG_TAG + ".START"; // Action
// to
// start
public static final String ACTION_STOP = DEBUG_TAG + ".STOP"; // Action to
// stop
public static final String ACTION_KEEPALIVE = DEBUG_TAG + ".KEEPALIVE"; // Action
// to
// keep
// alive
// used
// by
// alarm
// manager
private static final String ACTION_RECONNECT = DEBUG_TAG + ".RECONNECT"; // Action
// to
// reconnect
// private final String DEVICE_ID_FORMAT = "andr_%s"; // Device ID
// Format, add
// any prefix
// you'd like
// Note: There
// is a 23
// character
// limit you
// will get
// An NPE if you
// go over that
// limit
private boolean mStarted = false; // Is the Client started?
private String user_ID; // Device ID, Secure.ANDROID_ID
private Handler mConnHandler; // Seperate Handler thread for networking
private MqttDefaultFilePersistence mDataStore; // Defaults to FileStore
private MemoryPersistence mMemStore; // On Fail reverts to MemoryStore
private MqttConnectOptions mOpts; // Connection Options
private MqttTopic mKeepAliveTopic; // Instance Variable for Keepalive topic
private MqttClient mClient; // Mqtt Client
private AlarmManager mAlarmManager; // Alarm manager to perform repeating
// tasks
private ConnectivityManager mConnectivityManager; // To check for
// connectivity changes
public static final String TAG_CONNECTED = "CONNECTED";
public static final String TAG_ASSIGNED = "ASSIGNED";
public static final String TAG_REFRESH = "REFRESH";
public String TOPIC_CONNECTED = null;
public String TOPIC_ASSIGNED = null;
public String TOPIC_REFRESH = null;
private Intent intent;
private PendingIntent alarmIntent;
private AppMaintenance appStatus;
/**
* Initializes the DeviceId and most instance variables Including the
* Connection Handler, Datastore, Alarm Manager and ConnectivityManager.
*/
@Override
public void onCreate() {
super.onCreate();
// mDeviceId = String.format(DEVICE_ID_FORMAT,
// Secure.getString(getContentResolver(), Secure.ANDROID_ID));
android.os.Debug.waitForDebugger(); // Debugger
appStatus = (AppMaintenance) getApplicationContext();
ExceptionHandler.register(this, appStatus.getException_URL());
HandlerThread thread = new HandlerThread(MQTT_THREAD_NAME);
thread.start();
mConnHandler = new Handler(thread.getLooper());
try {
mDataStore = new MqttDefaultFilePersistence(getCacheDir().getAbsolutePath());
} catch (Exception e) {
// writeToFile("Exception - onCreate()");
e.printStackTrace();
mDataStore = null;
mMemStore = new MemoryPersistence();
}
mOpts = new MqttConnectOptions();
mOpts.setCleanSession(MQTT_CLEAN_SESSION);
// Do not set keep alive interval on mOpts we keep track of it with
// alarm's
mAlarmManager = (AlarmManager) getSystemService(ALARM_SERVICE);
mConnectivityManager = (ConnectivityManager) getSystemService(CONNECTIVITY_SERVICE);
registerReceiver(mConnectivityReceiver, new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION));
}
/**
* Start MQTT Client
*
* @param Context
* context to start the service with
* @return void
*/
public static void actionStart(Context ctx) {
Intent i = new Intent(ctx, MQTTService.class);
i.setAction(ACTION_START);
ctx.startService(i);
}
/**
* Stop MQTT Client
*
* @param Context
* context to start the service with
* @return void
*/
public static void actionStop(Context ctx) {
Intent i = new Intent(ctx, MQTTService.class);
i.setAction(ACTION_STOP);
ctx.startService(i);
}
/**
* Send a KeepAlive Message
*
* @param Context
* context to start the service with
* @return void
*/
public static void actionKeepalive(Context ctx) {
Intent i = new Intent(ctx, MQTTService.class);
i.setAction(ACTION_KEEPALIVE);
ctx.startService(i);
}
/**
* Service onStartCommand Handles the action passed via the Intent
*
* @return START_REDELIVER_INTENT
*/
@Override
public int onStartCommand(Intent intent, int flags, int startId) {
super.onStartCommand(intent, flags, startId);
this.intent = intent;
SharedPreferences myPrefs = getSharedPreferences("UserPreferences", MODE_PRIVATE);
MQTT_BROKER = myPrefs.getString("broker", "");
user_ID = myPrefs.getString("userID", "");
String action = intent.getAction();
TOPIC_CONNECTED = user_ID + "\\" + TAG_CONNECTED;
TOPIC_ASSIGNED = user_ID + "\\" + TAG_ASSIGNED;
TOPIC_REFRESH = user_ID + "\\" + TAG_REFRESH;
Log.i(DEBUG_TAG, "Received action of " + action);
// writeToFile("Received action of " + action);
if (user_ID.isEmpty() || user_ID == null)
action = null;
if (action == null) {
Log.i(DEBUG_TAG, "Starting service with no action\n Probably from a crash");
// writeToFile("Starting service with no action\n Probably from a crash");
Toast.makeText(getApplicationContext(), getString(R.string.mqtt_warning_userid), Toast.LENGTH_LONG).show();
action = null;
} else {
if (action.equals(ACTION_START)) {
Log.i(DEBUG_TAG, "Received ACTION_START");
// writeToFile("Received ACTION_START");
start();
} else if (action.equals(ACTION_STOP)) {
Log.i(DEBUG_TAG, "Received ACTION_STOP");
// writeToFile("Received ACTION_STOP");
stop();
} else if (action.equals(ACTION_KEEPALIVE)) {
Log.i(DEBUG_TAG, "Received ACTION_KEEPALIVE");
// writeToFile("Received ACTION_KEEPALIVE");
keepAlive();
} else if (action.equals(ACTION_RECONNECT)) {
Log.i(DEBUG_TAG, "Received ACTION_RECONNECT");
// writeToFile("Received ACTION_RECONNECT");
reconnectIfNecessary();
}
}
return START_NOT_STICKY;
}
/**
* Attempts connect to the Mqtt Broker and listen for Connectivity changes
* via ConnectivityManager.CONNECTVITIY_ACTION BroadcastReceiver
*/
private synchronized void start() {
if (mStarted) {
Log.i(DEBUG_TAG, "Attempt to start while already started");
// writeToFile("Attempt to start while already started");
return;
}
if (hasScheduledKeepAlives()) {
stopKeepAlives();
}
connect();
}
/**
* Attempts to stop the Mqtt client as well as halting all keep alive
* messages queued in the alarm manager
*/
private synchronized void stop() {
if (!mStarted) {
Log.i(DEBUG_TAG, "Attemt to stop connection that isn't running");
// writeToFile("Attemt to stop connection that isn't running");
return;
}
if (mClient != null) {
mConnHandler.post(new Runnable() {
@Override
public void run() {
try {
mClient.disconnect();
} catch (Exception ex) {
// writeToFile("Exception - stop() ");
ex.printStackTrace();
mClient = null;
mStarted = false;
} finally {
mClient = null;
mStarted = false;
stopKeepAlives();
}
}
});
}
}
/**
* Connects to the broker with the appropriate datastore
*/
private synchronized void connect() {
String url = String.format(Locale.US, MQTT_URL_FORMAT, MQTT_BROKER, MQTT_PORT);
Log.i(DEBUG_TAG, "Connecting with URL: " + url);
// writeToFile("Connecting with URL: " + url);
try {
if (mDataStore != null) {
Log.i(DEBUG_TAG, "Connecting with DataStore");
// writeToFile("Connecting with DataStore");
mClient = new MqttClient(url, user_ID, mDataStore);
} else {
Log.i(DEBUG_TAG, "Connecting with MemStore");
// writeToFile("Connecting with MemStore");
mClient = new MqttClient(url, user_ID, mMemStore);
}
} catch (Exception e) {
// writeToFile("Exception - connect L.343");
e.printStackTrace();
}
mConnHandler.post(new Runnable() {
@Override
public void run() {
try {
mClient.connect(mOpts);
mClient.subscribe(new String[] { TOPIC_CONNECTED, TOPIC_ASSIGNED, TOPIC_REFRESH }, new int[] { MQTT_QOS_0,
MQTT_KEEP_ALIVE_QOS, MQTT_KEEP_ALIVE_QOS });
mClient.setCallback(new MQTTPushCallback(MQTTService.this, intent, user_ID, TOPIC_CONNECTED, TOPIC_ASSIGNED,
TOPIC_REFRESH));
mStarted = true; // Service is now connected
Log.i(DEBUG_TAG, "Successfully connected and subscribed starting keep alives");
// writeToFile("Successfully connected and subscribed starting keep alives");
startKeepAlives();
} catch (Exception e) {
// writeToFile("Exception - connect L.366");
e.printStackTrace();
}
}
});
}
/**
* Schedules keep alives via a PendingIntent in the Alarm Manager
*/
private void startKeepAlives() {
Intent i = new Intent();
i.setClass(this, MQTTService.class);
i.setAction(ACTION_KEEPALIVE);
alarmIntent = PendingIntent.getService(this, 0, i, 0);
mAlarmManager.setRepeating(AlarmManager.RTC_WAKEUP, System.currentTimeMillis() + MQTT_KEEP_ALIVE, MQTT_KEEP_ALIVE, alarmIntent);
Log.i(DEBUG_TAG, "Started keepAlives sucessfully");
// writeToFile("Started keepAlives sucessfully");
}
/**
* Cancels the Pending Intent in the alarm manager
*/
private void stopKeepAlives() {
if (mAlarmManager != null) {
mAlarmManager.cancel(alarmIntent);
}
}
/**
* Publishes a KeepALive to the topic in the broker
*/
private synchronized void keepAlive() {
// if (isForeground()) {
if (isConnected()) {
try {
sendKeepAlive();
return;
} catch (MqttConnectivityException ex) {
// writeToFile("Exception - KeepAlive() 1");
ex.printStackTrace();
reconnectIfNecessary();
} catch (MqttPersistenceException ex) {
// writeToFile("Exception - KeepAlive() 2");
ex.printStackTrace();
stop();
restartService();
} catch (MqttException ex) {
// writeToFile("Exception - KeepAlive() 3");
ex.printStackTrace();
stop();
restartService();
} catch (Exception ex) {
// writeToFile("Exception - KeepAlive() 4");
ex.printStackTrace();
stop();
restartService();
}
}
}
/**
* Checks the current connectivity and reconnects if it is required.
*/
private synchronized void reconnectIfNecessary() {
if (!mStarted && mClient == null)
start();
}
/**
* Query's the NetworkInfo via ConnectivityManager to return the current
* connected state
*
* @return boolean true if we are connected false otherwise
*/
private boolean isNetworkAvailable() {
NetworkInfo info = mConnectivityManager.getActiveNetworkInfo();
return info == null ? false : info.isConnected();
}
/**
* Verifies the client State with our local connected state
*
* @return true if its a match we are connected false if we aren't connected
*/
private boolean isConnected() {
if (mStarted && mClient != null && !mClient.isConnected()) {
Log.i(DEBUG_TAG, "Mismatch between what we think is connected and what is connected");
// writeToFile("Mismatch between what we think is connected and what is connected");
}
if (mClient != null) {
return mStarted && mClient.isConnected() ? true : false;
}
return false;
}
/**
* Receiver that listens for connectivity changes via ConnectivityManager
*/
private final BroadcastReceiver mConnectivityReceiver = new BroadcastReceiver() {
@Override
public void onReceive(Context context, Intent intent) {
// writeToFile("isNetworkAvailable = " + isNetworkAvailable());
if (isNetworkAvailable() && !mStarted) {
Log.i(DEBUG_TAG, "Connectivity Changed...");
// Intent i = new Intent(context, MQTTService.class);
// i.setAction(ACTION_RECONNECT);
// context.startService(i);
restartService();
} else if (!isNetworkAvailable()) {
stop();
}
}
};
/**
* Sends a Keep Alive message to the specified topic
*
* @see MQTT_KEEP_ALIVE_MESSAGE
* @see MQTT_KEEP_ALIVE_TOPIC_FORMAT
* @return MqttDeliveryToken specified token you can choose to wait for
* completion
*/
private synchronized MqttDeliveryToken sendKeepAlive() throws MqttConnectivityException, MqttPersistenceException, MqttException {
if (!isConnected())
throw new MqttConnectivityException();
if (mKeepAliveTopic == null) {
mKeepAliveTopic = mClient.getTopic(String.format(Locale.US, MQTT_KEEP_ALIVE_TOPIC_FORMAT, user_ID));
}
Log.i(DEBUG_TAG, "Sending Keepalive to " + MQTT_BROKER);
// writeToFile("Sending Keepalive to " + MQTT_BROKER);
MqttMessage message = new MqttMessage(MQTT_KEEP_ALIVE_MESSAGE);
message.setQos(MQTT_KEEP_ALIVE_QOS);
return mKeepAliveTopic.publish(message);
}
/**
* Query's the AlarmManager to check if there is a keep alive currently
* scheduled
*
* @return true if there is currently one scheduled false otherwise
*/
private synchronized boolean hasScheduledKeepAlives() {
Intent i = new Intent();
i.setClass(this, MQTTService.class);
i.setAction(ACTION_KEEPALIVE);
PendingIntent pi = PendingIntent.getBroadcast(this, 0, i, PendingIntent.FLAG_NO_CREATE);
return pi != null ? true : false;
}
@Override
public IBinder onBind(Intent arg0) {
return null;
}
/**
* Connectivity Lost from broker
*/
@Override
public void connectionLost(Throwable arg0) {
stopKeepAlives();
mClient = null;
if (isNetworkAvailable()) {
reconnectIfNecessary();
}
}
/**
* Publish Message Completion
*/
@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
// TODO Auto-generated method stub
}
/**
* Received Message from broker
*/
@Override
public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
// Log.i(DEBUG_TAG,
// " Topic:\t" + topic.getName() + " Message:\t"
// + new String(message.getPayload()) + " QoS:\t"
// + message.getQos());
}
/**
* MqttConnectivityException Exception class
*/
private class MqttConnectivityException extends Exception {
private static final long serialVersionUID = -7385866796799469420L;
}
@Override
public void onDestroy() {
try {
mClient.unsubscribe(new String[] { TOPIC_CONNECTED, TOPIC_ASSIGNED, TOPIC_REFRESH });
mClient.disconnect(0);
} catch (Exception e) {
// writeToFile("Exception - onDestroy() 1");
e.printStackTrace();
} finally {
new WS_LOGOUT(this).execute(user_ID);
}
}
public void restartService() {
mKeepAliveTopic = null;
actionStart(getApplicationContext()); // restart the service
}
最佳答案
当知道客户端是否断开连接时,您可以忍受什么样的延迟?
当服务器检测到 MQTT 保持 Activity 时间已过期但未收到来自客户端的 ping 时,您可以使用 Last Will and Testament 功能向主题发布值。
您可以在连接时设置保持 Activity 时间。但是根据您的要求(电池/网络使用情况),您需要确定将其设置为什么。如果我没记错的话默认是 30 秒(可能是 60)
当您的客户端连接时,它可以在持久主题上设置一个标志以表明它在线,并且 LWT 可以将其设置为 0。
例如
连接时将“1”发布到客户端/[uniqueid]/online
设置 LWT 发布“0”到 client/[uniqueid]/online
关于Android Mosquitto - 客户端连接状态,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23088420/
我想在 RPi 上的 docker 中运行 eclipse-mosquitto mqtt 服务器。 我用来运行它的命令是: docker run --name mqtt --restart=alway
我有一个运行多个 Docker 容器的 linux 系统。其中之一是从 mosquitto 1.6.7 docker 镜像运行的 mosquitto 容器。 我无法控制 Mosquitto 容器的创建
我正在尝试使用 TLS 通过 mqtt 进行通信。我的系统中安装了 ubuntu。为了使用 TLS,我使用以下链接创建了证书: http://www.embedded101.com/Blogs/Pao
我已经在我的 ARM 目标板上实现了 mosquitto。 我在Makefile中添加了-DWITH_SRV,因为我需要在我的系统中添加SRV Loopup函数。 但是返回错误 fatal error
我正在为 mosquitto MQTT 代理(版本 1.5)实现一个插件,我正在努力访问我的插件实现中指向 mosquitto 客户端结构的指针的某些元素: #include ... int mos
在mosquitto broker中,有一些写事件日志文件的配置。我正在使用以下配置并运行代理。 log_type all log_dest file /root/Files/mosquitto.lo
我正在使用 mosquitto 作为 MQTT 代理,虽然它提供了广泛的日志记录功能,但我无法找到如何将实际主题的消息记录到文件中(甚至是按主题排序的文件树,甚至数据库)。我看到了 log_desc
我有一个负载均衡器,即 aws elb 所有的 pub/sub 都将通过那个 elb 来 elb 下的两个蚊子经纪人 A 和蚊子经纪人 B 一个 mosquitto 代理在这两个代理之间同步主题(mo
Mosqitto ( http://mosquitto.org/ ) 支持哪些消息队列? 另外我想知道在 MQTT 协议(protocol)中,为每个主题创建不同的消息队列,或者在内存限制的情况下可以
我已经在我的 ubuntu 机器上安装了 mosquitto 服务器和客户端软件包。当我运行命令“mosquitto”来运行 mosquitto 服务器时,我收到错误“错误:地址已在使用中”。为什么我
我正在尝试增加 Mosquitto 代理的最大打开文件连接。但是我读到增加的并发连接不仅仅由 Mosquitto 控制。 根据我们的研究,我们决定 100k 并发连接,我们的目标是 1.6 GB RA
我正在使用 C 语言的 mosquitto 代理和 Paho 客户端库。我希望代理只允许发布一定数量的消息,并阻止任何超过该数量的消息。我尝试将配置中的最大飞行消息选项设置为某个数字,但它不起作用。在
我一直在尝试使用 mosquitto lib 开发 C 代码,通过 TLS 在 mosquitto 代理上发布消息。我在 mosquitto 端配置了 TLS,效果很好。我能够使用 mosquitto
我有一个在 Android 中运行的 MQTT 客户端,以及一个在服务器中运行的 MQTT 代理。我的问题是我们将在哪里使用应用程序,我们有一些连接断开,所以我的网络应用程序需要知道客户端的当前状态。
我在使用 libmosquitto 开发 iOS 应用程序时遇到了一些问题 在我的代码中AppDelegate.swift import UIKit @UIApplicationMain class
我想知道是否有一种方法可以将 Mosquitto 配置为在连接到外部客户端时需要 TLS 和客户端证书,而不需要内部客户端的 TLS 和客户端证书。我应该对 CA(证书颁发机构)或 .conf 文件做
我正在使用 mosquitto 1.4.10。 我有一个客户端,可以批量向同一主题发布多条消息(例如,每批 50 条消息)。客户端将每隔几秒发送一次每个批处理(例如,每 5 秒)。发送一批或几批后,在
我想在更改时重新加载 mosquitto 密码文件。是否可以在 Windows 上发送 SIGHUP(“信号挂断”)或类似的 mosquitto 服务器? 最佳答案 这是可以做到的。首先,您必须设置
我如何制作一个像 mosquitto-auth-plug 这样的插件用于授权,或者一个用于存储消息有效负载或其他用途的插件?我应该用 C 编写还是可以使用 python? 最佳答案 任何用于 mosq
我已经实现了一个不安全的 mosquitto 代理,它可以通过端口 1883 定期发送大量数据(每分钟约 200kb 文件)。 由于我已经实现了 TLS,尽管设置了 message_size_limi
我是一名优秀的程序员,十分优秀!