识别多线程 MQTT 发布者中的瓶颈

Identifying a bottleneck in a multithreaded MQTT publisher

我目前正在使用 Eclipse Paho 为更大的软件开发 MQTT 客户端服务,但我遇到了性能问题。我收到了一堆要发布给代理的事件,我正在使用 GSON 对这些事件进行序列化。我已经对序列化和发布进行了多线程处理。根据基本的基准测试,序列化和发布最多需要 1 毫秒。 我正在使用 ExecutorService 和大小为 10 的固定线程池(目前)。

我的代码目前每秒向 ExecutorService 提交大约 50 个 Runnable,但我的 Broker 报告每秒大约只有 5-10 条消息。 我之前已经对我的 MQTT 设置进行了基准测试,并设法以非多线程方式每秒发送大约 9000 多条 MQTT 消息。

线程池是否有那么大的开销,以至于我只能从中获取这么少量的发布?

public class MqttService implements IMessagingService{
    protected int PORT = 1883;
    protected String HOST = "localhost";
    protected final String SERVICENAME = "MQTT";
    protected static final String COMMANDTOPIC = "commands";
    protected static final String REMINDSPREFIX = "Reminds/";
    protected static final String VIOLATIONTOPIC = "violations/";
    protected static final String WILDCARDTOPIC = "Reminds/#";
    protected static final String TCPPREFIX = "tcp://";
    protected static final String SSLPREFIX = "ssl://";

    private MqttClient client;
    private MqttConnectOptions optionsPublisher = new MqttConnectOptions();

    private ExecutorService pool = Executors.newFixedThreadPool(10);

    public MqttService() {
        this("localhost", 1883);
    }

    public MqttService(String host, int port) {
        this.HOST = host;
        this.PORT = port;

    }

    @Override
    public void setPort(int port) {
        this.PORT = port;
    }

    @Override
    public void setHost(String host) {
        this.HOST = host;
    }

    @Override
    public void sendMessage(AbstractMessage message) {
        pool.submit(new SerializeJob(client,message));
    }

    @Override
    public void connect() {
        try {
            client = new MqttClient(TCPPREFIX + HOST + ":" + PORT, IDPublisher);
            optionsPublisher.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
            client.connect(optionsPublisher);
            client.setCallback(new MessageCallback());
            client.subscribe(WILDCARDTOPIC, 0);
        } catch (MqttException e1) {
            e1.printStackTrace();
        }
    }
}

以下代码是ExecutorService执行的Runnable。不过,这本身应该不是问题,因为它最多只需要 1-2 毫秒即可完成。

class SerializeJob implements Runnable {
    private AbstractMessage message;
    private MqttClient client;

    public SerializeJob(MqttClient client, AbstractMessage m) {
        this.client = client;
        this.message = m;
    }

