从 ServiceBus 触发的 azure 函数中检索 IoT Hub Twin
Retrieve IoT Hub Twin from ServiceBus triggered azure function
我们正在将数据从 IoT 设备发送到 Azure IoT Hub,并尝试将某种类型的消息传递到 Azure Functions。
目前,我们通过创建 Azure 服务总线终结点并在 IoTHub 中创建消息路由 来实现。它按预期工作,消息被 Azure 函数正确接收。
现在,我们想在 Azure 函数中获取 DeviceId 以及 IoT Hub 中的设备孪生(Device Twin),但我完全不知道该怎么做。
如果我们使用 EventHubTrigger,看起来会很简单,只需像下面这样做:
// Sample from the question: a function whose triggering message is bound through
// an Event Hub trigger and which also declares the device twin as a parameter.
// The surrounding text attributes this to version 1 of azure-functions-iothub-extension.
public static class Test
{
[FunctionName("TestQueueTrigger")]
public static void Run(
[EventHubTrigger("messages/events", Connection = "IoTHubConnection")]
// The event that triggered the invocation.
EventData message,
// Device twin parameter; how it gets bound is not shown in this snippet.
Twin deviceTwin,
TraceWriter log)
// Body elided in the original post.
{ ... }
}
但确实不清楚如何使用服务总线触发器来完成此操作。
此外,我们希望将所有消息(独立于路由)存储到 Azure Data Lake 存储,我有点不知道它是如何工作的。
Azure IoT 中心设备到云的消息格式在此处(here)有描述。这种格式不包含设备孪生属性。设备孪生存储在云端后端,其更改可以通过物联网中心路由通知到特定端点(内置和/或自定义端点)。
您的函数“TestQueueTrigger”示例使用的是版本 1 的 azure-functions-iothub-extension。扩展输入绑定 Twin允许在扩展中使用单独的调用来获取设备孪生:
// Quoted from the extension, not part of run.csx: the twin is fetched with a separate
// asynchronous call for the device id carried by the binding attribute.
deviceTwin = await registryManager.GetTwinAsync(attribute.DeviceId);
基本上,此扩展也可用于 ServiceBusTrigger 绑定。
请注意,此扩展只能用于函数版本 1,因此我建议使用例如 REST API Get Twin 调用在函数中获取设备孪生。
更新
以下代码片段显示了 ServiceBusTrigger 函数和 REST API Get Twin 调用的示例。
run.csx 文件:
#r "..\bin\Microsoft.Azure.ServiceBus.dll"
#r "..\bin\Microsoft.Azure.Devices.Shared.dll"
#r "Microsoft.Azure.WebJobs.ServiceBus"
#r "Newtonsoft.Json"
using System;
using System.Threading.Tasks;
using System.Text;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Microsoft.Azure.WebJobs.ServiceBus;
using Microsoft.Azure.ServiceBus;
using System.Globalization;
using System.Linq;
using System.Net.Http;
using System.Security.Cryptography;
using System.Web;
using Microsoft.Azure.Devices.Shared;
// reusable proxy: one static helper instance, created once and used by Run for its IoT Hub calls.
// Its constructor receives the value of the "AzureIoTHubShariedAccessPolicy" environment variable
// and parses it as a connection string (HostName / SharedAccessKeyName / SharedAccessKey).
// NOTE(review): the setting name is spelled "Sharied" here — the configured app setting must use
// exactly the same spelling, otherwise the constructor is handed null.
static HttpClientHelper iothub = new HttpClientHelper(Environment.GetEnvironmentVariable("AzureIoTHubShariedAccessPolicy"));
/// <summary>
/// Service Bus queue-triggered entry point: logs the message payload, fetches the sending
/// device's twin through the IoT Hub REST API (Get Twin) and logs the twin's tags.
/// </summary>
/// <param name="queueItem">The Service Bus message that triggered the function.</param>
/// <param name="log">Logger supplied by the Functions host.</param>
/// <exception cref="InvalidOperationException">
/// The message has no "iothub-connection-device-id" user property.
/// </exception>
/// <exception cref="HttpRequestException">The Get Twin call returned a non-success status code.</exception>
public static async Task Run(Message queueItem, ILogger log)
{
    // payload
    log.LogInformation($"C# ServiceBus queue trigger function processed message: {Encoding.UTF8.GetString(queueItem.Body)}");

    // device identity Id
    if (!queueItem.UserProperties.TryGetValue("iothub-connection-device-id", out var deviceIdValue) || deviceIdValue == null)
    {
        // Fail with a descriptive error instead of a bare KeyNotFoundException.
        throw new InvalidOperationException("The message does not carry the 'iothub-connection-device-id' property.");
    }

    // Device ids may contain characters such as '#', '?' or '%' that would otherwise
    // be interpreted as URL syntax, so the id is escaped before it goes into the path.
    var deviceId = Uri.EscapeDataString(deviceIdValue.ToString());

    // get the device twin; the response is disposed as soon as the twin has been read
    using (var response = await iothub.Client.GetAsync($"/twins/{deviceId}?api-version=2018-06-30"))
    {
        response.EnsureSuccessStatusCode();
        Twin twin = await response.Content.ReadAsAsync<Twin>();
        log.LogInformation(JsonConvert.SerializeObject(twin.Tags, Formatting.Indented));
    }
}
// helpers
/// <summary>
/// Wraps a single <see cref="HttpClient"/> pointed at an IoT Hub host and keeps its
/// Authorization header populated with a shared access signature (SAS) token that is
/// regenerated shortly before it expires.
/// </summary>
class HttpClientHelper
{
    // Renew the token this long before it expires so no request is sent with a stale token.
    static readonly TimeSpan RenewalMargin = TimeSpan.FromMinutes(1);

