gpt4 book ai didi

java - Java 中的 MQTT 客户端 - 在线程中启动我的监听器

转载 作者:可可西里 更新时间:2023-11-01 02:43:56 28 4
gpt4 key购买 nike

我正在使用 org.fusesource.mqtt (mqtt-client-1.0-20120208.162159-18-uber) 并基于非阻塞示例用 Java 编写了一个监听器。

我按以下方式使用我的监听器类:

Listener mqList = new Listener("tcp://localhost:1883", "mytopic/#", "c:/test.log", true);
new Thread(mqList).start();

这样运行完全正常。但如果我创建两个实例/线程,似乎就会出现冲突,并且我会收到连接/断开连接的消息。

这是失败的用法:

// First listener: topic filter "mytopic/#", log file c:/test.log, debug logging enabled.
Listener mqList = new Listener("tcp://localhost:1883", "mytopic/#", "c:/test.log", true);       
// Each Listener is a Runnable; it is started on its own thread.
new Thread(mqList).start( );

// Second listener: same broker URI, but a different topic filter ("mytopic1/#") and log file.
Listener mqList1 = new Listener("tcp://localhost:1883", "mytopic1/#", "c:/test1.log", true);
new Thread(mqList1).start( );

我的 Listener 类非常简单,我很困惑为什么它在多线程中不起作用。有什么想法/提示吗?

这是我的类定义:

import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.mqtt.client.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.logging.*;
import java.io.*;
import java.net.URISyntaxException;

public class Listener implements Runnable{
private static final long DEFAULT_SLEEP_BEFORE_RE_ATTEMPT_IN_SECONDS = 5000;
private static final long DEFAULT_MAX_RE_ATTEMPT_DURATION_IN_SECONDS = 3600 * 3;

private long listenerSleepBeforeReAttemptInSeconds;
private long listenerMaxReAttemptDurationInSeconds;
private MQTT mqtt;

private ArrayList<Topic> topics;
private boolean listenerDebug;
private String listenerHostURI;
private String listenerTopic;
private String listenerLogFile;
private long listenerLastSuccessfulSubscription;

private Logger fLogger;
private String NEW_LINE = System.getProperty("line.separator");

public Listener(String listenerHostURI, String listenerTopic, String logFile, boolean debug) {
this(listenerHostURI, listenerTopic, logFile, DEFAULT_SLEEP_BEFORE_RE_ATTEMPT_IN_SECONDS, DEFAULT_MAX_RE_ATTEMPT_DURATION_IN_SECONDS, debug);
}

public Listener(String listenerHostURI, String listenerTopic, String logFile, long listenerSleepBeforeReAttemptInSeconds, long listenerMaxReAttemptDurationInSeconds, boolean debug) {
init(listenerHostURI, listenerTopic, logFile, listenerSleepBeforeReAttemptInSeconds, listenerMaxReAttemptDurationInSeconds, debug);
}

private void init(String listenerHostURI, String listenerTopic, String logFile, long listenerSleepBeforeReAttemptInSeconds, long listenerMaxReAttemptDurationInSeconds, boolean debug) {
this.listenerHostURI = listenerHostURI;
this.listenerTopic = listenerTopic;
this.listenerLogFile = logFile;
this.listenerSleepBeforeReAttemptInSeconds = listenerSleepBeforeReAttemptInSeconds;
this.listenerMaxReAttemptDurationInSeconds = listenerMaxReAttemptDurationInSeconds;
this.listenerDebug = debug;
initMQTT();
}

private void initMQTT() {
mqtt = new MQTT();
listenerLastSuccessfulSubscription = System.currentTimeMillis();

try {
fLogger = Logger.getLogger("eTactica.mqtt.listener");
FileHandler handler = new FileHandler(listenerLogFile);
fLogger.addHandler(handler);
} catch (IOException e) {
System.out.println("Logger - Failed");
}

try {
mqtt.setHost(listenerHostURI);
} catch (URISyntaxException e) {
stderr("setHost failed: " + e);
stderr(e);
}
QoS qos = QoS.AT_MOST_ONCE;
topics = new ArrayList<Topic>();
topics.add(new Topic(listenerTopic, qos));
}

private void stdout(String x) {
if (listenerDebug) {
fLogger.log(Level.INFO, x + NEW_LINE);
}
}

private void stderr(String x) {
if (listenerDebug) {
fLogger.log(Level.SEVERE, x + NEW_LINE);
}
}

private void stderr(Throwable e) {
if (listenerDebug) {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw);

fLogger.log(Level.SEVERE, sw.toString() + NEW_LINE);
}
}

