Azure node IOT SDK如何设置MQTT异常回调
Azure node IOT SDK how to set callbacks for MQTT exceptions
我已经在几个项目中使用了 Azure IOT SDK for C。有很好的文档,并且很清楚如何注册回调以在无法发送消息或 MQTT 连接断开时收到通知。查看节点的 javascript SDK,我不知道如何注册类似的回调。例如,在创建客户端后,我可以调用 client.open(onConnect)
并在建立连接时调用 onConnect
方法。我还可以调用 client.sendEvent(message,...)
发送消息,如果成功,我会收到消息已排队的通知。然而:
- 如果mqtt连接中断,如何注册一个回调通知?
- 如何注册一个回调,以便在成功接收数据包(QOS 1 或 2)或发送失败时获知?
- 如何指定我想要的 QOS 级别?
谢谢
首先,关于 IoT 中心 MQTT QoS 支持的几点注意事项:
- IoT Hub does not support QoS 2 messages. If a device app publishes a message with QoS 2, IoT Hub closes the network connection. When a
device app subscribes to a topic with QoS 2, IoT Hub grants maximum
QoS level 1 in the SUBACK packet. After that, IoT Hub delivers
messages to the device using QoS 1.
- By default, the device SDKs connect to an IoT Hub with QoS 1 for message exchange with the IoT hub.
详情请参考Communicate with your IoT hub using the MQTT protocol。
现在进入这个问题,你有 2 个选择:
使用 Azure 物联网 SDK:
如果您想使用 IoT hub device sdk, below is one reference from official sample 来处理断开连接的情况。关于 QoS 1 相关交换(如 PUBACK 确认),它不允许在较低级别,但在内部处理以确保有保证的消息传递。直接使用MQTT,参考我后面的回答
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
'use strict';
const Protocol = require('azure-iot-device-mqtt').Mqtt;
// Uncomment one of these transports and then change it in fromConnectionString to test other transports
// const Protocol = require('azure-iot-device-amqp').AmqpWs;
// const Protocol = require('azure-iot-device-http').Http;
// const Protocol = require('azure-iot-device-amqp').Amqp;
// const Protocol = require('azure-iot-device-mqtt').MqttWs;
const Client = require('azure-iot-device').Client;
const Message = require('azure-iot-device').Message;
// String containing Hostname, Device Id & Device Key in the following formats:
// "HostName=<iothub_host_name>;DeviceId=<device_id>;SharedAccessKey=<device_key>"
const deviceConnectionString = process.env.DEVICE_CONNECTION_STRING;
let sendInterval;
function disconnectHandler () {
clearInterval(sendInterval);
client.open().catch((err) => {
console.error(err.message);
});
}
// The AMQP and HTTP transports have the notion of completing, rejecting or abandoning the message.
// For example, this is only functional in AMQP and HTTP:
// client.complete(msg, printResultFor('completed'));
// If using MQTT calls to complete, reject, or abandon are no-ops.
// When completing a message, the service that sent the C2D message is notified that the message has been processed.
// When rejecting a message, the service that sent the C2D message is notified that the message won't be processed by the device. the method to use is client.reject(msg, callback).
// When abandoning the message, IoT Hub will immediately try to resend it. The method to use is client.abandon(msg, callback).
// MQTT is simpler: it accepts the message by default, and doesn't support rejecting or abandoning a message.
function messageHandler (msg) {
console.log('Id: ' + msg.messageId + ' Body: ' + msg.data);
client.complete(msg, printResultFor('completed'));
}
function generateMessage () {
const windSpeed = 10 + (Math.random() * 4); // range: [10, 14]
const temperature = 20 + (Math.random() * 10); // range: [20, 30]
const humidity = 60 + (Math.random() * 20); // range: [60, 80]
const data = JSON.stringify({ deviceId: 'myFirstDevice', windSpeed: windSpeed, temperature: temperature, humidity: humidity });
const message = new Message(data);
message.properties.add('temperatureAlert', (temperature > 28) ? 'true' : 'false');
return message;
}
function errorCallback (err) {
console.error(err.message);
}
function connectCallback () {
console.log('Client connected');
// Create a message and send it to the IoT Hub every two seconds
sendInterval = setInterval(() => {
const message = generateMessage();
console.log('Sending message: ' + message.getData());
client.sendEvent(message, printResultFor('send'));
}, 2000);
}
// fromConnectionString must specify a transport constructor, coming from any transport package.
let client = Client.fromConnectionString(deviceConnectionString, Protocol);
client.on('connect', connectCallback);
client.on('error', errorCallback);
client.on('disconnect', disconnectHandler);
client.on('message', messageHandler);
client.open()
.catch(err => {
console.error('Could not connect: ' + err.message);
});
// Helper function to print results in the console
function printResultFor(op) {
return function printResult(err, res) {
if (err) console.log(op + ' error: ' + err.toString());
if (res) console.log(op + ' status: ' + res.constructor.name);
};
}
直接使用 MQTT:
如Using the MQTT protocol directly (as a device), you can also connect to the public device endpoints using the MQTT protocol on port 8883. For that you can use any standard MQTT library available (for example https://www.npmjs.com/package/mqtt中所述)。但是,请注意以下几点以供特殊考虑:
In the CONNECT packet, the device should use the following values:
- For the ClientId field, use the deviceId.
- For the Username field, use {iothubhostname}/{device_id}/?api-version=2018-06-30, where
{iothubhostname} is the full CName of the IoT hub. For example, if the
name of your IoT hub is contoso.azure-devices.net and if the name of
your device is MyDevice01, the full Username field should contain:
contoso.azure-devices.net/MyDevice01/?api-version=2018-06-30
- For the Password field, use a SAS token. The format of the SAS token is the same as for both the HTTPS and AMQP protocols:
SharedAccessSignature sig={signature-string}&se={expiry}&sr={URL-encoded-resourceURI}
Note
If you use X.509 certificate authentication, SAS token passwords are
not required. For more information, see Set up X.509 security in your
Azure IoT
Hub
and follow code instructions in the TLS/SSL configuration
section.
For more information about how to generate SAS tokens, see the device
section of Using IoT Hub security
tokens.
我已经在几个项目中使用了 Azure IOT SDK for C。有很好的文档,并且很清楚如何注册回调以在无法发送消息或 MQTT 连接断开时收到通知。查看节点的 javascript SDK,我不知道如何注册类似的回调。例如,在创建客户端后,我可以调用 client.open(onConnect)
并在建立连接时调用 onConnect
方法。我还可以调用 client.sendEvent(message,...)
发送消息,如果成功,我会收到消息已排队的通知。然而:
- 如果mqtt连接中断,如何注册一个回调通知?
- 如何注册一个回调,以便在成功接收数据包(QOS 1 或 2)或发送失败时获知?
- 如何指定我想要的 QOS 级别?
谢谢
首先,关于 IoT 中心 MQTT QoS 支持的几点注意事项:
- IoT Hub does not support QoS 2 messages. If a device app publishes a message with QoS 2, IoT Hub closes the network connection. When a device app subscribes to a topic with QoS 2, IoT Hub grants maximum QoS level 1 in the SUBACK packet. After that, IoT Hub delivers messages to the device using QoS 1.
- By default, the device SDKs connect to an IoT Hub with QoS 1 for message exchange with the IoT hub.
详情请参考Communicate with your IoT hub using the MQTT protocol。
现在进入这个问题,你有 2 个选择:
使用 Azure 物联网 SDK:
如果您想使用 IoT hub device sdk, below is one reference from official sample 来处理断开连接的情况。关于 QoS 1 相关交换(如 PUBACK 确认),它不允许在较低级别,但在内部处理以确保有保证的消息传递。直接使用MQTT,参考我后面的回答
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
'use strict';
const Protocol = require('azure-iot-device-mqtt').Mqtt;
// Uncomment one of these transports and then change it in fromConnectionString to test other transports
// const Protocol = require('azure-iot-device-amqp').AmqpWs;
// const Protocol = require('azure-iot-device-http').Http;
// const Protocol = require('azure-iot-device-amqp').Amqp;
// const Protocol = require('azure-iot-device-mqtt').MqttWs;
const Client = require('azure-iot-device').Client;
const Message = require('azure-iot-device').Message;
// String containing Hostname, Device Id & Device Key in the following formats:
// "HostName=<iothub_host_name>;DeviceId=<device_id>;SharedAccessKey=<device_key>"
const deviceConnectionString = process.env.DEVICE_CONNECTION_STRING;
let sendInterval;
function disconnectHandler () {
clearInterval(sendInterval);
client.open().catch((err) => {
console.error(err.message);
});
}
// The AMQP and HTTP transports have the notion of completing, rejecting or abandoning the message.
// For example, this is only functional in AMQP and HTTP:
// client.complete(msg, printResultFor('completed'));
// If using MQTT calls to complete, reject, or abandon are no-ops.
// When completing a message, the service that sent the C2D message is notified that the message has been processed.
// When rejecting a message, the service that sent the C2D message is notified that the message won't be processed by the device. the method to use is client.reject(msg, callback).
// When abandoning the message, IoT Hub will immediately try to resend it. The method to use is client.abandon(msg, callback).
// MQTT is simpler: it accepts the message by default, and doesn't support rejecting or abandoning a message.
function messageHandler (msg) {
console.log('Id: ' + msg.messageId + ' Body: ' + msg.data);
client.complete(msg, printResultFor('completed'));
}
function generateMessage () {
const windSpeed = 10 + (Math.random() * 4); // range: [10, 14]
const temperature = 20 + (Math.random() * 10); // range: [20, 30]
const humidity = 60 + (Math.random() * 20); // range: [60, 80]
const data = JSON.stringify({ deviceId: 'myFirstDevice', windSpeed: windSpeed, temperature: temperature, humidity: humidity });
const message = new Message(data);
message.properties.add('temperatureAlert', (temperature > 28) ? 'true' : 'false');
return message;
}
function errorCallback (err) {
console.error(err.message);
}
function connectCallback () {
console.log('Client connected');
// Create a message and send it to the IoT Hub every two seconds
sendInterval = setInterval(() => {
const message = generateMessage();
console.log('Sending message: ' + message.getData());
client.sendEvent(message, printResultFor('send'));
}, 2000);
}
// fromConnectionString must specify a transport constructor, coming from any transport package.
let client = Client.fromConnectionString(deviceConnectionString, Protocol);
client.on('connect', connectCallback);
client.on('error', errorCallback);
client.on('disconnect', disconnectHandler);
client.on('message', messageHandler);
client.open()
.catch(err => {
console.error('Could not connect: ' + err.message);
});
// Helper function to print results in the console
function printResultFor(op) {
return function printResult(err, res) {
if (err) console.log(op + ' error: ' + err.toString());
if (res) console.log(op + ' status: ' + res.constructor.name);
};
}
直接使用 MQTT:
如Using the MQTT protocol directly (as a device), you can also connect to the public device endpoints using the MQTT protocol on port 8883. For that you can use any standard MQTT library available (for example https://www.npmjs.com/package/mqtt中所述)。但是,请注意以下几点以供特殊考虑:
In the CONNECT packet, the device should use the following values:
- For the ClientId field, use the deviceId.
- For the Username field, use {iothubhostname}/{device_id}/?api-version=2018-06-30, where {iothubhostname} is the full CName of the IoT hub. For example, if the name of your IoT hub is contoso.azure-devices.net and if the name of your device is MyDevice01, the full Username field should contain:
contoso.azure-devices.net/MyDevice01/?api-version=2018-06-30
- For the Password field, use a SAS token. The format of the SAS token is the same as for both the HTTPS and AMQP protocols:
SharedAccessSignature sig={signature-string}&se={expiry}&sr={URL-encoded-resourceURI}
Note
If you use X.509 certificate authentication, SAS token passwords are not required. For more information, see Set up X.509 security in your Azure IoT Hub and follow code instructions in the TLS/SSL configuration section.
For more information about how to generate SAS tokens, see the device section of Using IoT Hub security tokens.