    // Private gate for token renewal; avoids locking on an object that is handed out to callers.
    readonly object gate = new object();

    HttpClient client;
    DateTime expiringSaS;
    (string hostname, string keyname, string key) config;

    /// <summary>Creates the helper from an IoT Hub connection string.</summary>
    /// <param name="connectionString">
    /// Semicolon-separated "Name=Value" pairs; HostName and SharedAccessKey are required,
    /// SharedAccessKeyName is optional.
    /// </param>
    /// <exception cref="ArgumentException">
    /// The connection string is empty or lacks HostName / SharedAccessKey.
    /// </exception>
    public HttpClientHelper(string connectionString)
    {
        if (string.IsNullOrWhiteSpace(connectionString))
        {
            throw new ArgumentException("The IoT Hub connection string is missing or empty.", nameof(connectionString));
        }

        config = GetPartsFromConnectionString(connectionString);
        if (string.IsNullOrEmpty(config.hostname) || string.IsNullOrEmpty(config.key))
        {
            throw new ArgumentException("The IoT Hub connection string must contain HostName and SharedAccessKey.", nameof(connectionString));
        }

        client = new HttpClient() { BaseAddress = new Uri($"https://{config.hostname}") };
        SetAuthorizationHeader();
    }

    /// <summary>The shared client; its SAS token is renewed first when it is about to expire.</summary>
    public HttpClient Client
    {
        get
        {
            if (IsTokenExpiring())
            {
                SetAuthorizationHeader();
            }
            return client;
        }
    }

    // True when the current token expires within the renewal margin (or was never set).
    bool IsTokenExpiring()
    {
        return expiringSaS < DateTime.UtcNow.Add(RenewalMargin);
    }

    /// <summary>Replaces the Authorization header with a fresh one-hour SAS token if renewal is due.</summary>
    internal void SetAuthorizationHeader()
    {
        lock (gate)
        {
            // Re-check inside the lock: another caller may have renewed the token already.
            if (IsTokenExpiring())
            {
                string sasToken = GetSASToken(config.hostname, config.key, config.keyname, 1);
                if (client.DefaultRequestHeaders.Contains("Authorization"))
                {
                    client.DefaultRequestHeaders.Remove("Authorization");
                }
                client.DefaultRequestHeaders.Add("Authorization", sasToken);
                expiringSaS = DateTime.UtcNow.AddHours(1);
            }
        }
    }

    /// <summary>Splits a connection string into host name, key name and key.</summary>
    /// <param name="connectionString">Semicolon-separated "Name=Value" pairs.</param>
    /// <returns>The three values; an entry that is absent is returned as an empty string.</returns>
    internal (string hostname, string keyname, string key) GetPartsFromConnectionString(string connectionString)
    {
        var parts = connectionString
            .Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries)
            // Split on the first '=' only: base64 keys end with '=' padding.
            .Select(segment => segment.Split(new[] { '=' }, 2))
            // Ignore segments that have no '=' instead of failing on them.
            .Where(pair => pair.Length == 2)
            .ToDictionary(pair => pair[0].Trim(), pair => pair[1].Trim());