private void subscriptionSuccessful() {
listenerLastSuccessfulSubscription = System.currentTimeMillis();
}

private boolean tryToListen() {
return ((System.currentTimeMillis() - listenerLastSuccessfulSubscription) < listenerMaxReAttemptDurationInSeconds * 1000);
}

private void sleepBeforeReAttempt() throws InterruptedException {
stdout(String.format(("Listener stopped, re-attempt in %s seconds."), listenerSleepBeforeReAttemptInSeconds));
Thread.sleep(listenerSleepBeforeReAttemptInSeconds);
}

private void listenerReAttemptsOver() {
stdout(String.format(("Listener stopped since reattempts have failed for %s seconds."), listenerMaxReAttemptDurationInSeconds));
}

private void listen() {
final CallbackConnection connection = mqtt.callbackConnection();
final CountDownLatch done = new CountDownLatch(1);



/* Runtime.getRuntime().addShutdownHook(new Thread(){
@Override
public void run() {
setName("MQTT client shutdown");
stderr("Disconnecting the client.");

connection.getDispatchQueue().execute(new Runnable() {
public void run() {
connection.disconnect(new Callback<Void>() {
public void onSuccess(Void value) {
stdout("Disconnecting onSuccess.");
done.countDown();
}
public void onFailure(Throwable value) {
stderr("Disconnecting onFailure: " + value);
stderr(value);
done.countDown();
}
});
}
});
}
});
*/

connection.listener(new org.fusesource.mqtt.client.Listener() {

public void onConnected() {
stdout("Listener onConnected");
}

public void onDisconnected() {
stdout("Listener onDisconnected");
}

public void onPublish(UTF8Buffer topic, Buffer body, Runnable ack) {
stdout(topic + " --> " + body.toString());
ack.run();
}

public void onFailure(Throwable value) {
stdout("Listener onFailure: " + value);
stderr(value);
done.countDown();
}
});

connection.resume();

connection.connect(new Callback<Void>() {
public void onFailure(Throwable value) {
stderr("Connect onFailure...: " + value);
stderr(value);
done.countDown();
}

public void onSuccess(Void value) {
final Topic[] ta = topics.toArray(new Topic[topics.size()]);
connection.subscribe(ta, new Callback<byte[]>() {
public void onSuccess(byte[] value) {
for (int i = 0; i < value.length; i++) {
stdout("Subscribed to Topic: " + ta[i].name() + " with QoS: " + QoS.values()[value[i]]);
}
subscriptionSuccessful();
}
public void onFailure(Throwable value) {
stderr("Subscribe failed: " + value);
stderr(value);
done.countDown();
}
});
}
});

try {
done.await();
} catch (Exception e) {
stderr(e);
}
}

@Override
public void run() {
while (tryToListen()) {
initMQTT();
listen();
try {
sleepBeforeReAttempt();
} catch (InterruptedException e) {
stderr("Sleep failed:" + e);
stderr(e);
}
}

listenerReAttemptsOver();
}

}

最佳答案

一个 TCP 端口只能有一个监听器。 “tcp://localhost:1883”中的数字对于每个监听器必须是唯一的。大概在某个地方(我不熟悉这个特定的 API)你可能也在用一个端口号启动一个客户端;数字必须在客户端和服务器之间匹配。

关于java - Java 中的 MQTT 客户端 - 在线程中启动我的监听器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9432192/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com