如何解决 paho mqtt 客户端的异步连接问题?
How to solve async connection issue of paho mqtt client?
背景
我一直在为一个项目使用 MQTT,遇到了一个奇怪的问题。我正在使用 paho
as my MQTT client and VerneMQ
作为经纪人。
VerneMQ 代理服务已启动并且 运行ning,我可以通过 运行nnig netstat
确认这一点,我可以看到 127.0.0.1:1883
条目在 LISTENING
模式。
这是我的客户代码:
public class Producer implements MqttCallback {
private String brokerUri;
private String clientId;
public Producer(String brokerUri, String clientId){
this.brokerUri = brokerUri;
this.clientId = clientId;
}
public void doProduce(String topic, String payload){
MemoryPersistence memoryPersistence = new MemoryPersistence();
try {
MqttAsyncClient mqttAsyncClient = new MqttAsyncClient(brokerUri, clientId, memoryPersistence);
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setCleanSession(true);
mqttAsyncClient.setCallback(this);
mqttAsyncClient.connect(mqttConnectOptions);
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(payload.getBytes());
mqttAsyncClient.publish(topic, mqttMessage);
} catch (MqttException e) {
e.printStackTrace();
}
}
public void connectionLost(Throwable throwable) {
}
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
}
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("Message delivered!");
}
}
以下是我的主要class
public class Main {
public static void main(String[] args) {
Producer producer = new Producer("tcp://127.0.0.1:1883", "producer1");
producer.doProduce("dummyTopic", "dummyMessage");
}
}
问题
当我 运行 我的应用程序时,我在输出中看到 Client is not connected (32104)
异常。
如果我在 Producer
class 中将行 mqttAsyncClient.connect(mqttConnectOptions);
更改为 mqttAsyncClient.connect(mqttConnectOptions).waitForCompletion();
,我可以成功连接到代理,并且可以在 Message delivered!
中看到输出。
如果我没记错的话 waitForCompletion()
将阻止调用直到收到响应。通过添加这一行,我有效地将我的 AsyncClient 连接更改为阻塞连接,这不是我想要的方法。
问题
我该如何解决这个问题,使 paho MQTT 客户端以非阻塞方式连接到代理?一路上我错过了什么吗?
这包含在 IMqttAsyncClient
的文档中
IMqttToken token method(parms, Object userContext, IMqttActionListener callback)
In this form a callback is registered with the method. The callback
will be notified when the action succeeds or fails. The callback is
invoked on the thread managed by the MQTT client so it is important
that processing is minimised in the callback. If not the operation of
the MQTT client will be inhibited. For example to be notified (called
back) when a connect completes:
IMqttToken conToken;
conToken = asyncClient.connect("some context", new MqttAsyncActionListener() {
public void onSuccess(IMqttToken asyncActionToken) {
log("Connected");
}
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
log ("connect failed" +exception);
}
});
An optional context object can be passed into the method which will
then be made available in the callback. The context is stored by the
MQTT client) in the token which is then returned to the invoker. The
token is provided to the callback methods where the context can then
be accessed.
因此您的 try/catch 块应该如下所示:
try {
MqttAsyncClient mqttAsyncClient = new MqttAsyncClient(brokerUri, clientId, memoryPersistence);
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setCleanSession(true);
mqttAsyncClient.setCallback(this);
mqttAsyncClient.connect(mqttConnectOptions, null, new MqttAsyncActionListener() {
public void onSuccess(IMqttToken asyncActionToken) {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(payload.getBytes());
mqttAsyncClient.publish(topic, mqttMessage);
}
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
exception.printStackTrace();
}
});
} catch (MqttException e) {
e.printStackTrace();
}
背景
我一直在为一个项目使用 MQTT,遇到了一个奇怪的问题。我正在使用 paho
as my MQTT client and VerneMQ
作为经纪人。
VerneMQ 代理服务已启动并且 运行ning,我可以通过 运行nnig netstat
确认这一点,我可以看到 127.0.0.1:1883
条目在 LISTENING
模式。
这是我的客户代码:
public class Producer implements MqttCallback {
private String brokerUri;
private String clientId;
public Producer(String brokerUri, String clientId){
this.brokerUri = brokerUri;
this.clientId = clientId;
}
public void doProduce(String topic, String payload){
MemoryPersistence memoryPersistence = new MemoryPersistence();
try {
MqttAsyncClient mqttAsyncClient = new MqttAsyncClient(brokerUri, clientId, memoryPersistence);
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setCleanSession(true);
mqttAsyncClient.setCallback(this);
mqttAsyncClient.connect(mqttConnectOptions);
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(payload.getBytes());
mqttAsyncClient.publish(topic, mqttMessage);
} catch (MqttException e) {
e.printStackTrace();
}
}
public void connectionLost(Throwable throwable) {
}
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
}
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("Message delivered!");
}
}
以下是我的主要class
public class Main {
public static void main(String[] args) {
Producer producer = new Producer("tcp://127.0.0.1:1883", "producer1");
producer.doProduce("dummyTopic", "dummyMessage");
}
}
问题
当我 运行 我的应用程序时,我在输出中看到 Client is not connected (32104)
异常。
如果我在 Producer
class 中将行 mqttAsyncClient.connect(mqttConnectOptions);
更改为 mqttAsyncClient.connect(mqttConnectOptions).waitForCompletion();
,我可以成功连接到代理,并且可以在 Message delivered!
中看到输出。
如果我没记错的话 waitForCompletion()
将阻止调用直到收到响应。通过添加这一行,我有效地将我的 AsyncClient 连接更改为阻塞连接,这不是我想要的方法。
问题
我该如何解决这个问题,使 paho MQTT 客户端以非阻塞方式连接到代理?一路上我错过了什么吗?
这包含在 IMqttAsyncClient
的文档中IMqttToken token method(parms, Object userContext, IMqttActionListener callback)
In this form a callback is registered with the method. The callback will be notified when the action succeeds or fails. The callback is invoked on the thread managed by the MQTT client so it is important that processing is minimised in the callback. If not the operation of the MQTT client will be inhibited. For example to be notified (called back) when a connect completes:
IMqttToken conToken; conToken = asyncClient.connect("some context", new MqttAsyncActionListener() { public void onSuccess(IMqttToken asyncActionToken) { log("Connected"); } public void onFailure(IMqttToken asyncActionToken, Throwable exception) { log ("connect failed" +exception); } });
An optional context object can be passed into the method which will then be made available in the callback. The context is stored by the MQTT client) in the token which is then returned to the invoker. The token is provided to the callback methods where the context can then be accessed.
因此您的 try/catch 块应该如下所示:
try {
MqttAsyncClient mqttAsyncClient = new MqttAsyncClient(brokerUri, clientId, memoryPersistence);
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setCleanSession(true);
mqttAsyncClient.setCallback(this);
mqttAsyncClient.connect(mqttConnectOptions, null, new MqttAsyncActionListener() {
public void onSuccess(IMqttToken asyncActionToken) {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(payload.getBytes());
mqttAsyncClient.publish(topic, mqttMessage);
}
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
exception.printStackTrace();
}
});
} catch (MqttException e) {
e.printStackTrace();
}