Paho:并行连接创建导致连接下拉
Paho: Parallel Connection Creation leads to Connection Dropdown
我正在使用 JAVA paho
客户端和 mosquitto mqtt broker 1.6.7.
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
我订阅了多个主题,所以我创建了一个 class,如下所示:
private String topic = "";
private MqttClient client = null;
public MqttEndpoint(String topic) throws InterruptedException {
this.topic = topic;
new Thread() {
@Override
public void run() {
try {
client = getNewClient();
client.setCallback(new Callback());
client.subscribe(topic);
//isInitialized=true;
} catch (MqttException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}.start();
}
我的主 class 代码如下所示:
new MqttEndpoint("abc/def");
new MqttEndpoint("abc/def2");
...
我为连接创建了线程以避免连接时间过长。我的问题:通过这种方法,我得到(不总是,但有时)连接丢失错误 (32109):
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:190)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.EOFException
at java.base/java.io.DataInputStream.readByte(DataInputStream.java:272)
at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:92)
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:137)
... 1 more
getNewClient
只是returns一个新客户:
public static MqttClient getNewClient(){
MqttClient client = null;
try {
String id=MqttClient.generateClientId();
client = new MqttClient("tcp://localhost", id,new MemoryPersistence() );
MqttConnectOptions options = new MqttConnectOptions();
options.setMaxInflight(8000);
options.setAutomaticReconnect(true);
client.connect(options);
} catch (MqttException exception) {
if (exception.getCause() instanceof InterruptedException) {
throw (InterruptedException) exception.getCause();
}
}
return client;
}
如果删除线程,我没有收到此错误:
public MqttEndpoint(String topic) throws InterruptedException {
this.topic = topic;
try {
client = getNewClient();
client.setCallback(new Callback());
client.subscribe(topic);
LOGGER.info("subscribed to "+ topic);
//isInitialized=true;
} catch (MqttException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
我做错了什么?
编辑:
我使用 QoS 1 发布消息
问题是以下行:
String id=MqttClient.generateClientId();
我根据用户名和系统时间生成一个客户端ID。如果您同时创建多个客户端,则 id 崩溃的可能性会急剧增加,从而导致连接丢失错误....
我正在使用 JAVA paho
客户端和 mosquitto mqtt broker 1.6.7.
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
我订阅了多个主题,所以我创建了一个 class,如下所示:
private String topic = "";
private MqttClient client = null;
public MqttEndpoint(String topic) throws InterruptedException {
this.topic = topic;
new Thread() {
@Override
public void run() {
try {
client = getNewClient();
client.setCallback(new Callback());
client.subscribe(topic);
//isInitialized=true;
} catch (MqttException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}.start();
}
我的主 class 代码如下所示:
new MqttEndpoint("abc/def");
new MqttEndpoint("abc/def2");
...
我为连接创建了线程以避免连接时间过长。我的问题:通过这种方法,我得到(不总是,但有时)连接丢失错误 (32109):
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:190)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.EOFException
at java.base/java.io.DataInputStream.readByte(DataInputStream.java:272)
at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:92)
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:137)
... 1 more
getNewClient
只是returns一个新客户:
public static MqttClient getNewClient(){
MqttClient client = null;
try {
String id=MqttClient.generateClientId();
client = new MqttClient("tcp://localhost", id,new MemoryPersistence() );
MqttConnectOptions options = new MqttConnectOptions();
options.setMaxInflight(8000);
options.setAutomaticReconnect(true);
client.connect(options);
} catch (MqttException exception) {
if (exception.getCause() instanceof InterruptedException) {
throw (InterruptedException) exception.getCause();
}
}
return client;
}
如果删除线程,我没有收到此错误:
public MqttEndpoint(String topic) throws InterruptedException {
this.topic = topic;
try {
client = getNewClient();
client.setCallback(new Callback());
client.subscribe(topic);
LOGGER.info("subscribed to "+ topic);
//isInitialized=true;
} catch (MqttException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
我做错了什么?
编辑: 我使用 QoS 1 发布消息
问题是以下行:
String id=MqttClient.generateClientId();
我根据用户名和系统时间生成一个客户端ID。如果您同时创建多个客户端,则 id 崩溃的可能性会急剧增加,从而导致连接丢失错误....