gpt4 book ai didi

java - 识别多线程 MQTT 发布者中的瓶颈

转载 作者:行者123 更新时间:2023-12-01 23:33:08 26 4
gpt4 key购买 nike

我目前正在使用 Eclipse Paho 为更大的软件开发 MQTT 客户端服务,但遇到了性能问题。我收到了一堆想要发布到代理的事件,并且我正在使用 GSON 来序列化这些事件。我已经对序列化和发布进行了多线程处理。根据基本基准,序列化和发布最多需要 1 毫秒。我使用的 ExecutorService 的固定线程池大小为 10(目前)。

我的代码当前每秒向 ExecutorService 提交大约 50 个 Runnables,但我的 Broker 每秒仅报告大约 5-10 条消息。我之前对 MQTT 设置进行了基准测试,并设法以非多线程方式每秒发送大约 9000 多条 MQTT 消息。

线程池的开销是否太大,以至于我只能从中获取少量的发布?

public class MqttService implements IMessagingService {
    /** Default broker port; overridable via constructor or {@link #setPort(int)}. */
    protected int PORT = 1883;
    protected String HOST = "localhost";
    protected final String SERVICENAME = "MQTT";
    protected static final String COMMANDTOPIC = "commands";
    protected static final String REMINDSPREFIX = "Reminds/";
    protected static final String VIOLATIONTOPIC = "violations/";
    protected static final String WILDCARDTOPIC = "Reminds/#";
    protected static final String TCPPREFIX = "tcp://";
    protected static final String SSLPREFIX = "ssl://";

    private MqttClient client;
    private MqttConnectOptions optionsPublisher = new MqttConnectOptions();
    // FIX: connect() referenced IDPublisher, but it was never declared in this class
    // (only in the separate benchmark snippet) — the class did not compile as posted.
    private final String IDPublisher = MqttClient.generateClientId();

    private ExecutorService pool = Executors.newFixedThreadPool(10);

    public MqttService() {
        this("localhost", 1883);
    }

    /**
     * @param host broker hostname or IP
     * @param port broker TCP port
     */
    public MqttService(String host, int port) {
        this.HOST = host;
        this.PORT = port;
    }

    @Override
    public void setPort(int port) {
        this.PORT = port;
    }

    @Override
    public void setHost(String host) {
        this.HOST = host;
    }

    /**
     * Serializes and publishes the message asynchronously on the worker pool.
     * Fire-and-forget: publish errors are only printed by the job itself.
     */
    @Override
    public void sendMessage(AbstractMessage message) {
        pool.submit(new SerializeJob(client, message));
    }

    /** Connects to the broker and subscribes to the wildcard topic. */
    @Override
    public void connect() {
        try {
            client = new MqttClient(TCPPREFIX + HOST + ":" + PORT, IDPublisher);
            optionsPublisher.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
            // FIX: Paho's default max-inflight is 10, which throttles concurrent
            // QoS>0 publishes from the 10 worker threads — this was the main
            // throughput bottleneck. Raise it as the benchmark code already does.
            optionsPublisher.setMaxInflight(65535);
            // FIX: register the callback BEFORE connecting so that no message
            // arriving right after CONNACK is dropped on the floor.
            client.setCallback(new MessageCallback());
            client.connect(optionsPublisher);
            client.subscribe(WILDCARDTOPIC, 0);
        } catch (MqttException e1) {
            // NOTE(review): consider propagating or logging via a logger instead.
            e1.printStackTrace();
        }
    }
}

下面的代码是ExecutorService执行的Runnable。不过,这本身不应该是一个问题,因为它只需要 1-2 毫秒即可完成。

/**
 * Worker task: serializes one {@link AbstractMessage} with the message
 * serializer and publishes it to the Reminds/violations topic, provided the
 * shared client is currently connected. Publish failures are printed, not
 * rethrown (fire-and-forget semantics).
 */
class SerializeJob implements Runnable {
    private final AbstractMessage message;
    private final MqttClient client;

    public SerializeJob(MqttClient client, AbstractMessage m) {
        this.client = client;
        this.message = m;
    }

