事件发射器循环在节点红色功能中不起作用
Event emitter loop not working in node-red function
我正在尝试使用 node-red 函数通过 AMQP 读取 Azure IoT 中心消息。我已经导入了 azure-iothub 模块。
下面的代码连接 ok 和 getFeedbackReceiver returns AmqpReceiver 对象(我可以看到对象输出到调试选项卡)充当事件发射器。但是,发射器循环 (msgReceiver.on) 似乎没有 运行。我什至没有得到调试选项卡的空输出。
感谢任何帮助。
var azureamqp = global.get('azureamqp');
var iothub = azureamqp.Client.fromConnectionString(cnct);
try
{
iothub.open(function()
{
node.send({type: "connection", payload: util.inspect(iothub)});
node.status({ fill: "green", shape: "ring", text: "listening" });
iothub.getFeedbackReceiver(function(err,msgReceiver)
{
if (!err)
{
node.send({type: "receiver", payload: util.inspect(msgReceiver)});
msgReceiver.on('message',function(message)
{
node.send({payload:message});
});
}
else
{
node.send({payload: err});
}
});
node.send({type: "streamend"});
node.status({ fill: "red", shape: "ring", text: "disconnected" });
});
}
catch (err)
{}
好的,我想我明白发生了什么,多亏了额外的评论:首先,一些关于使用 IoT 中心进行消息传递的参考文档:
https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-messaging
有 2 种类型的消息通过 IoT 中心发送,具体取决于消息的发送方向:
云到设备(C2D,或命令):这些消息由云应用程序发送到一个或多个设备。它们存储在特定于设备的队列中,并在设备连接并开始侦听消息后立即传送到设备。设备收到后,可以选择将有关这些的反馈发送到 IoT 中心。使用 azure-iothub.Client.getFeedbackReceiver()
API 接收此反馈。我在答案的末尾添加了更多相关信息。
设备到云(D2C,或遥测):这些消息由设备发送到云应用程序。这些消息存储在事件中心分区中,可以从这些分区中读取。 Event Hubs SDK
会发生这种情况
从问题和评论来看,您似乎正在尝试使用反馈 API 接收 D2C 消息(遥测)- 但它不起作用,因为它不应该以这种方式工作。这是对(坏?)API 设计和缺少文档的良好反馈。
如何接收设备发送的消息:
This sample on github shows how to set up an Event Hubs client and can be used to listen to messages sent by devices to IoT Hub.This command in iothub-explorer 也是一个简单的参考。
请在下面找到一个简单的例子:
'use strict';
var EventHubClient = require('azure-event-hubs').Client;
var Promise = require('bluebird');
var connectionString = '[IoT Hub Connection String]';
var client = EventHubClient.fromConnectionString(connectionString);
var receiveAfterTime = Date.now() - 5000;
var printError = function (err) {
console.error(err.message);
};
var printEvent = function (ehEvent) {
console.log('Event Received: ');
console.log(JSON.stringify(ehEvent.body));
console.log('');
};
client.open()
.then(client.getPartitionIds.bind(client))
.then(function (partitionIds) {
return Promise.map(partitionIds, function (partitionId) {
return client.createReceiver('$Default', partitionId, { 'startAfterTime' : receiveAfterTime}).then(function(receiver) {
receiver.on('errorReceived', printError);
receiver.on('message', printEvent);
});
});
}).catch(printError);
关于该代码示例的更多信息:
基本上,事件中心客户端提供的 AMQP 连接可用于在事件中心的每个分区上打开接收器。分区用于存储设备发送的消息。每个分区都有自己的接收者,每个接收者都有一个 message
事件。因此需要为每个分区打开一个接收器,以免错过来自任何设备的任何消息。下面是关于事件中心和分区性质的更多信息:https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-what-is-event-hubs
有关 C2D 反馈的更多信息:
设备可以发送关于 C2D 消息的 3 种类型的反馈:
- accept(或complete)表示消息由设备处理,IoT Hub 将从设备队列
- reject 表示设备不想要此消息(可能是因为它格式错误或不相关,这取决于您的设备)并且 IoT 中心将删除该消息从队列中。
- abandon 表示设备现在无法 "take care" 这条消息,希望 IoT 中心稍后重新发送它。邮件保留在队列中。
accept 和 reject 都将使用 getFeedbackReceiver
API 以及如果消息从未收到或被放弃次数过多,则超时。
我正在尝试使用 node-red 函数通过 AMQP 读取 Azure IoT 中心消息。我已经导入了 azure-iothub 模块。
下面的代码连接 ok 和 getFeedbackReceiver returns AmqpReceiver 对象(我可以看到对象输出到调试选项卡)充当事件发射器。但是,发射器循环 (msgReceiver.on) 似乎没有 运行。我什至没有得到调试选项卡的空输出。
感谢任何帮助。
var azureamqp = global.get('azureamqp');
var iothub = azureamqp.Client.fromConnectionString(cnct);
try
{
iothub.open(function()
{
node.send({type: "connection", payload: util.inspect(iothub)});
node.status({ fill: "green", shape: "ring", text: "listening" });
iothub.getFeedbackReceiver(function(err,msgReceiver)
{
if (!err)
{
node.send({type: "receiver", payload: util.inspect(msgReceiver)});
msgReceiver.on('message',function(message)
{
node.send({payload:message});
});
}
else
{
node.send({payload: err});
}
});
node.send({type: "streamend"});
node.status({ fill: "red", shape: "ring", text: "disconnected" });
});
}
catch (err)
{}
好的,我想我明白发生了什么,多亏了额外的评论:首先,一些关于使用 IoT 中心进行消息传递的参考文档: https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-messaging
有 2 种类型的消息通过 IoT 中心发送,具体取决于消息的发送方向:
云到设备(C2D,或命令):这些消息由云应用程序发送到一个或多个设备。它们存储在特定于设备的队列中,并在设备连接并开始侦听消息后立即传送到设备。设备收到后,可以选择将有关这些的反馈发送到 IoT 中心。使用
azure-iothub.Client.getFeedbackReceiver()
API 接收此反馈。我在答案的末尾添加了更多相关信息。设备到云(D2C,或遥测):这些消息由设备发送到云应用程序。这些消息存储在事件中心分区中,可以从这些分区中读取。 Event Hubs SDK
会发生这种情况
从问题和评论来看,您似乎正在尝试使用反馈 API 接收 D2C 消息(遥测)- 但它不起作用,因为它不应该以这种方式工作。这是对(坏?)API 设计和缺少文档的良好反馈。
如何接收设备发送的消息:
This sample on github shows how to set up an Event Hubs client and can be used to listen to messages sent by devices to IoT Hub.This command in iothub-explorer 也是一个简单的参考。
请在下面找到一个简单的例子:
'use strict';
var EventHubClient = require('azure-event-hubs').Client;
var Promise = require('bluebird');
var connectionString = '[IoT Hub Connection String]';
var client = EventHubClient.fromConnectionString(connectionString);
var receiveAfterTime = Date.now() - 5000;
var printError = function (err) {
console.error(err.message);
};
var printEvent = function (ehEvent) {
console.log('Event Received: ');
console.log(JSON.stringify(ehEvent.body));
console.log('');
};
client.open()
.then(client.getPartitionIds.bind(client))
.then(function (partitionIds) {
return Promise.map(partitionIds, function (partitionId) {
return client.createReceiver('$Default', partitionId, { 'startAfterTime' : receiveAfterTime}).then(function(receiver) {
receiver.on('errorReceived', printError);
receiver.on('message', printEvent);
});
});
}).catch(printError);
关于该代码示例的更多信息:
基本上,事件中心客户端提供的 AMQP 连接可用于在事件中心的每个分区上打开接收器。分区用于存储设备发送的消息。每个分区都有自己的接收者,每个接收者都有一个 message
事件。因此需要为每个分区打开一个接收器,以免错过来自任何设备的任何消息。下面是关于事件中心和分区性质的更多信息:https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-what-is-event-hubs
有关 C2D 反馈的更多信息:
设备可以发送关于 C2D 消息的 3 种类型的反馈:
- accept(或complete)表示消息由设备处理,IoT Hub 将从设备队列
- reject 表示设备不想要此消息(可能是因为它格式错误或不相关,这取决于您的设备)并且 IoT 中心将删除该消息从队列中。
- abandon 表示设备现在无法 "take care" 这条消息,希望 IoT 中心稍后重新发送它。邮件保留在队列中。
accept 和 reject 都将使用 getFeedbackReceiver
API 以及如果消息从未收到或被放弃次数过多,则超时。