我可以在将数据从 Azure 流分析存储到 Blob 存储时在路径中添加 DeviceID
Can I add DeviceID in path while storing data from Azure Stream Analytics to Blob storage
我有数据从不同的设备传入到 IoT 中心,使用流分析对其进行处理并将其存储在 blob 存储中。
我知道我们可以根据需要的格式在路径中添加我们添加的{date}{time},在那个路径中我们也可以添加deviceId。
示例:对于 2018/10/30/01 (Date/month/day/hour) 可以在存储到 blob 时在该路径中添加 /deviceId
I know we can add {date}{time} we add in the path according to needed
format, in that path can we add deviceId too.'
正如@Peter Bons 在 comment 中提到的那样,目前尚不支持输出中的变量名称。
作为解决方法,您可以使用 Blob Trigger Azure Function。您需要在输出列中传递 deviceId
,然后在 blob 触发器函数中获取它。然后使用blob sdk创建/deviceId
目录将blob复制进去并删除之前的blob。
以下是针对您的案例的解决方法示例。它基于使用 azure 函数 (HttpTrigger) 输出 ASA 作业以推送方式将数据附加到特定的 blob 存储。
请注意,以下解决方法使用 最大批处理计数 将事件传送到 azure 函数值 1(当时一个遥测数据)。
ASA职位查询:
SELECT
System.Timestamp as [time], *
INTO outAF
FROM
iot TIMESTAMP BY time
Azure 函数 (HttpTrigger):
run.csx
#r "Newtonsoft.Json"
#r "Microsoft.WindowsAzure.Storage"
using System.Net;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Primitives;
using Microsoft.WindowsAzure.Storage.Blob;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
public static async Task<IActionResult> Run(string body, CloudBlobContainer blobContainer, ILogger log)
{
log.LogInformation($"{body}");
var jtoken = JToken.Parse(body);
var jobject = jtoken is JArray ? jtoken.SingleOrDefault<JToken>() : jtoken;
if(jobject != null)
{
var jtext = jobject.ToString(Formatting.None);
var data = JsonConvert.DeserializeAnonymousType(jtext, new {IoTHub = new { ConnectionDeviceId = ""}});
var blobName = $"{DateTime.UtcNow.ToString("yyyy/MM/dd/hh")}/{data.IoTHub.ConnectionDeviceId}";
var blob = blobContainer.GetAppendBlobReference(blobName);
if(!await blob.ExistsAsync())
{
await blob.CreateOrReplaceAsync();
}
await blob.AppendTextAsync(jtext + "\r\n");
}
return new NoContentResult();
}
function.json
{
"bindings": [
{
"authLevel": "function",
"name": "body",
"type": "httpTrigger",
"direction": "in",
"methods": [
"get",
"post"
]
},
{
"name": "blobContainer",
"type": "blob",
"path": "myContainer",
"connection": "mySTORAGE",
"direction": "out"
},
{
"name": "$return",
"type": "http",
"direction": "out"
}
]
}
我有数据从不同的设备传入到 IoT 中心,使用流分析对其进行处理并将其存储在 blob 存储中。 我知道我们可以根据需要的格式在路径中添加我们添加的{date}{time},在那个路径中我们也可以添加deviceId。
示例:对于 2018/10/30/01 (Date/month/day/hour) 可以在存储到 blob 时在该路径中添加 /deviceId
I know we can add {date}{time} we add in the path according to needed format, in that path can we add deviceId too.'
正如@Peter Bons 在 comment 中提到的那样,目前尚不支持输出中的变量名称。
作为解决方法,您可以使用 Blob Trigger Azure Function。您需要在输出列中传递 deviceId
,然后在 blob 触发器函数中获取它。然后使用blob sdk创建/deviceId
目录将blob复制进去并删除之前的blob。
以下是针对您的案例的解决方法示例。它基于使用 azure 函数 (HttpTrigger) 输出 ASA 作业以推送方式将数据附加到特定的 blob 存储。 请注意,以下解决方法使用 最大批处理计数 将事件传送到 azure 函数值 1(当时一个遥测数据)。
ASA职位查询:
SELECT
System.Timestamp as [time], *
INTO outAF
FROM
iot TIMESTAMP BY time
Azure 函数 (HttpTrigger):
run.csx
#r "Newtonsoft.Json"
#r "Microsoft.WindowsAzure.Storage"
using System.Net;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Primitives;
using Microsoft.WindowsAzure.Storage.Blob;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
public static async Task<IActionResult> Run(string body, CloudBlobContainer blobContainer, ILogger log)
{
log.LogInformation($"{body}");
var jtoken = JToken.Parse(body);
var jobject = jtoken is JArray ? jtoken.SingleOrDefault<JToken>() : jtoken;
if(jobject != null)
{
var jtext = jobject.ToString(Formatting.None);
var data = JsonConvert.DeserializeAnonymousType(jtext, new {IoTHub = new { ConnectionDeviceId = ""}});
var blobName = $"{DateTime.UtcNow.ToString("yyyy/MM/dd/hh")}/{data.IoTHub.ConnectionDeviceId}";
var blob = blobContainer.GetAppendBlobReference(blobName);
if(!await blob.ExistsAsync())
{
await blob.CreateOrReplaceAsync();
}
await blob.AppendTextAsync(jtext + "\r\n");
}
return new NoContentResult();
}
function.json
{
"bindings": [
{
"authLevel": "function",
"name": "body",
"type": "httpTrigger",
"direction": "in",
"methods": [
"get",
"post"
]
},
{
"name": "blobContainer",
"type": "blob",
"path": "myContainer",
"connection": "mySTORAGE",
"direction": "out"
},
{
"name": "$return",
"type": "http",
"direction": "out"
}
]
}