    @Override
    public void run() {
        String serializedMessage = MessageSerializer.serializeMessage(message);
        MqttMessage wireMessage = new MqttMessage();
        wireMessage.setQos(message.getQos());
        wireMessage.setPayload(serializedMessage.getBytes());
        if (client.isConnected()) {
            StringBuilder topic = new StringBuilder();
            topic.append(MqttService.REMINDSPREFIX);
            topic.append(MqttService.VIOLATIONTOPIC);
            try {
                client.publish(topic.toString(), wireMessage);
            } catch (MqttPersistenceException e) {
                e.printStackTrace();
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    }

}

我不太确定是什么让我退缩了。 MQTT 本身似乎允许很多事件,它们也可以有很大的有效负载,网络也不可能是问题,因为我目前在与客户端相同的机器上本地托管代理。

编辑并进一步测试:

我现在对我自己的设置进行了综合基准测试,其中包括一个本地托管的 HiveMQ 和 运行 "natively" 脱离机器的 Mosquitto 代理。使用 Paho 库,我以 1000 条为一组发送了越来越大的消息。对于每批,我计算了从第一条消息到最后一条消息的消息吞吐量。这个场景没有使用任何多线程。有了这个,我得出了以下性能图表:

客户端和代理的机器 运行 都是具有 i7 6700 和 32 GB RAM 的台式机。代理可以访问其 VM 的所有内核和 8 GB 内存。

为了进行基准测试,我使用了以下代码:

import java.util.Random;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;

public class MqttBenchmarker {
    protected static int PORT = 1883;
    protected static String HOST = "localhost";
    protected final String SERVICENAME = "MQTT";
    protected static final String COMMANDTOPIC = "commands";
    protected static final String REMINDSPREFIX = "Reminds/";
    protected static final String VIOLATIONTOPIC = "violations/";
    protected static final String WILDCARDTOPIC = "Reminds/#";
    protected static final String TCPPREFIX = "tcp://";
    protected static final String SSLPREFIX = "ssl://";

    private static MqttClient client;
    private static MqttConnectOptions optionsPublisher = new MqttConnectOptions();
    private static String IDPublisher = MqttClient.generateClientId();

    private static int messageReceived = 0;
    private static long timesent = 0;
    private static int count = 2;
    private static StringBuilder out = new StringBuilder();
    private static StringBuilder in = new StringBuilder();
    private static final int runs = 1000;
    private static boolean receivefinished = false;

    public static void main(String[] args) {
        connect();
        Thread sendThread=new Thread(new Runnable(){

            @Override
            public void run() {
                Random rd = new Random();
                for (int i = 2; i < 1000000; i += i) {
                    byte[] arr = new byte[i];
                    // System.out.println("Starting test run for byte Array of size:
                    // "+arr.length);
                    long startt = System.currentTimeMillis();
                    System.out.println("Test for size: " + i + " started.");
                    for (int a = 0; a <= runs; a++) {

                        rd.nextBytes(arr);
                        try {
                            client.publish(REMINDSPREFIX, arr, 1, false);
                        } catch (MqttPersistenceException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        } catch (MqttException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    }
                    try {
                        while (!receivefinished) {
                            Thread.sleep(10);
                        }
                        receivefinished = false;
                        System.out.println("Test for size: " + i + " finished.");
                        out.append("Sending Payload size: " + arr.length + " achieved "
                                + runs / ((System.currentTimeMillis() - startt) / 1000d) + " messages per second.\n");

                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
                System.out.println(out.toString());
                System.out.println(in.toString());
            }

        });
        sendThread.start();

    }

    private static class MessageCallback implements MqttCallback {

        @Override
        public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
            if (messageReceived == 0) {
                timesent = System.currentTimeMillis();
            }
            messageReceived++;
            if (messageReceived >= runs) {
                receivefinished = true;
                in.append("Receiving payload size " + count + " achieved "
                        + runs / ((System.currentTimeMillis() - timesent) / 1000d) + " messages per second.\n");
                count += count;
                messageReceived = 0;
            }
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken arg0) {
            // TODO Auto-generated method stub

        }

        @Override
        public void connectionLost(Throwable arg0) {
            // TODO Auto-generated method stub

        }
    }

    public static void connect() {
        try {
            client = new MqttClient(TCPPREFIX + HOST + ":" + PORT, IDPublisher);
            optionsPublisher.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
            optionsPublisher.setAutomaticReconnect(true);
            optionsPublisher.setCleanSession(false);
            optionsPublisher.setMaxInflight(65535);
            client.connect(optionsPublisher);
            while (!client.isConnected()) {
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            client.setCallback(new MessageCallback());
            client.subscribe(WILDCARDTOPIC, 0);
        } catch (MqttException e1) {
            e1.printStackTrace();
        }
    }

}


奇怪的是,我想从我的应用程序发送的序列化消息只使用了大约 4000 个字节。所以理论上的吞吐量应该是每秒 200 条消息左右。这仍然是回调函数内部较长计算导致的问题吗?我已经使用 mosquitto 代理取得了更好的结果,我将进一步测试,我可以将性能提高到什么程度。

感谢任何建议!

我仍然不确定问题出在哪里,但是将我的代理从 HiveMQ 切换到 Mosquitto 似乎解决了这个问题。也许与 HiveMQ 相比,Mosquitto 具有一组不同的默认设置,或者 HiveMQ 的免费试用版可能在其他方面受到限制,而不仅仅是连接的客户端数量。

无论如何,Mosquitto 对我来说工作得更好,并且处理了我可以从我的应用程序中抛出的所有消息。

一个问题是在 MQTT 客户端的测试设置中。

您只使用了一个 MQTT 客户端。您正在有效测试的是 MQTT 飞行中的大小 window 使用此公式:

  throughput <= inflight window-size / round-trip time

HiveMQ 默认启用了一个名为 <cluster-overload-protection> 的 属性 来限制单个客户端的 inflight window

此外,paho 客户端并不真正适合多线程上下文中的高吞吐量工作。高性能场景的更好实现是 HiveMQ MQTT Client.

连接了 20 个客户端(10 个发布和 10 个接收),我达到了大约每秒 6000 qos=1 10kb 消息的持续吞吐量。

免责声明:我是 HiveMQ 的软件开发人员。