Paho:并行连接创建导致连接下拉

Paho: Parallel Connection Creation leads to Connection Dropdown

我正在使用 JAVA paho 客户端和 mosquitto mqtt broker 1.6.7.

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.2</version>
</dependency>

我订阅了多个主题,所以我创建了一个 class,如下所示:

private String topic = "";
private MqttClient client = null;

public MqttEndpoint(String topic) throws InterruptedException {
    this.topic = topic;
    new Thread() {
        @Override
        public void run() {
            try {
                client = getNewClient();
                client.setCallback(new Callback());                 
                client.subscribe(topic);

                //isInitialized=true;
            } catch (MqttException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }               
        }
    }.start();
}

我的主 class 代码如下所示:

new MqttEndpoint("abc/def");
new MqttEndpoint("abc/def2");
...

我为连接创建了线程以避免连接时间过长。我的问题:通过这种方法,我得到(不总是,但有时)连接丢失错误 (32109):

    at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:190)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.EOFException
    at java.base/java.io.DataInputStream.readByte(DataInputStream.java:272)
    at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:92)
    at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:137)
    ... 1 more 

getNewClient只是returns一个新客户:

public static MqttClient getNewClient(){
        MqttClient client = null;
        try {
            String id=MqttClient.generateClientId();
            client = new MqttClient("tcp://localhost", id,new MemoryPersistence() );
            MqttConnectOptions options = new MqttConnectOptions();
            options.setMaxInflight(8000);
           options.setAutomaticReconnect(true);
            client.connect(options);
        } catch (MqttException exception) {
            if (exception.getCause() instanceof InterruptedException) {
                throw (InterruptedException) exception.getCause();
            }
        }

        return client;
}

如果删除线程,我没有收到此错误:

public MqttEndpoint(String topic) throws InterruptedException {
    this.topic = topic;

            try {
                client = getNewClient();
                client.setCallback(new Callback());                 
                client.subscribe(topic);
                LOGGER.info("subscribed to "+ topic);

                //isInitialized=true;
            } catch (MqttException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }               
}

我做错了什么?

编辑: 我使用 QoS 1 发布消息

问题是以下行:

String id=MqttClient.generateClientId();

我根据用户名和系统时间生成一个客户端ID。如果您同时创建多个客户端,则 id 崩溃的可能性会急剧增加,从而导致连接丢失错误....