将单个 json 从 azure iot hub 存储到 datalake2
Store single json from azure iot hub to datalake2
我添加了物联网集线器和设备。来自 iot hub 的所有数据都以 json 格式保存到数据湖 2。工作正常,但如果同时有来自设备的多条消息,它会保存在一个 json 中。它会引起一些麻烦...有没有办法将每个消息事件保存在单独的 json 中?我查看了物联网集线器的设置,但一无所获。
IoT Hub 路由机制中没有始终将单个消息转发到存储的设置。基本上,这个要求可以通过流管道消费者 (IoTHubTrigger) 或事件网格订阅者 (EventGridTrigger) 中的 azure 函数来实现。
更新:
以下是 IoTHubTrigger 函数的示例,其中输出 blob 绑定到 Data Lake Storage Gen2 的容器:
run.csx:
#r "Microsoft.Azure.EventHubs"
#r "Newtonsoft.Json"
#r "Microsoft.WindowsAzure.Storage"
using System;
using System.IO;
using System.Text;
using System.Linq;
using Microsoft.Azure.EventHubs;
using Microsoft.WindowsAzure.Storage.Blob;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
public static async Task Run(EventData ed, CloudBlockBlob outputBlob, ILogger log)
{
//log.LogInformation($"DeviceId = {ed.SystemProperties["iothub-connection-device-id"]}\r\n{JObject.Parse(Encoding.ASCII.GetString(ed.Body))}");
var msg = new {
EnqueuedTimeUtc = ed.SystemProperties["iothub-enqueuedtime"],
Properties = ed.Properties,
SystemProperties = new {
connectionDeviceId = ed.SystemProperties["iothub-connection-device-id"],
connectionAuthMethod = ed.SystemProperties["iothub-connection-auth-method"],
connectionDeviceGenerationId = ed.SystemProperties["iothub-connection-auth-generation-id"],
enqueuedTime = ed.SystemProperties["iothub-enqueuedtime"]
},
Body = JObject.Parse(Encoding.ASCII.GetString(ed.Body))
};
byte[] buffer = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(msg));
await outputBlob.UploadFromStreamAsync(new MemoryStream(buffer));
await Task.CompletedTask;
}
function.json:
{
"bindings": [
{
"name": "ed",
"connection": "rk2020iot_IOTHUB",
"eventHubName": "rk2020iot_IOTHUBNAME",
"consumerGroup": "function",
"cardinality": "one",
"direction": "in",
"type": "eventHubTrigger"
},
{
"name": "outputBlob",
"path": "iot/rk2020iot/{DateTime}.json",
"connection": "rk2020datalake2_STORAGE",
"direction": "out",
"type": "blob"
}
]
}
我添加了物联网集线器和设备。来自 iot hub 的所有数据都以 json 格式保存到数据湖 2。工作正常,但如果同时有来自设备的多条消息,它会保存在一个 json 中。它会引起一些麻烦...有没有办法将每个消息事件保存在单独的 json 中?我查看了物联网集线器的设置,但一无所获。
IoT Hub 路由机制中没有始终将单个消息转发到存储的设置。基本上,这个要求可以通过流管道消费者 (IoTHubTrigger) 或事件网格订阅者 (EventGridTrigger) 中的 azure 函数来实现。
更新:
以下是 IoTHubTrigger 函数的示例,其中输出 blob 绑定到 Data Lake Storage Gen2 的容器:
run.csx:
#r "Microsoft.Azure.EventHubs"
#r "Newtonsoft.Json"
#r "Microsoft.WindowsAzure.Storage"
using System;
using System.IO;
using System.Text;
using System.Linq;
using Microsoft.Azure.EventHubs;
using Microsoft.WindowsAzure.Storage.Blob;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
public static async Task Run(EventData ed, CloudBlockBlob outputBlob, ILogger log)
{
//log.LogInformation($"DeviceId = {ed.SystemProperties["iothub-connection-device-id"]}\r\n{JObject.Parse(Encoding.ASCII.GetString(ed.Body))}");
var msg = new {
EnqueuedTimeUtc = ed.SystemProperties["iothub-enqueuedtime"],
Properties = ed.Properties,
SystemProperties = new {
connectionDeviceId = ed.SystemProperties["iothub-connection-device-id"],
connectionAuthMethod = ed.SystemProperties["iothub-connection-auth-method"],
connectionDeviceGenerationId = ed.SystemProperties["iothub-connection-auth-generation-id"],
enqueuedTime = ed.SystemProperties["iothub-enqueuedtime"]
},
Body = JObject.Parse(Encoding.ASCII.GetString(ed.Body))
};
byte[] buffer = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(msg));
await outputBlob.UploadFromStreamAsync(new MemoryStream(buffer));
await Task.CompletedTask;
}
function.json:
{
"bindings": [
{
"name": "ed",
"connection": "rk2020iot_IOTHUB",
"eventHubName": "rk2020iot_IOTHUBNAME",
"consumerGroup": "function",
"cardinality": "one",
"direction": "in",
"type": "eventHubTrigger"
},
{
"name": "outputBlob",
"path": "iot/rk2020iot/{DateTime}.json",
"connection": "rk2020datalake2_STORAGE",
"direction": "out",
"type": "blob"
}
]
}