仅处理来自单个设备的 Azure IoT 中心事件

Process Azure IoT hub events from a single device only

我正在尝试解决部署了数千个 IoT 设备,将所有事件记录到 Azure IoT 中心,然后只能读取由单个 deviceid 创建的事件的问题。

我一直在使用 EventProcessorHost 来实现这样的功能,但到目前为止我只能看到一种方法来读取来自 所有 消息]所有台设备。

读取所有消息并过滤客户端不是一个可行的解决方案,因为可能有数百万条消息。

如果您对 Java/Scala 没意见,此示例显示了如何创建客户端并按设备 ID 过滤消息:

https://github.com/Azure/toketi-iothubreact/blob/master/samples-scala/src/main/scala/A_APIUSage/Demo.scala#L266

虽然底层客户端从中心读取所有消息。

您也可以考虑使用 IoT 中心消息路由,更多信息请点击此处:

  1. https://azure.microsoft.com/blog/azure-iot-hub-message-routing-enhances-device-telemetry-and-optimizes-iot-infrastructure-resources

  2. https://azure.microsoft.com/blog/iot-hub-message-routing-now-with-routing-on-message-body

Azure IoT 中心的主要目的是将大量事件从设备摄取到云流管道,以便以实时方式对其进行分析。默认遥测路径(热方式)是通过内置事件中心,其中所有事件都临时存储在 EH 分区中。 除了默认端点(事件)之外,还可以根据规则(条件)将事件消息路由到自定义端点。

请注意,自定义端点的数量限制为 10,规则的数量限制为 100。如果此限制符合您的业务模型,您可以非常轻松地单独流式传输 10 个设备,如 Davis 中所述' 回答。

但是,根据超过此限制 (10+1) 的源(设备)拆分遥测流管道,将需要使用额外的 Azure 实体(组件)。

下图显示了使用Pub/Sub推送模型基于设备拆分遥测流管道的解决方案。

上述解决方案基于使用自定义主题发布者将流事件转发到 Azure 事件网格。事件网格事件的事件架构是 here。 事件网格的自定义主题发布者由 Azure EventHubTrigger 函数表示,其中每个流事件都映射到事件网格事件消息中,主题表示已注册的设备。

Azure 事件网格是一个 Pub/Sub 松散解耦模型,其中事件根据订阅者的订阅传递给订阅者。换句话说,如果没有匹配的传递,事件消息就会消失。

请注意,事件网格路由的能力是每个区域每秒 1000 万个事件。每个区域的订阅数量限制为 1000。

使用REST Api,可以动态创建、更新、删除等订阅

以下代码片段显示了将流事件映射到 EG 事件消息的 AF 实现示例。如您所见,它的实现非常简单:

run.csx:

#r "Newtonsoft.Json"
#r "Microsoft.ServiceBus"


using System.Configuration;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.EventGrid.Models;
using Microsoft.ServiceBus.Messaging;
using Newtonsoft.Json;

// reusable client proxy
static HttpClient client = HttpClientHelper.Client(ConfigurationManager.AppSettings["TopicEndpointEventGrid"], ConfigurationManager.AppSettings["aeg-sas-key"]);

// AF
public static async Task Run(EventData ed, TraceWriter log)
{
    log.Info($"C# Event Hub trigger function processed a message:{ed.SequenceNumber}"); 

    // fire EventGrid Custom Topic
    var egevent = new EventGridEvent()
    {
        Id = ed.SequenceNumber.ToString(),
        Subject = $"/iothub/events/{ed.SystemProperties["iothub-message-source"] ?? "?"}/{ed.SystemProperties["iothub-connection-device-id"] ?? "?"}",
        EventType = "telemetryDataInserted",
        EventTime = ed.EnqueuedTimeUtc,
        Data = new
        {
            sysproperties = ed.SystemProperties,
            properties = ed.Properties,
            body = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(ed.GetBytes()))
        }
    };
    await client.PostAsJsonAsync("", new[] { egevent });  
}

// helper
class HttpClientHelper
{
    public static HttpClient Client(string address, string key)
    {      
        var client = new HttpClient() { BaseAddress = new Uri(address) };
        client.DefaultRequestHeaders.Add("aeg-sas-key", key);
        return client;      
    }
}

function.json:

    {
      "bindings": [
        {
         "type": "eventHubTrigger",
         "name": "ed",
         "direction": "in",
         "path": "<yourEventHubName>",
         "connection": "<yourIoTHUB>",
         "consumerGroup": "<yourGroup>",
         "cardinality": "many"
        }
      ],
     "disabled": false
  }

project.json:

{
  "frameworks": {
    "net46":{
      "dependencies": {
        "Microsoft.Azure.EventGrid": "1.1.0-preview"
      }
    }
   }
}

最后,以下屏幕片段显示了设备 1 的 AF 订户收到的事件网格事件消息: