Azure Function - save to Table Storage from IoT Hub

I have 5 devices connected to IoT Hub. These devices send messages, and I need to save these messages to Azure Table storage rather than to blob storage.

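I followed this guide for everything: https://blog.maximerouiller.com/post/persisting-iot-device-messages-into-cosmosdb-with-azure-functions-and-iot-hub/ I can add the input and the output without any problems, but unfortunately I can't work out how to write the function code that saves this data to the table :(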

I can save data from IoT Hub to Blob storage, and from IoT Hub to Table storage with Stream Analytics, but I can't save from IoT Hub to Table storage without Stream Analytics :(

I would suggest using the Azure Table storage REST API for your case.

You can also use the SDK for this. Please see below.

Class

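    using Microsoft.WindowsAzure.Storage.Table;

    public class Item : TableEntity
    {
        // Parameterless constructor; the SDK needs one to materialize entities.
        public Item()
        {
            PartitionKey = "YourPartitionKey";
            RowKey = "YourRowKey";
        }

        // Convenience constructor for callers that supply the keys directly.
        public Item(string partitionKey, string rowKey)
        {
            PartitionKey = partitionKey;
            RowKey = rowKey;
        }

        public string Message { get; set; }
        public string Description { get; set; }
    }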

Inside the function, using the SDK

    // The keys are set through the object initializer, so this works with
    // the parameterless constructor of Item.
    var entity = new Item
    {
        PartitionKey = "YourPartitionKey",
        RowKey = "YourRowKey",
        Message = "I am From IOT Device",
        Description = "I am from IOT and Want to go to Storage"
    };

    // Connect to the storage account and get a reference to the table.
    var client = new CloudTableClient(
        new Uri("https://YourTableStorageAccountName.table.core.windows.net/"),
        new Microsoft.WindowsAzure.Storage.Auth.StorageCredentials("YourTableStorageAccountName", "YourStorageKey"));
    var table = client.GetTableReference("YourTableName");

    // Insert fails if an entity with the same PartitionKey and RowKey already
    // exists, so use keys that are unique per message in real code.
    TableOperation insertOperation = TableOperation.Insert(entity);
    await table.ExecuteAsync(insertOperation);

    Console.WriteLine("Entity inserted!");

REST API reference

URL: https://YourAccount.table.core.windows.net/YourTableThatYouWantedToInsertMessage

Method: POST

Request Body:

{
   "Message":"IOT Message",
   "Description":"I am from IOT and Want to go to Storage",
   "PartitionKey":"Yourpartitionkey",
   "RowKey":"YourRowkey"
}

Note: For more details you could refer here
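
To make the REST option a little more concrete, here is a minimal C# sketch that builds (but does not send) the POST request described above. It rests on assumptions that are not in the original answer: authentication via a SAS token appended to the URL, the `x-ms-version` value, and the names `TableRestSketch` and `BuildInsertRequest`, which are purely illustrative. Treat it as a starting point rather than a finished client.

```csharp
using System;
using System.Net.Http;
using System.Text;
using System.Text.Json;

public static class TableRestSketch
{
    // Builds an Insert Entity request for the Table service without sending it.
    // accountName, tableName and sasToken are placeholders supplied by the caller;
    // using a SAS token for authentication is an assumption of this sketch.
    public static HttpRequestMessage BuildInsertRequest(
        string accountName, string tableName, string sasToken,
        string partitionKey, string rowKey, string message, string description)
    {
        var uri = new Uri(
            $"https://{accountName}.table.core.windows.net/{tableName}?{sasToken.TrimStart('?')}");

        // Same JSON shape as the request body shown above.
        string body = JsonSerializer.Serialize(new
        {
            PartitionKey = partitionKey,
            RowKey = rowKey,
            Message = message,
            Description = description
        });

        var request = new HttpRequestMessage(HttpMethod.Post, uri)
        {
            Content = new StringContent(body, Encoding.UTF8, "application/json")
        };

        // Ask for a plain JSON response without OData metadata.
        request.Headers.TryAddWithoutValidation("Accept", "application/json;odata=nometadata");
        // Service version and request timestamp; the version value is an assumption.
        request.Headers.Add("x-ms-version", "2019-02-02");
        request.Headers.Add("x-ms-date", DateTime.UtcNow.ToString("R"));
        return request;
    }
}
```

The request can then be passed to `HttpClient.SendAsync`. If you authenticate with the account key instead of a SAS token, the request additionally needs a signed `Authorization` header, which is where the SDK shown earlier saves a fair amount of work.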

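If you have any questions, feel free to share. Thanks, and happy coding!

Here is the code for a C# Azure Function V2 that saves the data to a storage table, using deviceId as the PartitionKey and messageId as the RowKey: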

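    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading.Tasks;
    using Microsoft.Azure.EventHubs;
    using Microsoft.Azure.WebJobs;
    using Microsoft.Extensions.Logging;
    using Microsoft.WindowsAzure.Storage.Table;
    using Newtonsoft.Json;
    using Newtonsoft.Json.Linq;

    public static class IotHubToTableStorage
    {
        // The helper takes the table name and a storage connection string.
        // Reading it from the "AzureWebJobsStorage" app setting is an assumption;
        // use whichever setting holds your storage connection string.
        private static CloudTable _outputTable = CloudTableHelper.GetCloudTable(
            "MyTableName", Environment.GetEnvironmentVariable("AzureWebJobsStorage"));

        [FunctionName("IotHubToTableStorage")]
        public static async Task Run(
            [EventHubTrigger("messages/events", Connection = "myConnectionString", ConsumerGroup = "myTablestorageConsumerGroup")] EventData eventData,
            ILogger log)
        {
            // Decode the message body and parse it as a JSON object.
            string message = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
            var deviceData = JsonConvert.DeserializeObject<JObject>(message);

            var dynamicTableEntity = new DynamicTableEntity();

            // deviceId becomes the PartitionKey, messageId the RowKey;
            // every other field is stored as an ordinary entity property.
            foreach (KeyValuePair<string, JToken> keyValuePair in deviceData)
            {
                if (keyValuePair.Key.Equals("deviceId"))
                {
                    dynamicTableEntity.PartitionKey = keyValuePair.Value.ToString();
                }
                else if (keyValuePair.Key.Equals("messageId"))
                {
                    dynamicTableEntity.RowKey = keyValuePair.Value.ToString();
                }
                else
                {
                    dynamicTableEntity.Properties.Add(keyValuePair.Key, EntityProperty.CreateEntityPropertyFromObject(keyValuePair.Value));
                }
            }

            // InsertOrMerge creates the entity or merges into an existing one with the same keys.
            var tableOperation = TableOperation.InsertOrMerge(dynamicTableEntity);
            await _outputTable.ExecuteAsync(tableOperation);
        }
    }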

It uses this helper:

    using Microsoft.WindowsAzure.Storage;
    using Microsoft.WindowsAzure.Storage.Table;

    public class CloudTableHelper
    {
        public static CloudTable GetCloudTable(string tableName, string storageConnectionString)
        {
            // Retrieve storage account from connection string.
            CloudStorageAccount storageAccount = CloudStorageAccount.Parse(storageConnectionString);

            // Create the table client.
            CloudTableClient tableClient = storageAccount.CreateCloudTableClient();

            // Retrieve a reference to a table.
            CloudTable table = tableClient.GetTableReference(tableName);

            // Create the table if it doesn't already exist (blocks until done).
            table.CreateIfNotExistsAsync().Wait();

            return table;
        }
    }
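
To see what the key mapping in the function does to a single message, the following sketch reproduces the same split with only `System.Text.Json`, so it can be run without a storage account. The class name `KeyMappingSketch`, the sample message in the usage note, and the check for missing keys are assumptions added for illustration; the original function does not validate its input.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

public static class KeyMappingSketch
{
    // Splits a device message into PartitionKey, RowKey and the remaining
    // properties, mirroring the foreach loop in the function above.
    public static (string PartitionKey, string RowKey, Dictionary<string, string> Properties)
        Map(string json)
    {
        string partitionKey = null;
        string rowKey = null;
        var properties = new Dictionary<string, string>();

        using (JsonDocument doc = JsonDocument.Parse(json))
        {
            foreach (JsonProperty prop in doc.RootElement.EnumerateObject())
            {
                // Every value is kept as text here to keep the sketch short.
                string value = prop.Value.ToString();

                if (prop.Name == "deviceId") partitionKey = value;
                else if (prop.Name == "messageId") rowKey = value;
                else properties[prop.Name] = value;
            }
        }

        // Table storage needs both keys on every entity, so fail early
        // (this check is an addition, not part of the original function).
        if (partitionKey == null || rowKey == null)
            throw new ArgumentException("Message must contain deviceId and messageId.");

        return (partitionKey, rowKey, properties);
    }
}
```

For a hypothetical message such as `{"deviceId":"dev-1","messageId":"42","temperature":21.5}`, `Map` yields `dev-1` as the PartitionKey, `42` as the RowKey, and a single remaining property `temperature`. Partitioning by device keeps each device's messages together, while the message id keeps rows unique within that partition.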