- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
我正在使用 org.fusesource.mqtt (mqtt-client-1.0-20120208.162159-18-uber) 并基于非阻塞示例用 Java 编写了一个监听器。
我按以下方式使用我的监听器类:Listener mqList = new Listener("tcp://localhost:1883", "mytopic/#", "c:/test.log", true);
新线程(mqList).start();
这很完美。如果我创建两个实例/线程,那么似乎会出现冲突,并且我会收到连接/断开连接消息。
这是失败的用法:
Listener mqList = new Listener("tcp://localhost:1883", "mytopic/#", "c:/test.log", true);
new Thread(mqList).start( );
Listener mqList1 = new Listener("tcp://localhost:1883", "mytopic1/#", "c:/test1.log", true);
new Thread(mqList1).start( );
我的 Listener 类非常简单,我很困惑为什么它在多线程中不起作用。有什么想法/提示吗?
这是我的类定义:
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.mqtt.client.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.logging.*;
import java.io.*;
import java.net.URISyntaxException;
public class Listener implements Runnable{
private static final long DEFAULT_SLEEP_BEFORE_RE_ATTEMPT_IN_SECONDS = 5000;
private static final long DEFAULT_MAX_RE_ATTEMPT_DURATION_IN_SECONDS = 3600 * 3;
private long listenerSleepBeforeReAttemptInSeconds;
private long listenerMaxReAttemptDurationInSeconds;
private MQTT mqtt;
private ArrayList<Topic> topics;
private boolean listenerDebug;
private String listenerHostURI;
private String listenerTopic;
private String listenerLogFile;
private long listenerLastSuccessfulSubscription;
private Logger fLogger;
private String NEW_LINE = System.getProperty("line.separator");
public Listener(String listenerHostURI, String listenerTopic, String logFile, boolean debug) {
this(listenerHostURI, listenerTopic, logFile, DEFAULT_SLEEP_BEFORE_RE_ATTEMPT_IN_SECONDS, DEFAULT_MAX_RE_ATTEMPT_DURATION_IN_SECONDS, debug);
}
public Listener(String listenerHostURI, String listenerTopic, String logFile, long listenerSleepBeforeReAttemptInSeconds, long listenerMaxReAttemptDurationInSeconds, boolean debug) {
init(listenerHostURI, listenerTopic, logFile, listenerSleepBeforeReAttemptInSeconds, listenerMaxReAttemptDurationInSeconds, debug);
}
private void init(String listenerHostURI, String listenerTopic, String logFile, long listenerSleepBeforeReAttemptInSeconds, long listenerMaxReAttemptDurationInSeconds, boolean debug) {
this.listenerHostURI = listenerHostURI;
this.listenerTopic = listenerTopic;
this.listenerLogFile = logFile;
this.listenerSleepBeforeReAttemptInSeconds = listenerSleepBeforeReAttemptInSeconds;
this.listenerMaxReAttemptDurationInSeconds = listenerMaxReAttemptDurationInSeconds;
this.listenerDebug = debug;
initMQTT();
}
private void initMQTT() {
mqtt = new MQTT();
listenerLastSuccessfulSubscription = System.currentTimeMillis();
try {
fLogger = Logger.getLogger("eTactica.mqtt.listener");
FileHandler handler = new FileHandler(listenerLogFile);
fLogger.addHandler(handler);
} catch (IOException e) {
System.out.println("Logger - Failed");
}
try {
mqtt.setHost(listenerHostURI);
} catch (URISyntaxException e) {
stderr("setHost failed: " + e);
stderr(e);
}
QoS qos = QoS.AT_MOST_ONCE;
topics = new ArrayList<Topic>();
topics.add(new Topic(listenerTopic, qos));
}
private void stdout(String x) {
if (listenerDebug) {
fLogger.log(Level.INFO, x + NEW_LINE);
}
}
private void stderr(String x) {
if (listenerDebug) {
fLogger.log(Level.SEVERE, x + NEW_LINE);
}
}
private void stderr(Throwable e) {
if (listenerDebug) {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw);
fLogger.log(Level.SEVERE, sw.toString() + NEW_LINE);
}
}
private void subscriptionSuccessful() {
listenerLastSuccessfulSubscription = System.currentTimeMillis();
}
private boolean tryToListen() {
return ((System.currentTimeMillis() - listenerLastSuccessfulSubscription) < listenerMaxReAttemptDurationInSeconds * 1000);
}
private void sleepBeforeReAttempt() throws InterruptedException {
stdout(String.format(("Listener stopped, re-attempt in %s seconds."), listenerSleepBeforeReAttemptInSeconds));
Thread.sleep(listenerSleepBeforeReAttemptInSeconds);
}
private void listenerReAttemptsOver() {
stdout(String.format(("Listener stopped since reattempts have failed for %s seconds."), listenerMaxReAttemptDurationInSeconds));
}
private void listen() {
final CallbackConnection connection = mqtt.callbackConnection();
final CountDownLatch done = new CountDownLatch(1);
/* Runtime.getRuntime().addShutdownHook(new Thread(){
@Override
public void run() {
setName("MQTT client shutdown");
stderr("Disconnecting the client.");
connection.getDispatchQueue().execute(new Runnable() {
public void run() {
connection.disconnect(new Callback<Void>() {
public void onSuccess(Void value) {
stdout("Disconnecting onSuccess.");
done.countDown();
}
public void onFailure(Throwable value) {
stderr("Disconnecting onFailure: " + value);
stderr(value);
done.countDown();
}
});
}
});
}
});
*/
connection.listener(new org.fusesource.mqtt.client.Listener() {
public void onConnected() {
stdout("Listener onConnected");
}
public void onDisconnected() {
stdout("Listener onDisconnected");
}
public void onPublish(UTF8Buffer topic, Buffer body, Runnable ack) {
stdout(topic + " --> " + body.toString());
ack.run();
}
public void onFailure(Throwable value) {
stdout("Listener onFailure: " + value);
stderr(value);
done.countDown();
}
});
connection.resume();
connection.connect(new Callback<Void>() {
public void onFailure(Throwable value) {
stderr("Connect onFailure...: " + value);
stderr(value);
done.countDown();
}
public void onSuccess(Void value) {
final Topic[] ta = topics.toArray(new Topic[topics.size()]);
connection.subscribe(ta, new Callback<byte[]>() {
public void onSuccess(byte[] value) {
for (int i = 0; i < value.length; i++) {
stdout("Subscribed to Topic: " + ta[i].name() + " with QoS: " + QoS.values()[value[i]]);
}
subscriptionSuccessful();
}
public void onFailure(Throwable value) {
stderr("Subscribe failed: " + value);
stderr(value);
done.countDown();
}
});
}
});
try {
done.await();
} catch (Exception e) {
stderr(e);
}
}
@Override
public void run() {
while (tryToListen()) {
initMQTT();
listen();
try {
sleepBeforeReAttempt();
} catch (InterruptedException e) {
stderr("Sleep failed:" + e);
stderr(e);
}
}
listenerReAttemptsOver();
}
}
最佳答案
一个 TCP 端口只能有一个监听器。 “tcp://localhost:1883”中的数字对于每个监听器必须是唯一的。大概在某个地方(我不熟悉这个特定的 API)你可能也在用一个端口号启动一个客户端;数字必须在客户端和服务器之间匹配。
关于java - Java 中的 MQTT 客户端 - 在线程中启动我的监听器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9432192/
我有一个 PowerBI Online 数据集,它是在 PowerBI 桌面中创建然后在线发布的。到目前为止,一切都很好。 我通过 PowerBI pusblish 从 Excel 连接到数据集,按预
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 我们不允许提问寻求书籍、工具、软件库等的推荐。您可以编辑问题,以便用事实和引用来回答。 关闭 2 年前。
我必须对一些太大而无法放入内存的数据训练分类模型,我正在使用 scikit learn 和 pandas 来进行分析。所以这是我的问题,如何在在线学习管道中使用验证来调整超参数? 我使用带有chuck
我正在开发一个应用程序,该应用程序将从 webservice 获取数据和图像并将其存储在设备中以供离线使用。同时,应用程序会将一些数据存储在 sqlite db 中,并将一些图像作为默认数据。 这是应
是否可以使用 FileReader API 和 onprogress 事件访问随 HTML5 传入的数据? 如果是这样,是否有 MD5 或其他快速散列算法的“在线”版本,以便我可以在文件完全读取之前开
希望任何人都可以帮助我更改下面的代码,我的临时文件包含以下代码: Temp=8.4* Humidity=70.4% 代替代码 Temp = 24 *C, Hum = 40 % 适用于以下脚本。 我需
我必须创建一个功能类似于联系人应用程序的应用程序。您可以在客户的 iPhone 上添加一个联系人,它应该会上传到客户的 iPad 上。如果客户在他们的 iPad 上更新联系人,它应该会在他们的 iPh
在 gitlab.com 上审查 merge 请求时,有时我必须在完成 merge 之前进行 rebase。 在 gitlab 上按“Rebase”后,我有一个特定的管道步骤失败,因为它无法验证用户的
关闭。这个问题是off-topic .它目前不接受答案。 想改善这个问题吗? Update the question所以它是 on-topic对于堆栈溢出。 9年前关闭。 Improve this q
我正在尝试在 azure 上托管 SQL 服务器以与节点应用程序进行通信。我已经成功地完成了创建数据库服务器和数据库本身的过程。现在,我想编辑我的数据库结构。据我发现online ,应该有一种方法可以
我在 Quickbooks Intuit 开发人员 API 中使用 Oauth 2 获得了访问 token 。 范围是 com.intuit.quickbooks.accounting 我能够使用 Q
关闭。这个问题不满足Stack Overflow guidelines .它目前不接受答案。 想改善这个问题吗?更新问题,使其成为 on-topic对于堆栈溢出。 6年前关闭。 Improve thi
是否可以使Angular Material progress spinner与文本并大致与字符的大小一致地显示? 我想要类似的东西: please wait 微调器仅与“请稍候”文本成行出现。 这可
关闭。这个问题不满足Stack Overflow guidelines .它目前不接受答案。 想改善这个问题吗?更新问题,使其成为 on-topic对于堆栈溢出。 7年前关闭。 Improve thi
就目前而言,这个问题不适合我们的问答形式。我们希望答案得到事实、引用资料或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visit the
我有一个每天运行的Powershell脚本。今天它失败了,因为我正在使用的域 Controller 不可用。在继续执行脚本的其余部分之前,我想确保可以连接到可用的DC。 $LdapServer = "
我想制作一款在线 Flash 游戏,它将具有社交功能,但游戏玩法将主要是单人游戏。例如,屏幕上不会同时出现两个玩家,社交互动将通过异步消息进行,不会有实时聊天或其他任何内容。大部分逻辑将发生在客户端中
这几天我开始在线玩OpenShift。我部署了一个非常简单的“Hello World”Java 示例(1 行代码!),没有任何依赖项(没有 Spring!)命令行是这样的: oc.exe new-a
已关闭。此问题不符合Stack Overflow guidelines 。目前不接受答案。 要求我们推荐或查找工具、库或最喜欢的场外资源的问题对于 Stack Overflow 来说是偏离主题的,因为
所以我一直在网上学习Java(初学者),并且我一直在尝试制作一个用于制作矩形的类文件。但是,我的在线 java 评估器指出它找不到实例变量。 This is the comment on it.我的代
我是一名优秀的程序员,十分优秀!