事件发射器循环在节点红色功能中不起作用

Event emitter loop not working in node-red function

我正在尝试使用 node-red 函数通过 AMQP 读取 Azure IoT 中心消息。我已经导入了 azure-iothub 模块。

下面的代码连接 ok 和 getFeedbackReceiver returns AmqpReceiver 对象(我可以看到对象输出到调试选项卡)充当事件发射器。但是,发射器循环 (msgReceiver.on) 似乎没有 运行。我什至没有得到调试选项卡的空输出。

感谢任何帮助。

var azureamqp = global.get('azureamqp');

var iothub = azureamqp.Client.fromConnectionString(cnct);

try 
   {
         iothub.open(function()
         {
            node.send({type: "connection", payload: util.inspect(iothub)});
            node.status({ fill: "green", shape: "ring", text: "listening" }); 

            iothub.getFeedbackReceiver(function(err,msgReceiver)
                {
                    if (!err) 
                    {
                        node.send({type: "receiver", payload: util.inspect(msgReceiver)});
                        msgReceiver.on('message',function(message)
                            {
                                node.send({payload:message}); 
                            });                        
                    }
                    else
                    {
                        node.send({payload: err});
                    }
                });

            node.send({type: "streamend"}); 
            node.status({ fill: "red", shape: "ring", text: "disconnected" }); 
         });
   }
catch (err)
   {}

好的,我想我明白发生了什么,多亏了额外的评论:首先,一些关于使用 IoT 中心进行消息传递的参考文档: https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-messaging

有 2 种类型的消息通过 IoT 中心发送,具体取决于消息的发送方向:

  • 云到设备(C2D,或命令):这些消息由云应用程序发送到一个或多个设备。它们存储在特定于设备的队列中,并在设备连接并开始侦听消息后立即传送到设备。设备收到后,可以选择将有关这些的反馈发送到 IoT 中心。使用 azure-iothub.Client.getFeedbackReceiver() API 接收此反馈。我在答案的末尾添加了更多相关信息。

  • 设备到云(D2C,或遥测):这些消息由设备发送到云应用程序。这些消息存储在事件中心分区中,可以从这些分区中读取。 Event Hubs SDK

  • 会发生这种情况

从问题和评论来看,您似乎正在尝试使用反馈 API 接收 D2C 消息(遥测)- 但它不起作用,因为它不应该以这种方式工作。这是对(坏?)API 设计和缺少文档的良好反馈。

如何接收设备发送的消息:

This sample on github shows how to set up an Event Hubs client and can be used to listen to messages sent by devices to IoT Hub.This command in iothub-explorer 也是一个简单的参考。

请在下面找到一个简单的例子:

'use strict';
var EventHubClient = require('azure-event-hubs').Client;
var Promise = require('bluebird');

var connectionString = '[IoT Hub Connection String]';
var client = EventHubClient.fromConnectionString(connectionString);
var receiveAfterTime = Date.now() - 5000;

var printError = function (err) {
  console.error(err.message);
};

var printEvent = function (ehEvent) {
  console.log('Event Received: ');
  console.log(JSON.stringify(ehEvent.body));
  console.log('');
};

client.open()
      .then(client.getPartitionIds.bind(client))
      .then(function (partitionIds) {
        return Promise.map(partitionIds, function (partitionId) {
          return client.createReceiver('$Default', partitionId, { 'startAfterTime' : receiveAfterTime}).then(function(receiver) {
            receiver.on('errorReceived', printError);
            receiver.on('message', printEvent);
          });
        });
      }).catch(printError);

关于该代码示例的更多信息:

基本上,事件中心客户端提供的 AMQP 连接可用于在事件中心的每个分区上打开接收器。分区用于存储设备发送的消息。每个分区都有自己的接收者,每个接收者都有一个 message 事件。因此需要为每个分区打开一个接收器,以免错过来自任何设备的任何消息。下面是关于事件中心和分区性质的更多信息:https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-what-is-event-hubs

有关 C2D 反馈的更多信息:

设备可以发送关于 C2D 消息的 3 种类型的反馈:

  • accept(或complete)表示消息由设备处理,IoT Hub 将从设备队列
  • reject 表示设备不想要此消息(可能是因为它格式错误或不相关,这取决于您的设备)并且 IoT 中心将删除该消息从队列中。
  • abandon 表示设备现在无法 "take care" 这条消息,希望 IoT 中心稍后重新发送它。邮件保留在队列中。

acceptreject 都将使用 getFeedbackReceiver API 以及如果消息从未收到或被放弃次数过多,则超时。