        // The dictionary indexer throws on a missing key, so the fallback needs TryGetValue.
        string ValueOrEmpty(string name) => parts.TryGetValue(name, out var value) ? value : "";

        return (ValueOrEmpty("HostName"), ValueOrEmpty("SharedAccessKeyName"), ValueOrEmpty("SharedAccessKey"));
    }

    /// <summary>Builds a "SharedAccessSignature ..." token for the given resource.</summary>
    /// <param name="resourceUri">Resource the token is issued for (here: the hub host name).</param>
    /// <param name="key">Base64-encoded signing key.</param>
    /// <param name="keyName">Optional policy name, appended as "skn" when non-empty.</param>
    /// <param name="hours">Token lifetime in hours.</param>
    /// <returns>The complete Authorization header value.</returns>
    internal string GetSASToken(string resourceUri, string key, string keyName = null, uint hours = 24)
    {
        var expiry = GetExpiry(hours);
        string encodedResource = HttpUtility.UrlEncode(resourceUri);
        string stringToSign = encodedResource + "\n" + expiry;

        string signature;
        using (var hmac = new HMACSHA256(Convert.FromBase64String(key)))
        {
            signature = Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(stringToSign)));
        }

        var sasToken = $"SharedAccessSignature sr={encodedResource}&sig={HttpUtility.UrlEncode(signature)}&se={expiry}";
        if (!string.IsNullOrEmpty(keyName))
        {
            sasToken += $"&skn={keyName}";
        }
        return sasToken;
    }

    /// <summary>Returns the expiry time, <paramref name="hours"/> from now, as Unix epoch seconds.</summary>
    internal string GetExpiry(uint hours = 24)
    {
        TimeSpan sinceEpoch = DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
        // 64-bit arithmetic: a 32-bit seconds count overflows in 2038.
        long expiry = (long)sinceEpoch.TotalSeconds + 3600L * hours;
        return expiry.ToString(CultureInfo.InvariantCulture);
    }
}
function.json:
{
"bindings": [
{
"name": "queueItem",
"type": "serviceBusTrigger",
"direction": "in",
"queueName": "myQueue",
"connection": "myConnectionString_SERVICEBUS"
}
]
}
我们正在将数据从 IoT 设备发送到 Azure IoT Hub,并尝试将某种类型的消息传递到 Azure Functions。
目前,我们通过创建 Azure 服务总线终结点并在 IoTHub 中创建消息路由 来实现。它按预期工作,消息被 Azure 函数正确接收。
现在,我们想在 Azure 函数中获取 DeviceId 以及 IoT Hub 中的设备孪生(Device Twin),但我完全不知道该怎么做。
如果我们使用 EventHubTrigger,看起来会很简单,只需像下面这样做:
// Sample from the question: a function whose triggering message is bound through
// an Event Hub trigger and which also declares the device twin as a parameter.
// The surrounding text attributes this to version 1 of azure-functions-iothub-extension.
public static class Test
{
[FunctionName("TestQueueTrigger")]
public static void Run(
[EventHubTrigger("messages/events", Connection = "IoTHubConnection")]
// The event that triggered the invocation.
EventData message,
// Device twin parameter; how it gets bound is not shown in this snippet.
Twin deviceTwin,
TraceWriter log)
// Body elided in the original post.
{ ... }
}
但确实不清楚如何使用服务总线触发器来完成此操作。
此外,我们希望将所有消息(独立于路由)存储到 Azure Data Lake 存储,我有点不知道它是如何工作的。
Azure IoT 中心设备到云的消息格式在此处(here)有描述。这种格式不包含设备孪生属性。设备孪生存储在云端后端,其更改可以通过物联网中心路由通知到特定端点(内置和/或自定义端点)。
您的函数“TestQueueTrigger”示例使用的是版本 1 的 azure-functions-iothub-extension。扩展输入绑定 Twin允许在扩展中使用单独的调用来获取设备孪生:
// Quoted from the extension, not part of run.csx: the twin is fetched with a separate
// asynchronous call for the device id carried by the binding attribute.
deviceTwin = await registryManager.GetTwinAsync(attribute.DeviceId);
基本上,此扩展也可用于 ServiceBusTrigger 绑定。 请注意,此扩展只能用于函数版本 1,因此我建议使用例如 REST API Get Twin 调用在函数中获取设备孪生。
更新
以下代码片段显示了 ServiceBusTrigger 函数和 REST API Get Twin 调用的示例。
run.csx 文件:
#r "..\bin\Microsoft.Azure.ServiceBus.dll"
#r "..\bin\Microsoft.Azure.Devices.Shared.dll"
#r "Microsoft.Azure.WebJobs.ServiceBus"
#r "Newtonsoft.Json"
using System;
using System.Threading.Tasks;
using System.Text;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Microsoft.Azure.WebJobs.ServiceBus;
using Microsoft.Azure.ServiceBus;
using System.Globalization;
using System.Linq;
using System.Net.Http;
using System.Security.Cryptography;
using System.Web;
using Microsoft.Azure.Devices.Shared;
// reusable proxy: one static helper instance, created once and used by Run for its IoT Hub calls.
// Its constructor receives the value of the "AzureIoTHubShariedAccessPolicy" environment variable
// and parses it as a connection string (HostName / SharedAccessKeyName / SharedAccessKey).
// NOTE(review): the setting name is spelled "Sharied" here — the configured app setting must use
// exactly the same spelling, otherwise the constructor is handed null.
static HttpClientHelper iothub = new HttpClientHelper(Environment.GetEnvironmentVariable("AzureIoTHubShariedAccessPolicy"));
/// <summary>
/// Service Bus queue-triggered entry point: logs the message payload, fetches the sending
/// device's twin through the IoT Hub REST API (Get Twin) and logs the twin's tags.
/// </summary>
/// <param name="queueItem">The Service Bus message that triggered the function.</param>
/// <param name="log">Logger supplied by the Functions host.</param>
/// <exception cref="InvalidOperationException">
/// The message has no "iothub-connection-device-id" user property.
/// </exception>
/// <exception cref="HttpRequestException">The Get Twin call returned a non-success status code.</exception>
public static async Task Run(Message queueItem, ILogger log)
{
    // payload
    log.LogInformation($"C# ServiceBus queue trigger function processed message: {Encoding.UTF8.GetString(queueItem.Body)}");

    // device identity Id
    if (!queueItem.UserProperties.TryGetValue("iothub-connection-device-id", out var deviceIdValue) || deviceIdValue == null)
    {
        // Fail with a descriptive error instead of a bare KeyNotFoundException.
        throw new InvalidOperationException("The message does not carry the 'iothub-connection-device-id' property.");
    }

    // Device ids may contain characters such as '#', '?' or '%' that would otherwise
    // be interpreted as URL syntax, so the id is escaped before it goes into the path.
    var deviceId = Uri.EscapeDataString(deviceIdValue.ToString());

    // get the device twin; the response is disposed as soon as the twin has been read
    using (var response = await iothub.Client.GetAsync($"/twins/{deviceId}?api-version=2018-06-30"))
    {
        response.EnsureSuccessStatusCode();
        Twin twin = await response.Content.ReadAsAsync<Twin>();
        log.LogInformation(JsonConvert.SerializeObject(twin.Tags, Formatting.Indented));
    }
}
// helpers
/// <summary>
/// Wraps a single <see cref="HttpClient"/> pointed at an IoT Hub host and keeps its
/// Authorization header populated with a shared access signature (SAS) token that is
/// regenerated shortly before it expires.
/// </summary>
class HttpClientHelper
{
    // Renew the token this long before it expires so no request is sent with a stale token.
    static readonly TimeSpan RenewalMargin = TimeSpan.FromMinutes(1);

