使用 c# 在 azure iot hub 上批量导入注册设备不起作用?
Bulk import register device on azure iot hub using c# not working?
我从github msdn link获取了批量导入代码
但是在控制台和 Web 应用程序中使用相同的代码无法将任何新设备注册到物联网中心。
正在将设备列表创建到 blob 文件中,但为什么无法成功执行导入作业?我也没有得到任何例外。
class Program
{
static RegistryManager registryManager;
static string connectionString = "iothub-connectionstring";
static void Main(string[] args)
{
BulkImport();
}
static string GetContainerSasUri(CloudBlobContainer container)
{
// Set the expiry time and permissions for the container.
// In this case no start time is specified, so the
// shared access signature becomes valid immediately.
var sasConstraints = new SharedAccessBlobPolicy();
sasConstraints.SharedAccessExpiryTime = DateTime.UtcNow.AddHours(24);
sasConstraints.Permissions =
SharedAccessBlobPermissions.Write |
SharedAccessBlobPermissions.Read |
SharedAccessBlobPermissions.Delete;
// Generate the shared access signature on the container,
// setting the constraints directly on the signature.
string sasContainerToken = container.GetSharedAccessSignature(sasConstraints);
// Return the URI string for the container,
// including the SAS token.
return container.Uri + sasContainerToken;
}
private static async Task BulkImport()
{
try
{
registryManager = RegistryManager.CreateFromConnectionString(connectionString);
StorageCredentials storageCredentials = new StorageCredentials("storeage-name", "storage-key");
CloudStorageAccount storageAccount = new CloudStorageAccount(storageCredentials, useHttps: true);
// Create a blob client.
CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient();
CloudBlobContainer container = blobClient.GetContainerReference("bulkiothubimport");
CloudBlockBlob blob = container.GetBlockBlobReference("devices.txt");
var containerSasUri = GetContainerSasUri(container);
// Provision 1,000 more devices
var serializedDevices = new List<string>();
for (var i = 0; i < 5; i++)
{
// Create a new ExportImportDevice
// CryptoKeyGenerator is in the Microsoft.Azure.Devices.Common namespace
var deviceToAdd = new ExportImportDevice()
{
Id = i+"mydevicenew",
Status = DeviceStatus.Enabled,
Authentication = new AuthenticationMechanism()
{
SymmetricKey = new SymmetricKey()
{
PrimaryKey = CryptoKeyGenerator.GenerateKey(32),
SecondaryKey = CryptoKeyGenerator.GenerateKey(32)
}
},
ImportMode = ImportMode.Create
};
// Add device to the list
serializedDevices.Add(JsonConvert.SerializeObject(deviceToAdd));
}
// Write the list to the blob
var sb = new StringBuilder();
serializedDevices.ForEach(serializedDevice => sb.AppendLine(serializedDevice));
//await blob.DeleteIfExistsAsync();
using (CloudBlobStream stream = await blob.OpenWriteAsync())
{
byte[] bytes = Encoding.UTF8.GetBytes(sb.ToString());
for (var i = 0; i < bytes.Length; i += 500)
{
int length = Math.Min(bytes.Length - i, 500);
await stream.WriteAsync(bytes, i, length);
}
}
// Call import using the blob to add new devices
// Log information related to the job is written to the same container
// This normally takes 1 minute per 100 devices
JobProperties importJob =
await registryManager.ImportDevicesAsync(containerSasUri, containerSasUri);
// Wait until job is finished
while (true)
{
importJob = await registryManager.GetJobAsync(importJob.JobId);
if (importJob.Status == JobStatus.Completed ||
importJob.Status == JobStatus.Failed ||
importJob.Status == JobStatus.Cancelled)
{
// Job has finished executing
break;
}
await Task.Delay(TimeSpan.FromSeconds(5));
}
}
catch (Exception ex)
{
throw ex;
}
}
}
我认为您看到此行为是因为没有等待您的 BulkImport
,这意味着,在代码第一次在方法中遇到 await
时,代码 returns到 Main
方法并终止应用程序。
尝试将 Main
的 return 值更改为 Task
(参见 Async main)而不是 void
,然后 await
调用到 BulkImport
:
static async Task Main(string[] args)
{
await BulkImport();
}
希望对您有所帮助!
批量导入设备方法的背后是对 Azure IoT 中心面向服务的端点的 REST API 调用。使用registryManager.ImportDevicesAsync方法是在Azure IoT Hub命名空间中创建一个后台作业。接受作业后,将返回其轮询处理状态的 jobId。
基本上,我们需要两个 REST API,例如 POST 和 GET 作业。可以在文档服务中找到更多详细信息 - Create Import Export Job and Service - Get Import Export Job。
以下步骤显示如何使用 REST APIs 测试此批量导入设备作业:
使用最新的 Microsoft Azure Storage Explorer 创建两个容器,例如一个用于 input,另一个用于 output 记录消息。此外,此工具允许为这些容器生成带有 sas 令牌的完整 uri 地址。
创建一个名称为 devices.txt 的文本文件并包含以下内容(配置两个设备的示例):
{"id":"Device1", "importMode":"Create", "status":"enabled", "properties":{"desired":{"abcd":12345 } } }
{"id":"Device2", "importMode":"Create", "status":"enabled", "properties":{"desired":{"abcd":12345 } } }
上传文件devices.txt到input容器。
POST 到 Azure IoT 中心的作业
POST https://{yourIoTHub}.azure-devices.net/jobs/create?api-version=2018-06-30
Authorization:{yourIoTHubSasToken}
body:
{
"inputBlobContainerUri": "{inputContainerUriWithSasToken}",
"outputBlobContainerUri": "{outputContainerUriWithSasToken}",
"inputBlobName": "devices.txt",
"type": "import"
}
响应应该是:
{
"jobId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
"startTimeUtc": "2019-01-06T16:19:54.9535051Z",
"type": "import",
"status": "enqueued",
"progress": 0,
"inputBlobContainerUri": "xxxxxx",
"outputBlobContainerUri": "xxxxxx",
"excludeKeysInExport": false,
"useSecondaryStorageAsSource": false
}
轮询作业状态,直到作业完成或失败或取消
GET https://{yourIoTHub}.azure-devices.net/jobs/xxxxx-xxxx-xxxx-xxxxxxxxxxxx?api-version=2018-06-30
Authorization:{yourIoTHubSasToken}
回复:
{
"jobId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
"startTimeUtc": "2019-01-06T16:19:54.9535051Z",
"endTimeUtc": "2019-01-06T16:20:11.4043137Z",
"type": "import",
"status": "completed",
"progress": 100,
"outputBlobContainerUri": "",
"excludeKeysInExport": false,
"useSecondaryStorageAsSource": false
}
请注意,上述步骤 4. 和 5. 可以使用 Azure Functions 来实现,以隐藏与存储和物联网中心相关的所有详细信息。
我从github msdn link获取了批量导入代码 但是在控制台和 Web 应用程序中使用相同的代码无法将任何新设备注册到物联网中心。
正在将设备列表创建到 blob 文件中,但为什么无法成功执行导入作业?我也没有得到任何例外。
class Program
{
static RegistryManager registryManager;
static string connectionString = "iothub-connectionstring";
static void Main(string[] args)
{
BulkImport();
}
static string GetContainerSasUri(CloudBlobContainer container)
{
// Set the expiry time and permissions for the container.
// In this case no start time is specified, so the
// shared access signature becomes valid immediately.
var sasConstraints = new SharedAccessBlobPolicy();
sasConstraints.SharedAccessExpiryTime = DateTime.UtcNow.AddHours(24);
sasConstraints.Permissions =
SharedAccessBlobPermissions.Write |
SharedAccessBlobPermissions.Read |
SharedAccessBlobPermissions.Delete;
// Generate the shared access signature on the container,
// setting the constraints directly on the signature.
string sasContainerToken = container.GetSharedAccessSignature(sasConstraints);
// Return the URI string for the container,
// including the SAS token.
return container.Uri + sasContainerToken;
}
private static async Task BulkImport()
{
try
{
registryManager = RegistryManager.CreateFromConnectionString(connectionString);
StorageCredentials storageCredentials = new StorageCredentials("storeage-name", "storage-key");
CloudStorageAccount storageAccount = new CloudStorageAccount(storageCredentials, useHttps: true);
// Create a blob client.
CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient();
CloudBlobContainer container = blobClient.GetContainerReference("bulkiothubimport");
CloudBlockBlob blob = container.GetBlockBlobReference("devices.txt");
var containerSasUri = GetContainerSasUri(container);
// Provision 1,000 more devices
var serializedDevices = new List<string>();
for (var i = 0; i < 5; i++)
{
// Create a new ExportImportDevice
// CryptoKeyGenerator is in the Microsoft.Azure.Devices.Common namespace
var deviceToAdd = new ExportImportDevice()
{
Id = i+"mydevicenew",
Status = DeviceStatus.Enabled,
Authentication = new AuthenticationMechanism()
{
SymmetricKey = new SymmetricKey()
{
PrimaryKey = CryptoKeyGenerator.GenerateKey(32),
SecondaryKey = CryptoKeyGenerator.GenerateKey(32)
}
},
ImportMode = ImportMode.Create
};
// Add device to the list
serializedDevices.Add(JsonConvert.SerializeObject(deviceToAdd));
}
// Write the list to the blob
var sb = new StringBuilder();
serializedDevices.ForEach(serializedDevice => sb.AppendLine(serializedDevice));
//await blob.DeleteIfExistsAsync();
using (CloudBlobStream stream = await blob.OpenWriteAsync())
{
byte[] bytes = Encoding.UTF8.GetBytes(sb.ToString());
for (var i = 0; i < bytes.Length; i += 500)
{
int length = Math.Min(bytes.Length - i, 500);
await stream.WriteAsync(bytes, i, length);
}
}
// Call import using the blob to add new devices
// Log information related to the job is written to the same container
// This normally takes 1 minute per 100 devices
JobProperties importJob =
await registryManager.ImportDevicesAsync(containerSasUri, containerSasUri);
// Wait until job is finished
while (true)
{
importJob = await registryManager.GetJobAsync(importJob.JobId);
if (importJob.Status == JobStatus.Completed ||
importJob.Status == JobStatus.Failed ||
importJob.Status == JobStatus.Cancelled)
{
// Job has finished executing
break;
}
await Task.Delay(TimeSpan.FromSeconds(5));
}
}
catch (Exception ex)
{
throw ex;
}
}
}
我认为您看到此行为是因为没有等待您的 BulkImport
,这意味着,在代码第一次在方法中遇到 await
时,代码 returns到 Main
方法并终止应用程序。
尝试将 Main
的 return 值更改为 Task
(参见 Async main)而不是 void
,然后 await
调用到 BulkImport
:
static async Task Main(string[] args)
{
await BulkImport();
}
希望对您有所帮助!
批量导入设备方法的背后是对 Azure IoT 中心面向服务的端点的 REST API 调用。使用registryManager.ImportDevicesAsync方法是在Azure IoT Hub命名空间中创建一个后台作业。接受作业后,将返回其轮询处理状态的 jobId。
基本上,我们需要两个 REST API,例如 POST 和 GET 作业。可以在文档服务中找到更多详细信息 - Create Import Export Job and Service - Get Import Export Job。
以下步骤显示如何使用 REST APIs 测试此批量导入设备作业:
使用最新的 Microsoft Azure Storage Explorer 创建两个容器,例如一个用于 input,另一个用于 output 记录消息。此外,此工具允许为这些容器生成带有 sas 令牌的完整 uri 地址。
创建一个名称为 devices.txt 的文本文件并包含以下内容(配置两个设备的示例):
{"id":"Device1", "importMode":"Create", "status":"enabled", "properties":{"desired":{"abcd":12345 } } } {"id":"Device2", "importMode":"Create", "status":"enabled", "properties":{"desired":{"abcd":12345 } } }
上传文件devices.txt到input容器。
POST 到 Azure IoT 中心的作业
POST https://{yourIoTHub}.azure-devices.net/jobs/create?api-version=2018-06-30 Authorization:{yourIoTHubSasToken} body: { "inputBlobContainerUri": "{inputContainerUriWithSasToken}", "outputBlobContainerUri": "{outputContainerUriWithSasToken}", "inputBlobName": "devices.txt", "type": "import" }
响应应该是:
{ "jobId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", "startTimeUtc": "2019-01-06T16:19:54.9535051Z", "type": "import", "status": "enqueued", "progress": 0, "inputBlobContainerUri": "xxxxxx", "outputBlobContainerUri": "xxxxxx", "excludeKeysInExport": false, "useSecondaryStorageAsSource": false }
轮询作业状态,直到作业完成或失败或取消
GET https://{yourIoTHub}.azure-devices.net/jobs/xxxxx-xxxx-xxxx-xxxxxxxxxxxx?api-version=2018-06-30 Authorization:{yourIoTHubSasToken}
回复:
{ "jobId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", "startTimeUtc": "2019-01-06T16:19:54.9535051Z", "endTimeUtc": "2019-01-06T16:20:11.4043137Z", "type": "import", "status": "completed", "progress": 100, "outputBlobContainerUri": "", "excludeKeysInExport": false, "useSecondaryStorageAsSource": false }
请注意,上述步骤 4. 和 5. 可以使用 Azure Functions 来实现,以隐藏与存储和物联网中心相关的所有详细信息。