Azure 函数 - 从 IoT 中心保存到 Table 存储
Azure function - save to Table Storage from IoT Hub
我有 5 台设备连接到 IoT 中心。此设备发送消息,我已将此消息保存到 Azure 存储 Table 而不是 blob。
我根据本指南做所有事情 https://blog.maximerouiller.com/post/persisting-iot-device-messages-into-cosmosdb-with-azure-functions-and-iot-hub/ 不幸的是,我可以毫无问题地添加输入和输出,不幸的是,我无法编写将此数据保存到 table :(
我可以将数据从 IoT 中心保存到 Blob 存储,从具有流分析的 IoT 中心保存到 Table 存储,但我无法从没有 SA 的 IoT 中心保存到 table 存储:(
我建议您针对您的情况使用 azure table storage
REST API。
您也可以为此使用 SDK。请往下看。
Class
public class Item : TableEntity
{
public Item()
{
PartitionKey = "YourPartionKey";
RowKey = "YourRowKey";
}
public string Message{ get; set; }
public string Description { get; set; }
}
使用 SDK 的内部函数
Item entity = new Item("YourPartionKey", "YourRowKey")
{
Message= "I am From IOT Device",
Description = "I am from IOT and Want to go to Storage"
};
// My Storage operation
var client = new CloudTableClient(new Uri("https://YourTableStorageAccountName.table.core.windows.net/"),
new Microsoft.WindowsAzure.Storage.Auth.StorageCredentials("YourTableStorageAccountName", "YourStorageKey"));
var table = client.GetTableReference("YourTableName");
TableOperation insertOperation = TableOperation.Insert(entity);
var insertOnstorage = await table.ExecuteAsync(insertOperation);
Console.WriteLine("Entity inserted!");
REST API参考
URL:
https://YourAccount.table.core.windows.net/YourTableThatYouWantedToInsertMessase
Method:
POST
Request Body:
{
"Message":"IOT Message",
"Description":"I am from IOT and Want to go to Storage",
"PartitionKey":"Yourpartitionkey",
"RowKey":"YourRowkey"
}
Note: For more details you could refer here
如果您有任何疑问,请随时分享。谢谢,编码愉快!
这是 C# Azure Function V2 的代码,它使用 deviceId 作为 PartitionKey 和 messageId 作为 RowKey 将数据保存到存储table:
public static class IotHubToTableStorage
{
private static CloudTable _outputTable = CloudTableHelper.GetCloudTable("MyTableName");
[FunctionName("IotHubToTableStorage")]
public static async Task Run([EventHubTrigger("messages/events", Connection = "myConnectionString", ConsumerGroup = "myTablestorageConsumerGroup")]EventData eventData,
ILogger log)
{
string message = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
var deviceData = JsonConvert.DeserializeObject<JObject>(message);
var dynamicTableEntity = new DynamicTableEntity();
foreach (KeyValuePair<string, JToken> keyValuePair in deviceData)
{
if (keyValuePair.Key.Equals("deviceId"))
{
dynamicTableEntity.PartitionKey = keyValuePair.Value.ToString();
}
else if (keyValuePair.Key.Equals("messageId"))
{
dynamicTableEntity.RowKey = keyValuePair.Value.ToString();
}
else
{
dynamicTableEntity.Properties.Add(keyValuePair.Key, EntityProperty.CreateEntityPropertyFromObject(keyValuePair.Value));
}
}
var tableOperation = TableOperation.InsertOrMerge(dynamicTableEntity);
await _outputTable.ExecuteAsync(tableOperation);
}
}
它利用了这个助手:
public class CloudTableHelper
{
public static CloudTable GetCloudTable(string tableName, string storageConnectionString)
{
// Retrieve storage account from connection string.
CloudStorageAccount storageAccount = CloudStorageAccount.Parse(storageConnectionString);
// Create the table client.
CloudTableClient tableClient = storageAccount.CreateCloudTableClient();
// Retrieve a reference to a table.
CloudTable table = tableClient.GetTableReference(tableName);
// Create the table if it doesn't already exist
table.CreateIfNotExistsAsync().Wait();
return table;
}
}
我有 5 台设备连接到 IoT 中心。此设备发送消息,我已将此消息保存到 Azure 存储 Table 而不是 blob。
我根据本指南做所有事情 https://blog.maximerouiller.com/post/persisting-iot-device-messages-into-cosmosdb-with-azure-functions-and-iot-hub/ 不幸的是,我可以毫无问题地添加输入和输出,不幸的是,我无法编写将此数据保存到 table :(
我可以将数据从 IoT 中心保存到 Blob 存储,从具有流分析的 IoT 中心保存到 Table 存储,但我无法从没有 SA 的 IoT 中心保存到 table 存储:(
我建议您针对您的情况使用 azure table storage
REST API。
您也可以为此使用 SDK。请往下看。
Class
public class Item : TableEntity
{
public Item()
{
PartitionKey = "YourPartionKey";
RowKey = "YourRowKey";
}
public string Message{ get; set; }
public string Description { get; set; }
}
使用 SDK 的内部函数
Item entity = new Item("YourPartionKey", "YourRowKey")
{
Message= "I am From IOT Device",
Description = "I am from IOT and Want to go to Storage"
};
// My Storage operation
var client = new CloudTableClient(new Uri("https://YourTableStorageAccountName.table.core.windows.net/"),
new Microsoft.WindowsAzure.Storage.Auth.StorageCredentials("YourTableStorageAccountName", "YourStorageKey"));
var table = client.GetTableReference("YourTableName");
TableOperation insertOperation = TableOperation.Insert(entity);
var insertOnstorage = await table.ExecuteAsync(insertOperation);
Console.WriteLine("Entity inserted!");
REST API参考
URL:
https://YourAccount.table.core.windows.net/YourTableThatYouWantedToInsertMessase
Method:
POST
Request Body:
{
"Message":"IOT Message",
"Description":"I am from IOT and Want to go to Storage",
"PartitionKey":"Yourpartitionkey",
"RowKey":"YourRowkey"
}
Note: For more details you could refer here
如果您有任何疑问,请随时分享。谢谢,编码愉快!
这是 C# Azure Function V2 的代码,它使用 deviceId 作为 PartitionKey 和 messageId 作为 RowKey 将数据保存到存储table:
public static class IotHubToTableStorage
{
private static CloudTable _outputTable = CloudTableHelper.GetCloudTable("MyTableName");
[FunctionName("IotHubToTableStorage")]
public static async Task Run([EventHubTrigger("messages/events", Connection = "myConnectionString", ConsumerGroup = "myTablestorageConsumerGroup")]EventData eventData,
ILogger log)
{
string message = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
var deviceData = JsonConvert.DeserializeObject<JObject>(message);
var dynamicTableEntity = new DynamicTableEntity();
foreach (KeyValuePair<string, JToken> keyValuePair in deviceData)
{
if (keyValuePair.Key.Equals("deviceId"))
{
dynamicTableEntity.PartitionKey = keyValuePair.Value.ToString();
}
else if (keyValuePair.Key.Equals("messageId"))
{
dynamicTableEntity.RowKey = keyValuePair.Value.ToString();
}
else
{
dynamicTableEntity.Properties.Add(keyValuePair.Key, EntityProperty.CreateEntityPropertyFromObject(keyValuePair.Value));
}
}
var tableOperation = TableOperation.InsertOrMerge(dynamicTableEntity);
await _outputTable.ExecuteAsync(tableOperation);
}
}
它利用了这个助手:
public class CloudTableHelper
{
public static CloudTable GetCloudTable(string tableName, string storageConnectionString)
{
// Retrieve storage account from connection string.
CloudStorageAccount storageAccount = CloudStorageAccount.Parse(storageConnectionString);
// Create the table client.
CloudTableClient tableClient = storageAccount.CreateCloudTableClient();
// Retrieve a reference to a table.
CloudTable table = tableClient.GetTableReference(tableName);
// Create the table if it doesn't already exist
table.CreateIfNotExistsAsync().Wait();
return table;
}
}