    // Private gate for token renewal; avoids locking on an object that is handed out to callers.
    readonly object gate = new object();

    HttpClient client;
    DateTime expiringSaS;
    (string hostname, string keyname, string key) config;

    /// <summary>Creates the helper from an IoT Hub connection string.</summary>
    /// <param name="connectionString">
    /// Semicolon-separated "Name=Value" pairs; HostName and SharedAccessKey are required,
    /// SharedAccessKeyName is optional.
    /// </param>
    /// <exception cref="ArgumentException">
    /// The connection string is empty or lacks HostName / SharedAccessKey.
    /// </exception>
    public HttpClientHelper(string connectionString)
    {
        if (string.IsNullOrWhiteSpace(connectionString))
        {
            throw new ArgumentException("The IoT Hub connection string is missing or empty.", nameof(connectionString));
        }

        config = GetPartsFromConnectionString(connectionString);
        if (string.IsNullOrEmpty(config.hostname) || string.IsNullOrEmpty(config.key))
        {
            throw new ArgumentException("The IoT Hub connection string must contain HostName and SharedAccessKey.", nameof(connectionString));
        }

        client = new HttpClient() { BaseAddress = new Uri($"https://{config.hostname}") };
        SetAuthorizationHeader();
    }

    /// <summary>The shared client; its SAS token is renewed first when it is about to expire.</summary>
    public HttpClient Client
    {
        get
        {
            if (IsTokenExpiring())
            {
                SetAuthorizationHeader();
            }
            return client;
        }
    }

    // True when the current token expires within the renewal margin (or was never set).
    bool IsTokenExpiring()
    {
        return expiringSaS < DateTime.UtcNow.Add(RenewalMargin);
    }

    /// <summary>Replaces the Authorization header with a fresh one-hour SAS token if renewal is due.</summary>
    internal void SetAuthorizationHeader()
    {
        lock (gate)
        {
            // Re-check inside the lock: another caller may have renewed the token already.
            if (IsTokenExpiring())
            {
                string sasToken = GetSASToken(config.hostname, config.key, config.keyname, 1);
                if (client.DefaultRequestHeaders.Contains("Authorization"))
                {
                    client.DefaultRequestHeaders.Remove("Authorization");
                }
                client.DefaultRequestHeaders.Add("Authorization", sasToken);
                expiringSaS = DateTime.UtcNow.AddHours(1);
            }
        }
    }

    /// <summary>Splits a connection string into host name, key name and key.</summary>
    /// <param name="connectionString">Semicolon-separated "Name=Value" pairs.</param>
    /// <returns>The three values; an entry that is absent is returned as an empty string.</returns>
    internal (string hostname, string keyname, string key) GetPartsFromConnectionString(string connectionString)
    {
        var parts = connectionString
            .Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries)
            // Split on the first '=' only: base64 keys end with '=' padding.
            .Select(segment => segment.Split(new[] { '=' }, 2))
            // Ignore segments that have no '=' instead of failing on them.
            .Where(pair => pair.Length == 2)
            .ToDictionary(pair => pair[0].Trim(), pair => pair[1].Trim());

        // The dictionary indexer throws on a missing key, so the fallback needs TryGetValue.
        string ValueOrEmpty(string name) => parts.TryGetValue(name, out var value) ? value : "";

        return (ValueOrEmpty("HostName"), ValueOrEmpty("SharedAccessKeyName"), ValueOrEmpty("SharedAccessKey"));
    }

    /// <summary>Builds a "SharedAccessSignature ..." token for the given resource.</summary>
    /// <param name="resourceUri">Resource the token is issued for (here: the hub host name).</param>
    /// <param name="key">Base64-encoded signing key.</param>
    /// <param name="keyName">Optional policy name, appended as "skn" when non-empty.</param>
    /// <param name="hours">Token lifetime in hours.</param>
    /// <returns>The complete Authorization header value.</returns>
    internal string GetSASToken(string resourceUri, string key, string keyName = null, uint hours = 24)
    {
        var expiry = GetExpiry(hours);
        string encodedResource = HttpUtility.UrlEncode(resourceUri);
        string stringToSign = encodedResource + "\n" + expiry;

        string signature;
        using (var hmac = new HMACSHA256(Convert.FromBase64String(key)))
        {
            signature = Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(stringToSign)));
        }

        var sasToken = $"SharedAccessSignature sr={encodedResource}&sig={HttpUtility.UrlEncode(signature)}&se={expiry}";
        if (!string.IsNullOrEmpty(keyName))
        {
            sasToken += $"&skn={keyName}";
        }
        return sasToken;
    }

    /// <summary>Returns the expiry time, <paramref name="hours"/> from now, as Unix epoch seconds.</summary>
    internal string GetExpiry(uint hours = 24)
    {
        TimeSpan sinceEpoch = DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
        // 64-bit arithmetic: a 32-bit seconds count overflows in 2038.
        long expiry = (long)sinceEpoch.TotalSeconds + 3600L * hours;
        return expiry.ToString(CultureInfo.InvariantCulture);
    }
}
function.json:
{
"bindings": [
{
"name": "queueItem",
"type": "serviceBusTrigger",
"direction": "in",
"queueName": "myQueue",
"connection": "myConnectionString_SERVICEBUS"
}
]
}