    @Override
    public void run() {
        String serializedMessage = MessageSerializer.serializeMessage(message);
        MqttMessage wireMessage = new MqttMessage();
        wireMessage.setQos(message.getQos());
        // FIX: bare getBytes() uses the platform default charset, so the wire
        // payload could differ between machines. Pin it to UTF-8.
        wireMessage.setPayload(serializedMessage.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        if (client.isConnected()) {
            // Topic is a compile-time constant; no StringBuilder needed.
            String topic = MqttService.REMINDSPREFIX + MqttService.VIOLATIONTOPIC;
            try {
                client.publish(topic, wireMessage);
            } catch (MqttException e) {
                // MqttPersistenceException is a subclass of MqttException,
                // so one catch covers both of the original branches.
                e.printStackTrace();
            }
        }
    }
}

我不太确定是什么拖慢了发布速度。 MQTT 本身似乎允许很高的事件吞吐量,这些事件也可能有很大的有效负载,而且网络也不可能成为问题,因为我目前在与客户端相同的机器上本地托管代理。

编辑并进一步测试:

我现在已经对我自己的设置进行了综合基准测试,其中包括本地托管的 HiveMQ 和在机器上“本地”运行的 Mosquitto 代理。使用 Paho 库,我以 1000 条为批处理发送了越来越大的消息。对于每个批处理,我计算了从第一条消息到最后一条消息的消息吞吐量。这个场景没有使用任何多线程。由此,我得出了以下性能图表:

Throughput for MQTT messages of different sizes with 2 different brokers

运行客户端和代理的计算机是一台具有 i7 6700 和 32 GB RAM 的台式机。代理可以访问其虚拟机的所有核心和 8 GB 内存。

为了进行基准测试,我使用了以下代码:

import java.util.Random;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;

/**
 * Single-client MQTT throughput benchmark: publishes batches of {@code runs}
 * random payloads of doubling size (2 B .. ~1 MB) at QoS 1, while the same
 * client's subscription (QoS 0) counts the echoes back. Throughput per batch
 * is appended to {@code out} (send side) and {@code in} (receive side).
 *
 * Threading model: the sender runs on its own thread; Paho invokes
 * {@link MessageCallback#messageArrived} on a client thread. The two threads
 * communicate solely through {@code receivefinished}.
 */
public class MqttBenchmarker {
    protected static int PORT = 1883;
    protected static String HOST = "localhost";
    protected final String SERVICENAME = "MQTT";
    protected static final String COMMANDTOPIC = "commands";
    protected static final String REMINDSPREFIX = "Reminds/";
    protected static final String VIOLATIONTOPIC = "violations/";
    protected static final String WILDCARDTOPIC = "Reminds/#";
    protected static final String TCPPREFIX = "tcp://";
    protected static final String SSLPREFIX = "ssl://";

    private static MqttClient client;
    private static MqttConnectOptions optionsPublisher = new MqttConnectOptions();
    private static String IDPublisher = MqttClient.generateClientId();

    // Written only on the Paho callback thread.
    private static int messageReceived = 0;
    private static long timesent = 0;
    private static int count = 2;
    // Cross-thread result buffers: 'out' is sender-thread only; 'in' is
    // appended on the callback thread and read by the sender after the flag
    // handshake below, which also provides the necessary happens-before edge.
    private static StringBuilder out = new StringBuilder();
    private static StringBuilder in = new StringBuilder();
    private static final int runs = 1000;
    // FIX: this flag is set by the callback thread and busy-polled by the
    // sender thread. Without 'volatile' there is no visibility guarantee and
    // the poll loop may never observe the update.
    private static volatile boolean receivefinished = false;

    public static void main(String[] args) {
        connect();
        Thread sendThread = new Thread(new Runnable() {

            @Override
            public void run() {
                Random rd = new Random();
                // Payload size doubles each batch: 2, 4, 8, ... < 1,000,000.
                for (int i = 2; i < 1000000; i += i) {
                    byte[] arr = new byte[i];
                    long startt = System.currentTimeMillis();
                    System.out.println("Test for size: " + i + " started.");
                    for (int a = 0; a <= runs; a++) {
                        rd.nextBytes(arr);
                        try {
                            client.publish(REMINDSPREFIX, arr, 1, false);
                        } catch (MqttException e) {
                            // MqttPersistenceException is a subclass; one catch suffices.
                            e.printStackTrace();
                        }
                    }
                    try {
                        // Wait until the subscriber side has counted a full batch.
                        while (!receivefinished) {
                            Thread.sleep(10);
                        }
                        receivefinished = false;
                        System.out.println("Test for size: " + i + " finished.");
                        out.append("Sending Payload size: " + arr.length + " achieved "
                                + runs / ((System.currentTimeMillis() - startt) / 1000d)
                                + " messages per second.\n");
                    } catch (InterruptedException e) {
                        // Restore the interrupt status so callers can see it.
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                    }
                }
                System.out.println(out.toString());
                System.out.println(in.toString());
            }

        });
        sendThread.start();
    }

    /** Counts echoed messages and signals the sender when a batch completes. */
    private static class MessageCallback implements MqttCallback {

        @Override
        public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
            if (messageReceived == 0) {
                // Timestamp of the first message of the batch.
                timesent = System.currentTimeMillis();
            }
            messageReceived++;
            if (messageReceived >= runs) {
                in.append("Receiving payload size " + count + " achieved "
                        + runs / ((System.currentTimeMillis() - timesent) / 1000d)
                        + " messages per second.\n");
                count += count;
                messageReceived = 0;
                // Set the flag LAST so 'in' is fully written before the
                // sender thread can observe batch completion.
                receivefinished = true;
            }
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken arg0) {
            // Not needed for this benchmark.
        }

        @Override
        public void connectionLost(Throwable arg0) {
            // Automatic reconnect is enabled in connect(); nothing to do here.
        }
    }

    /** Connects, raises the inflight window, and subscribes to the echo topic. */
    public static void connect() {
        try {
            client = new MqttClient(TCPPREFIX + HOST + ":" + PORT, IDPublisher);
            optionsPublisher.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
            optionsPublisher.setAutomaticReconnect(true);
            optionsPublisher.setCleanSession(false);
            // Large inflight window so QoS 1 publishes are not throttled.
            optionsPublisher.setMaxInflight(65535);
            // Register the callback before connecting so no early message is lost.
            client.setCallback(new MessageCallback());
            client.connect(optionsPublisher);
            while (!client.isConnected()) {
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
            }
            // NOTE(review): publishing at QoS 1 but subscribing at QoS 0 means
            // deliveries back to this client are downgraded — intentional here?
            client.subscribe(WILDCARDTOPIC, 0);
        } catch (MqttException e1) {
            e1.printStackTrace();
        }
    }
}


奇怪的是,我想从应用程序发送的序列化消息仅使用大约 4000 字节。因此理论吞吐量应该在每秒 200 条消息左右。这是否仍然是回调函数内较长计算导致的问题?我已经使用 mosquitto 代理取得了更好的结果,我将进一步测试它可以将性能提升到什么程度。

感谢您的建议!

最佳答案

其中一个问题是 MQTT 客户端的测试设置。

您仅使用一个 MQTT 客户端。您实际测试的是 MQTT 飞行窗口(inflight window)的大小,对应以下公式:

  throughput <= inflight window-size / round-trip time

HiveMQ 默认启用一个名为 &lt;cluster-overload-protection&gt; 的属性,它会限制单个客户端的 inflight window(飞行窗口)大小。

此外,paho 客户端并不真正适合多线程环境中的高吞吐量工作。高性能场景的更好实现是 HiveMQ MQTT Client .

连接 20 个客户端(10 个发布和 10 个接收)后,我达到了每秒约 6000 qos=1 10kb 消息的持续吞吐量。

enter image description here

免责声明:我是 HiveMQ 的软件开发人员。

关于java - 识别多线程 MQTT 发布者中的瓶颈,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57426802/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com