仅处理来自单个设备的 Azure IoT 中心事件
Process Azure IoT hub events from a single device only
我正在尝试解决部署了数千个 IoT 设备,将所有事件记录到 Azure IoT 中心,然后只能读取由单个 deviceid 创建的事件的问题。
我一直在使用 EventProcessorHost
来实现这样的功能,但到目前为止我只能看到一种方法来读取来自 的 所有 消息]所有台设备。
读取所有消息并过滤客户端不是一个可行的解决方案,因为可能有数百万条消息。
如果您对 Java/Scala 没意见,此示例显示了如何创建客户端并按设备 ID 过滤消息:
虽然底层客户端从中心读取所有消息。
您也可以考虑使用 IoT 中心消息路由,更多信息请点击此处:
Azure IoT 中心的主要目的是将大量事件从设备摄取到云流管道,以便以实时方式对其进行分析。默认遥测路径(热方式)是通过内置事件中心,其中所有事件都临时存储在 EH 分区中。
除了默认端点(事件)之外,还可以根据规则(条件)将事件消息路由到自定义端点。
请注意,自定义端点的数量限制为 10,规则的数量限制为 100。如果此限制符合您的业务模型,您可以非常轻松地单独流式传输 10 个设备,如 Davis 中所述' 回答。
但是,根据超过此限制 (10+1) 的源(设备)拆分遥测流管道,将需要使用额外的 Azure 实体(组件)。
下图显示了使用Pub/Sub推送模型基于设备拆分遥测流管道的解决方案。
上述解决方案基于使用自定义主题发布者将流事件转发到 Azure 事件网格。事件网格事件的事件架构是 here。
事件网格的自定义主题发布者由 Azure EventHubTrigger 函数表示,其中每个流事件都映射到事件网格事件消息中,主题表示已注册的设备。
Azure 事件网格是一个 Pub/Sub 松散解耦模型,其中事件根据订阅者的订阅传递给订阅者。换句话说,如果没有匹配的传递,事件消息就会消失。
请注意,事件网格路由的能力是每个区域每秒 1000 万个事件。每个区域的订阅数量限制为 1000。
使用REST Api,可以动态创建、更新、删除等订阅
以下代码片段显示了将流事件映射到 EG 事件消息的 AF 实现示例。如您所见,它的实现非常简单:
run.csx:
#r "Newtonsoft.Json"
#r "Microsoft.ServiceBus"
using System.Configuration;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.EventGrid.Models;
using Microsoft.ServiceBus.Messaging;
using Newtonsoft.Json;
// reusable client proxy
static HttpClient client = HttpClientHelper.Client(ConfigurationManager.AppSettings["TopicEndpointEventGrid"], ConfigurationManager.AppSettings["aeg-sas-key"]);
// AF
public static async Task Run(EventData ed, TraceWriter log)
{
log.Info($"C# Event Hub trigger function processed a message:{ed.SequenceNumber}");
// fire EventGrid Custom Topic
var egevent = new EventGridEvent()
{
Id = ed.SequenceNumber.ToString(),
Subject = $"/iothub/events/{ed.SystemProperties["iothub-message-source"] ?? "?"}/{ed.SystemProperties["iothub-connection-device-id"] ?? "?"}",
EventType = "telemetryDataInserted",
EventTime = ed.EnqueuedTimeUtc,
Data = new
{
sysproperties = ed.SystemProperties,
properties = ed.Properties,
body = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(ed.GetBytes()))
}
};
await client.PostAsJsonAsync("", new[] { egevent });
}
// helper
class HttpClientHelper
{
public static HttpClient Client(string address, string key)
{
var client = new HttpClient() { BaseAddress = new Uri(address) };
client.DefaultRequestHeaders.Add("aeg-sas-key", key);
return client;
}
}
function.json:
{
"bindings": [
{
"type": "eventHubTrigger",
"name": "ed",
"direction": "in",
"path": "<yourEventHubName>",
"connection": "<yourIoTHUB>",
"consumerGroup": "<yourGroup>",
"cardinality": "many"
}
],
"disabled": false
}
project.json:
{
"frameworks": {
"net46":{
"dependencies": {
"Microsoft.Azure.EventGrid": "1.1.0-preview"
}
}
}
}
最后,以下屏幕片段显示了设备 1 的 AF 订户收到的事件网格事件消息:
我正在尝试解决部署了数千个 IoT 设备,将所有事件记录到 Azure IoT 中心,然后只能读取由单个 deviceid 创建的事件的问题。
我一直在使用 EventProcessorHost
来实现这样的功能,但到目前为止我只能看到一种方法来读取来自 的 所有 消息]所有台设备。
读取所有消息并过滤客户端不是一个可行的解决方案,因为可能有数百万条消息。
如果您对 Java/Scala 没意见,此示例显示了如何创建客户端并按设备 ID 过滤消息:
虽然底层客户端从中心读取所有消息。
您也可以考虑使用 IoT 中心消息路由,更多信息请点击此处:
Azure IoT 中心的主要目的是将大量事件从设备摄取到云流管道,以便以实时方式对其进行分析。默认遥测路径(热方式)是通过内置事件中心,其中所有事件都临时存储在 EH 分区中。 除了默认端点(事件)之外,还可以根据规则(条件)将事件消息路由到自定义端点。
请注意,自定义端点的数量限制为 10,规则的数量限制为 100。如果此限制符合您的业务模型,您可以非常轻松地单独流式传输 10 个设备,如 Davis 中所述' 回答。
但是,根据超过此限制 (10+1) 的源(设备)拆分遥测流管道,将需要使用额外的 Azure 实体(组件)。
下图显示了使用Pub/Sub推送模型基于设备拆分遥测流管道的解决方案。
上述解决方案基于使用自定义主题发布者将流事件转发到 Azure 事件网格。事件网格事件的事件架构是 here。 事件网格的自定义主题发布者由 Azure EventHubTrigger 函数表示,其中每个流事件都映射到事件网格事件消息中,主题表示已注册的设备。
Azure 事件网格是一个 Pub/Sub 松散解耦模型,其中事件根据订阅者的订阅传递给订阅者。换句话说,如果没有匹配的传递,事件消息就会消失。
请注意,事件网格路由的能力是每个区域每秒 1000 万个事件。每个区域的订阅数量限制为 1000。
使用REST Api,可以动态创建、更新、删除等订阅
以下代码片段显示了将流事件映射到 EG 事件消息的 AF 实现示例。如您所见,它的实现非常简单:
run.csx:
#r "Newtonsoft.Json"
#r "Microsoft.ServiceBus"
using System.Configuration;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.EventGrid.Models;
using Microsoft.ServiceBus.Messaging;
using Newtonsoft.Json;
// reusable client proxy
static HttpClient client = HttpClientHelper.Client(ConfigurationManager.AppSettings["TopicEndpointEventGrid"], ConfigurationManager.AppSettings["aeg-sas-key"]);
// AF
public static async Task Run(EventData ed, TraceWriter log)
{
log.Info($"C# Event Hub trigger function processed a message:{ed.SequenceNumber}");
// fire EventGrid Custom Topic
var egevent = new EventGridEvent()
{
Id = ed.SequenceNumber.ToString(),
Subject = $"/iothub/events/{ed.SystemProperties["iothub-message-source"] ?? "?"}/{ed.SystemProperties["iothub-connection-device-id"] ?? "?"}",
EventType = "telemetryDataInserted",
EventTime = ed.EnqueuedTimeUtc,
Data = new
{
sysproperties = ed.SystemProperties,
properties = ed.Properties,
body = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(ed.GetBytes()))
}
};
await client.PostAsJsonAsync("", new[] { egevent });
}
// helper
class HttpClientHelper
{
public static HttpClient Client(string address, string key)
{
var client = new HttpClient() { BaseAddress = new Uri(address) };
client.DefaultRequestHeaders.Add("aeg-sas-key", key);
return client;
}
}
function.json:
{
"bindings": [
{
"type": "eventHubTrigger",
"name": "ed",
"direction": "in",
"path": "<yourEventHubName>",
"connection": "<yourIoTHUB>",
"consumerGroup": "<yourGroup>",
"cardinality": "many"
}
],
"disabled": false
}
project.json:
{
"frameworks": {
"net46":{
"dependencies": {
"Microsoft.Azure.EventGrid": "1.1.0-preview"
}
}
}
}
最后,以下屏幕片段显示了设备 1 的 AF 订户收到的事件网格事件消息: