Identifying a bottleneck in a multithreaded MQTT publisher
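I'm currently developing an MQTT client service for a larger piece of software using Eclipse Paho, and I've run into performance problems. I receive a stream of events that I want to publish to the broker, and I serialize them with GSON. I've multithreaded both the serialization and the publishing. According to a basic benchmark, serializing and publishing a message takes at most 1 ms.
I'm using an ExecutorService with a fixed thread pool of size 10 (for now).
My code currently submits about 50 Runnables per second to the ExecutorService, but my broker reports only about 5-10 messages per second.
I benchmarked my MQTT setup earlier and managed to send roughly 9000+ MQTT messages per second without multithreading.
Does the thread pool really add so much overhead that I only get this few publishes out of it?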
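A quick sanity check on the thread-pool question is to time how long a fixed pool of 10 threads needs to run many trivial tasks. The sketch below is a minimal, hypothetical helper (`PoolOverheadCheck` is not part of any library) that uses only `java.util.concurrent`. On ordinary hardware it should report far more than 50 tasks per second. That result would suggest the pool itself is not what caps the rate and that something blocking inside each task is the more likely suspect.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: measures raw ExecutorService throughput with no-op tasks.
class PoolOverheadCheck {

    /** Runs `tasks` trivial jobs on a fixed pool and returns completed jobs per second. */
    static double tasksPerSecond(int threads, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        AtomicInteger done = new AtomicInteger();
        long start = System.nanoTime();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> done.incrementAndGet()); // the "work" is a single increment
        }
        pool.shutdown();
        try {
            // Stop the clock only after every submitted task has finished.
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the pool", e);
        }
        double seconds = (System.nanoTime() - start) / 1e9;
        if (done.get() != tasks) {
            throw new IllegalStateException("only " + done.get() + " of " + tasks + " tasks ran");
        }
        return tasks / seconds;
    }

    public static void main(String[] args) {
        System.out.printf("%.0f tasks/s%n", tasksPerSecond(10, 100_000));
    }
}
```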
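import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;

public class MqttService implements IMessagingService {
    protected int PORT = 1883;
    protected String HOST = "localhost";
    protected final String SERVICENAME = "MQTT";
    protected static final String COMMANDTOPIC = "commands";
    protected static final String REMINDSPREFIX = "Reminds/";
    protected static final String VIOLATIONTOPIC = "violations/";
    protected static final String WILDCARDTOPIC = "Reminds/#";
    protected static final String TCPPREFIX = "tcp://";
    protected static final String SSLPREFIX = "ssl://";

    // Client ID used by connect(); it must be declared for the class to compile.
    private final String IDPublisher = MqttClient.generateClientId();
    private MqttClient client;
    private MqttConnectOptions optionsPublisher = new MqttConnectOptions();
    private ExecutorService pool = Executors.newFixedThreadPool(10);

    public MqttService() {
        this("localhost", 1883);
    }

    public MqttService(String host, int port) {
        this.HOST = host;
        this.PORT = port;
    }

    @Override
    public void setPort(int port) {
        this.PORT = port;
    }

    @Override
    public void setHost(String host) {
        this.HOST = host;
    }

    @Override
    public void sendMessage(AbstractMessage message) {
        // Serialization and publishing run on the pool. MqttClient.publish() returns only
        // once the message has been delivered, so with QoS 1/2 each worker waits for the
        // broker's acknowledgement before it can take the next job.
        pool.submit(new SerializeJob(client, message));
    }

    @Override
    public void connect() {
        try {
            client = new MqttClient(TCPPREFIX + HOST + ":" + PORT, IDPublisher);
            optionsPublisher.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
            // Register the callback before connecting so no early messages are missed.
            // MessageCallback is an application class that is not shown here.
            client.setCallback(new MessageCallback());
            client.connect(optionsPublisher);
            // This wildcard also matches "Reminds/violations/", the topic SerializeJob
            // publishes to, so this client receives its own messages as well.
            client.subscribe(WILDCARDTOPIC, 0);
        } catch (MqttException e1) {
            e1.printStackTrace();
        }
    }
}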
The following code is the Runnable that the ExecutorService executes. It shouldn't be the problem by itself, though, since it takes at most 1-2 ms to complete.
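import java.nio.charset.StandardCharsets;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

class SerializeJob implements Runnable {
    // The topic never changes, so build it once instead of once per message.
    private static final String TOPIC = MqttService.REMINDSPREFIX + MqttService.VIOLATIONTOPIC;

    private final AbstractMessage message;
    private final MqttClient client;

    public SerializeJob(MqttClient client, AbstractMessage m) {
        this.client = client;
        this.message = m;
    }

    @Override
    public void run() {
        String serializedMessage = MessageSerializer.serializeMessage(message);
        MqttMessage wireMessage = new MqttMessage();
        wireMessage.setQos(message.getQos());
        // Use an explicit charset instead of the platform default.
        wireMessage.setPayload(serializedMessage.getBytes(StandardCharsets.UTF_8));
        if (!client.isConnected()) {
            // Make dropped messages visible instead of discarding them silently.
            System.err.println("Not connected, dropping message for " + TOPIC);
            return;
        }
        try {
            // Blocks until delivery completes (for QoS 1: until the PUBACK arrives).
            client.publish(TOPIC, wireMessage);
        } catch (MqttException e) { // also covers MqttPersistenceException
            e.printStackTrace();
        }
    }
}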
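A useful way to tell whether jobs are slow or are simply waiting for a free worker is to record each job's time in the executor's queue separately from its running time. The sketch below is a hypothetical diagnostic wrapper; `TimedExecutor` is not a library class and is built only from `java.util.concurrent`. Two patterns are worth distinguishing. If queue wait keeps growing while run time stays small, producers are outpacing the workers. If run time itself is large, something inside the job is blocking, for example a publish call that waits for the broker's acknowledgement.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical diagnostic wrapper around a fixed thread pool.
class TimedExecutor {
    private final ExecutorService pool;
    private final AtomicLong maxQueueWaitNanos = new AtomicLong();
    private final AtomicLong maxRunNanos = new AtomicLong();

    TimedExecutor(int threads) {
        this.pool = Executors.newFixedThreadPool(threads);
    }

    void submit(Runnable job) {
        long enqueued = System.nanoTime();
        pool.execute(() -> {
            long started = System.nanoTime();
            try {
                job.run();
            } finally {
                long finished = System.nanoTime();
                // Keep the worst case seen so far for each phase.
                maxQueueWaitNanos.accumulateAndGet(started - enqueued, Math::max);
                maxRunNanos.accumulateAndGet(finished - started, Math::max);
            }
        });
    }

    long maxQueueWaitMillis() {
        return TimeUnit.NANOSECONDS.toMillis(maxQueueWaitNanos.get());
    }

    long maxRunMillis() {
        return TimeUnit.NANOSECONDS.toMillis(maxRunNanos.get());
    }

    /** Stops accepting jobs and waits for already queued jobs to finish. */
    void shutdownAndWait() {
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

While diagnosing, `sendMessage` could call a hypothetical `timed.submit(new SerializeJob(client, message))` instead of `pool.submit(...)` and periodically log both maxima.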
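I'm not sure what's holding me back. MQTT itself seems to allow a large number of messages, which can also carry large payloads. The network can't be the problem either, because I'm currently hosting the broker locally on the same machine as the client.
EDIT and further testing:
I have now run a synthetic benchmark of my own setup, which consists of a locally hosted HiveMQ broker and a Mosquitto broker running "natively" on the machine. Using the Paho library, I sent increasingly large messages in batches of 1000. For each batch, I calculated the message throughput from the first message to the last. This scenario did not use any multithreading. This gave me the following performance chart: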
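The machine running both the client and the brokers is a desktop with an i7 6700 and 32 GB of RAM. The broker has access to all cores and 8 GB of memory in its VM.
For the benchmark I used the following code: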
import java.util.Random;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class MqttBenchmarker {
    protected static int PORT = 1883;
    protected static String HOST = "localhost";
    protected final String SERVICENAME = "MQTT";
    protected static final String COMMANDTOPIC = "commands";
    protected static final String REMINDSPREFIX = "Reminds/";
    protected static final String VIOLATIONTOPIC = "violations/";
    protected static final String WILDCARDTOPIC = "Reminds/#";
    protected static final String TCPPREFIX = "tcp://";
    protected static final String SSLPREFIX = "ssl://";
    private static MqttClient client;
    private static MqttConnectOptions optionsPublisher = new MqttConnectOptions();
    private static String IDPublisher = MqttClient.generateClientId();
    private static int messageReceived = 0;
    private static long timesent = 0;
    private static int count = 2;
    private static StringBuilder out = new StringBuilder();
    private static StringBuilder in = new StringBuilder();
    private static final int runs = 1000;
    // Written by Paho's callback thread and read by the send thread, so it must be volatile.
    private static volatile boolean receivefinished = false;

    public static void main(String[] args) {
        connect();
        Thread sendThread = new Thread(new Runnable() {
            @Override
            public void run() {
                Random rd = new Random();
                // Payload size doubles every round: 2, 4, 8, ... bytes (below 1,000,000).
                for (int i = 2; i < 1000000; i += i) {
                    byte[] arr = new byte[i];
                    long startt = System.currentTimeMillis();
                    System.out.println("Test for size: " + i + " started.");
                    // Send exactly `runs` messages so the receiver's count lines up with each batch.
                    for (int a = 0; a < runs; a++) {
                        rd.nextBytes(arr);
                        try {
                            client.publish(REMINDSPREFIX, arr, 1, false);
                        } catch (MqttException e) { // also covers MqttPersistenceException
                            e.printStackTrace();
                        }
                    }
                    try {
                        // Wait until the callback has received the whole batch.
                        while (!receivefinished) {
                            Thread.sleep(10);
                        }
                        receivefinished = false;
                        System.out.println("Test for size: " + i + " finished.");
                        out.append("Sending Payload size: " + arr.length + " achieved "
                                + runs / ((System.currentTimeMillis() - startt) / 1000d) + " messages per second.\n");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println(out.toString());
                System.out.println(in.toString());
            }
        });
        sendThread.start();
    }

    private static class MessageCallback implements MqttCallback {
        @Override
        public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
            // The first message of a batch starts the receive timer.
            if (messageReceived == 0) {
                timesent = System.currentTimeMillis();
            }
            messageReceived++;
            if (messageReceived >= runs) {
                in.append("Receiving payload size " + count + " achieved "
                        + runs / ((System.currentTimeMillis() - timesent) / 1000d) + " messages per second.\n");
                count += count;
                messageReceived = 0;
                // Set the flag last so the send thread sees the finished report.
                receivefinished = true;
            }
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken arg0) {
        }

        @Override
        public void connectionLost(Throwable arg0) {
            // Automatic reconnect is enabled in connect().
        }
    }

    public static void connect() {
        try {
            client = new MqttClient(TCPPREFIX + HOST + ":" + PORT, IDPublisher);
            optionsPublisher.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
            optionsPublisher.setAutomaticReconnect(true);
            optionsPublisher.setCleanSession(false);
            // MqttClient.publish() returns only after delivery completes, so a single
            // sending thread never has more than one QoS 1 message in flight.
            optionsPublisher.setMaxInflight(65535);
            client.setCallback(new MessageCallback());
            // The synchronous MqttClient.connect() blocks until connected (or throws),
            // so there is no need to poll isConnected() afterwards.
            client.connect(optionsPublisher);
            client.subscribe(WILDCARDTOPIC, 0);
        } catch (MqttException e1) {
            e1.printStackTrace();
        }
    }
}
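The benchmark derives each rate from `System.currentTimeMillis()` differences. For small, fast batches, millisecond resolution is coarse, and an elapsed time of 0 ms would yield Infinity. A minimal, hypothetical helper (`Throughput` is an illustrative name) based on `System.nanoTime()` values is sketched below. It also makes the interval count explicit. When the clock starts on the first received message, as in `messageArrived`, a batch of 1000 messages spans 999 intervals.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper for batch throughput measurements.
final class Throughput {
    private Throughput() {}

    /**
     * Messages per second for a batch timed with System.nanoTime() values.
     * When timing "first message to last message", pass count - 1 as `messages`,
     * because the clock starts on the first message rather than before it.
     */
    static double perSecond(long messages, long startNanos, long endNanos) {
        long elapsed = endNanos - startNanos;
        if (elapsed <= 0) {
            throw new IllegalArgumentException("elapsed time must be positive");
        }
        return messages / (elapsed / (double) TimeUnit.SECONDS.toNanos(1));
    }
}
```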
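The strange thing is that the serialized messages I want to send from my application are only about 4000 bytes. So the theoretical throughput should be around 200 messages per second. Could this still be caused by longer computations inside the callback function? I've already gotten better results with the Mosquitto broker, and I'll test further how far I can push the performance.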
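On the callback question: Paho's `MqttCallback.messageArrived` is invoked synchronously by the client, and its documentation states that the acknowledgement is not sent back to the server until the method returns. Slow work inside the callback therefore delays the messages queued behind it. A common remedy is to have the callback only hand the payload to a worker pool and return immediately. The trade-off is that the acknowledgement then goes out before processing has finished. The sketch below shows the pattern with plain `java.util.concurrent`. `OffloadingHandler` and its `onMessage` method are hypothetical stand-ins for a `messageArrived` implementation, so the example runs without the Paho jar.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical stand-in: a real MqttCallback.messageArrived() would call onMessage().
class OffloadingHandler {
    private final ExecutorService workers;
    private final BiConsumer<String, byte[]> processor;

    OffloadingHandler(int threads, BiConsumer<String, byte[]> processor) {
        this.workers = Executors.newFixedThreadPool(threads);
        this.processor = processor;
    }

    /** Returns immediately; the slow processing runs on a worker thread. */
    void onMessage(String topic, byte[] payload) {
        // Copy the payload so the worker never sees later changes to the caller's array.
        byte[] copy = payload.clone();
        workers.execute(() -> processor.accept(topic, copy));
    }

    /** Stops accepting messages and waits for pending processing to finish. */
    void shutdownAndWait() {
        workers.shutdown();
        try {
            workers.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```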
Thanks for any advice!
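I'm still not sure what the problem was, but switching my broker from HiveMQ to Mosquitto seems to have solved it. Perhaps Mosquitto uses a different set of default settings than HiveMQ, or perhaps the free trial version of HiveMQ is limited in other ways besides the number of connected clients.
In any case, Mosquitto works much better for me and handled every message I could throw at it from my application.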
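One problem lies in the test setup of the MQTT client.
You are using only a single MQTT client. What you are effectively testing is the size of the MQTT inflight window, according to this formula: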
throughput <= inflight window-size / round-trip time
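Plugging numbers into this bound shows why a single client can look slow. The sketch below evaluates the formula and its inverse; the class and method names are illustrative only. A window of 10 with a 1 ms round trip allows at most 10,000 messages per second. Going the other way, a window of 10 that stayed full the whole time while delivering only 5-10 messages per second would imply an effective round trip of 1-2 seconds per message.

```java
// Illustrative arithmetic for: throughput <= inflight window size / round-trip time
final class InflightBound {
    private InflightBound() {}

    /** Upper bound on messages per second for a window size and an RTT in milliseconds. */
    static double maxMessagesPerSecond(int windowSize, double rttMillis) {
        return windowSize * 1000.0 / rttMillis;
    }

    /** RTT in milliseconds implied by an observed rate, assuming the window is always full. */
    static double impliedRttMillis(int windowSize, double observedPerSecond) {
        return windowSize * 1000.0 / observedPerSecond;
    }

    public static void main(String[] args) {
        System.out.println(maxMessagesPerSecond(10, 1.0)); // 10000.0
        System.out.println(impliedRttMillis(10, 5.0));     // 2000.0
    }
}
```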
HiveMQ enables a property called <cluster-overload-protection> by default, which throttles the inflight window of individual clients.
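In addition, the Paho client is not really suited to high-throughput work in a multithreaded context. A better implementation for high-performance scenarios is the HiveMQ MQTT Client.
With 20 clients connected (10 publishing and 10 subscribing), I reached a sustained throughput of about 6000 QoS 1 messages per second with 10 KB payloads.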
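The effect of a per-client inflight window, and of spreading the load across several clients, can be reproduced without any broker. The sketch below is a simulation only and is not Paho or HiveMQ code. Each simulated client owns a `Semaphore` with `window` permits. A publish takes a permit, and a scheduled task "acknowledges" it after `rttMillis`, a fixed delay standing in for the round trip. Under these assumptions, aggregate throughput should track clients × window / RTT.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Simulation only: each publish occupies one inflight slot until a
// simulated acknowledgement arrives rttMillis later.
final class InflightSimulation {
    private InflightSimulation() {}

    /** Publishes messagesPerClient messages from each client; returns elapsed milliseconds. */
    static long run(int clients, int window, long rttMillis, int messagesPerClient) {
        ScheduledExecutorService broker = Executors.newScheduledThreadPool(2);
        CountDownLatch allAcked = new CountDownLatch(clients * messagesPerClient);
        long start = System.nanoTime();
        for (int c = 0; c < clients; c++) {
            Semaphore inflight = new Semaphore(window); // each client has its own window
            new Thread(() -> {
                for (int m = 0; m < messagesPerClient; m++) {
                    inflight.acquireUninterruptibly(); // blocks while the window is full
                    broker.schedule(() -> {
                        inflight.release();            // simulated PUBACK frees the slot
                        allAcked.countDown();
                    }, rttMillis, TimeUnit.MILLISECONDS);
                }
            }).start();
        }
        try {
            allAcked.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for acks", e);
        } finally {
            broker.shutdown();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("1 client:  " + run(1, 10, 20, 200) + " ms");
        System.out.println("4 clients: " + run(4, 10, 20, 200) + " ms");
    }
}
```

With `run(1, 10, 20, 200)`, the elapsed time cannot drop below about 400 ms (10 messages per 20 ms is 500 msg/s). `run(4, 10, 20, 200)` should move four times as many messages in roughly the same wall-clock time, because each client brings its own window.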
Disclaimer: I am a software developer at HiveMQ.