如何实时收听和使用来自 Azure IoT 中心的消息
How to realtime listen and consume messages from Azure IoT Hub
我目前正在尝试使用 Azure IoT 中心从事件中心兼容的 IoT 中心默认 message/events 端点读取消息。为了尝试这个,我写了两个命令行应用程序,一个模拟设备并写入 IoT 中心,另一个从 IoT 中心 messages/events 端点读取。
生产者每秒产生一条消息并将其写入物联网中心。这似乎工作正常。但是当我启动 reader/consumer 时,它会收到一批消息并关闭应用程序。但与此同时,生产者仍然生产消息。
我的期望是生产者每秒或随机地产生消息,而消费者“监听”端点,如果有新消息到达,则读取并显示它。按照我与 Azure IoT 中心的生产者和消费者的代码。
生产者/模拟物联网设备
using Microsoft.Azure.Devices.Client;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace IoTGarage_Azure_01_Simulated_Device
{
class Program
{
private static DeviceClient s_deviceClient;
private readonly static string s_myDeviceId = "simulatedDevice";
private readonly static string s_iotHubUri = "<name.azure-devices.net>";
// Im IoT Hub > Geräte > Primärschlüssel
private readonly static string s_deviceKey = "<primary key>";
private static async Task Main()
{
Console.WriteLine("Routing Tutorial: Simulated device\n");
s_deviceClient = DeviceClient.Create(s_iotHubUri,
new DeviceAuthenticationWithRegistrySymmetricKey(s_myDeviceId, s_deviceKey), TransportType.Mqtt);
using var cts = new CancellationTokenSource();
var messages = SendDeviceToCloudMessagesAsync(cts.Token);
Console.WriteLine("Press the Enter key to stop.");
Console.ReadLine();
cts.Cancel();
await messages;
}
private static async Task SendDeviceToCloudMessagesAsync(CancellationToken token)
{
double minTemperature = 20;
double minHumidity = 60;
Random rand = new Random();
while (!token.IsCancellationRequested)
{
double currentTemperature = minTemperature + rand.NextDouble() * 15;
double currentHumidity = minHumidity + rand.NextDouble() * 20;
string infoString;
string levelValue;
if (rand.NextDouble() > 0.7)
{
if (rand.NextDouble() > 0.5)
{
levelValue = "critical";
infoString = "This is a critical message.";
}
else
{
levelValue = "storage";
infoString = "This is a storage message.";
}
}
else
{
levelValue = "normal";
infoString = "This is a normal message.";
}
var telemetryDataPoint = new
{
deviceId = s_myDeviceId,
temperature = currentTemperature,
humidity = currentHumidity,
pointInfo = infoString
};
var telemetryDataString = JsonConvert.SerializeObject(telemetryDataPoint);
// You can encode this as ASCII, but if you want it to be the body of the message,
// and to be able to search the body, it must be encoded in UTF with base64 encoding.
using var message = new Message(Encoding.UTF32.GetBytes(telemetryDataString));
//Add one property to the message.
message.Properties.Add("target", levelValue);
// Submit the message to the hub.
await s_deviceClient.SendEventAsync(message);
// Print out the message.
Console.WriteLine("{0} > Sent message: {1}", DateTime.Now, telemetryDataString);
await Task.Delay(1000);
}
}
}
}
消费者
using System;
using System.Text;
using System.Threading.Tasks;
using Azure.Storage.Blobs;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;
namespace IoTGarage_Azure_02_IoTHub_ReadFromInternalEndpoint
{
class Program
{
private const string ehubNamespaceConnectionString = "<Endpoint=sb://>";
private const string eventHubName = "<iothubname>";
private const string blobStorageConnectionString = "<DefaultEndpointsProtocol=https;AccountName=EndpointSuffix=core.windows.net>";
private const string blobContainerName = "checkpointblob";
static async Task Main()
{
string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName);
processor.ProcessEventAsync += ProcessEventHandler;
processor.ProcessErrorAsync += ProcessErrorHandler;
await processor.StartProcessingAsync();
await Task.Delay(TimeSpan.FromSeconds(30));
await processor.StopProcessingAsync();
}
static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
{
Console.WriteLine("\tReceived event: {0}", Encoding.UTF32.GetString(eventArgs.Data.Body.ToArray()));
await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
}
static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
{
Console.WriteLine($"\tPartition '{ eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
Console.WriteLine(eventArgs.Exception.Message);
return Task.CompletedTask;
}
}
}
您所看到的可能是由于 IoTHub 的消息保留策略有关其内置 EventHub。这可以解释您看到从接收器传来的最初一连串消息。您的应用程序退出的事实可能是因为您让主线程退出。设置 EventPosition=Latest 以仅读取传入的新消息。
选项 #1 - 使用 EventHubConsumerClient
using Azure.Messaging.EventHubs.Consumer;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace DotnetService
{
class Program
{
private const string EventHubsCompatibleEndpoint = "TODO: az iot hub show --query properties.eventHubEndpoints.events.endpoint --name {hubname}";
private const string EventHubsCompatiblePath = "TODO: {hubname}";
private const string IotHubSasKey = "TODO: az iot hub policy show --name service --query primaryKey --hub-name {hubname}";
private const string ConsumerGroup = "$Default";
private static EventHubConsumerClient eventHubConsumerClient = null;
private async static Task Setup()
{
string eventHubConnectionString = $"Endpoint={EventHubsCompatibleEndpoint.Replace("sb://", "amqps://")};EntityPath={EventHubsCompatiblePath};SharedAccessKeyName=service;SharedAccessKey={IotHubSasKey};";
eventHubConsumerClient = new EventHubConsumerClient(ConsumerGroup, eventHubConnectionString);
var tasks = new List<Task>();
var partitions = await eventHubConsumerClient.GetPartitionIdsAsync();
foreach (string partition in partitions)
{
tasks.Add(ReceiveMessagesFromDeviceAsync(partition));
}
}
static async Task ReceiveMessagesFromDeviceAsync(string partitionId)
{
Console.WriteLine($"Starting listener thread for partition: {partitionId}");
while (true)
{
await foreach (PartitionEvent receivedEvent in eventHubConsumerClient.ReadEventsFromPartitionAsync(partitionId, EventPosition.Latest))
{
string msgSource;
string body = Encoding.UTF8.GetString(receivedEvent.Data.Body.ToArray());
if (receivedEvent.Data.SystemProperties.ContainsKey("iothub-message-source"))
{
msgSource = receivedEvent.Data.SystemProperties["iothub-message-source"].ToString();
Console.WriteLine($"{partitionId} {msgSource} {body}");
}
}
}
}
static async Task Main(string[] args)
{
await Setup();
Console.ReadLine();
}
}
}
选项 #2 使用 EventProcessorClient
using System;
using System.Text;
using System.Threading.Tasks;
using Azure.Storage.Blobs;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;
namespace DotnetService
{
class Program
{
private const string ehubNamespaceConnectionString = "Endpoint=sb://...";
private const string eventHubName = "{iothubname}";
private const string blobStorageConnectionString = "DefaultEndpointsProtocol=...";
private const string blobContainerName = "{storagename}";
private static Task initializeEventHandler(PartitionInitializingEventArgs arg)
{
arg.DefaultStartingPosition = EventPosition.Latest;
return Task.CompletedTask;
}
static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
{
Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()));
await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
}
static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
{
Console.WriteLine($"\tPartition '{ eventArgs.PartitionId}': an unhandled exception was encountered.");
Console.WriteLine(eventArgs.Exception.Message);
return Task.CompletedTask;
}
static async Task Main()
{
BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
EventProcessorClient processor = new EventProcessorClient(storageClient, EventHubConsumerClient.DefaultConsumerGroupName, ehubNamespaceConnectionString, eventHubName);
processor.PartitionInitializingAsync += initializeEventHandler;
processor.ProcessEventAsync += ProcessEventHandler;
processor.ProcessErrorAsync += ProcessErrorHandler;
await processor.StartProcessingAsync();
Console.ReadLine();
}
}
}
我目前正在尝试使用 Azure IoT 中心从事件中心兼容的 IoT 中心默认 message/events 端点读取消息。为了尝试这个,我写了两个命令行应用程序,一个模拟设备并写入 IoT 中心,另一个从 IoT 中心 messages/events 端点读取。
生产者每秒产生一条消息并将其写入物联网中心。这似乎工作正常。但是当我启动 reader/consumer 时,它会收到一批消息并关闭应用程序。但与此同时,生产者仍然生产消息。
我的期望是生产者每秒或随机地产生消息,而消费者“监听”端点,如果有新消息到达,则读取并显示它。按照我与 Azure IoT 中心的生产者和消费者的代码。
生产者/模拟物联网设备
using Microsoft.Azure.Devices.Client;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace IoTGarage_Azure_01_Simulated_Device
{
class Program
{
private static DeviceClient s_deviceClient;
private readonly static string s_myDeviceId = "simulatedDevice";
private readonly static string s_iotHubUri = "<name.azure-devices.net>";
// Im IoT Hub > Geräte > Primärschlüssel
private readonly static string s_deviceKey = "<primary key>";
private static async Task Main()
{
Console.WriteLine("Routing Tutorial: Simulated device\n");
s_deviceClient = DeviceClient.Create(s_iotHubUri,
new DeviceAuthenticationWithRegistrySymmetricKey(s_myDeviceId, s_deviceKey), TransportType.Mqtt);
using var cts = new CancellationTokenSource();
var messages = SendDeviceToCloudMessagesAsync(cts.Token);
Console.WriteLine("Press the Enter key to stop.");
Console.ReadLine();
cts.Cancel();
await messages;
}
private static async Task SendDeviceToCloudMessagesAsync(CancellationToken token)
{
double minTemperature = 20;
double minHumidity = 60;
Random rand = new Random();
while (!token.IsCancellationRequested)
{
double currentTemperature = minTemperature + rand.NextDouble() * 15;
double currentHumidity = minHumidity + rand.NextDouble() * 20;
string infoString;
string levelValue;
if (rand.NextDouble() > 0.7)
{
if (rand.NextDouble() > 0.5)
{
levelValue = "critical";
infoString = "This is a critical message.";
}
else
{
levelValue = "storage";
infoString = "This is a storage message.";
}
}
else
{
levelValue = "normal";
infoString = "This is a normal message.";
}
var telemetryDataPoint = new
{
deviceId = s_myDeviceId,
temperature = currentTemperature,
humidity = currentHumidity,
pointInfo = infoString
};
var telemetryDataString = JsonConvert.SerializeObject(telemetryDataPoint);
// You can encode this as ASCII, but if you want it to be the body of the message,
// and to be able to search the body, it must be encoded in UTF with base64 encoding.
using var message = new Message(Encoding.UTF32.GetBytes(telemetryDataString));
//Add one property to the message.
message.Properties.Add("target", levelValue);
// Submit the message to the hub.
await s_deviceClient.SendEventAsync(message);
// Print out the message.
Console.WriteLine("{0} > Sent message: {1}", DateTime.Now, telemetryDataString);
await Task.Delay(1000);
}
}
}
}
消费者
using System;
using System.Text;
using System.Threading.Tasks;
using Azure.Storage.Blobs;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;
namespace IoTGarage_Azure_02_IoTHub_ReadFromInternalEndpoint
{
class Program
{
private const string ehubNamespaceConnectionString = "<Endpoint=sb://>";
private const string eventHubName = "<iothubname>";
private const string blobStorageConnectionString = "<DefaultEndpointsProtocol=https;AccountName=EndpointSuffix=core.windows.net>";
private const string blobContainerName = "checkpointblob";
static async Task Main()
{
string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName);
processor.ProcessEventAsync += ProcessEventHandler;
processor.ProcessErrorAsync += ProcessErrorHandler;
await processor.StartProcessingAsync();
await Task.Delay(TimeSpan.FromSeconds(30));
await processor.StopProcessingAsync();
}
static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
{
Console.WriteLine("\tReceived event: {0}", Encoding.UTF32.GetString(eventArgs.Data.Body.ToArray()));
await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
}
static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
{
Console.WriteLine($"\tPartition '{ eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
Console.WriteLine(eventArgs.Exception.Message);
return Task.CompletedTask;
}
}
}
您所看到的可能是由于 IoTHub 的消息保留策略有关其内置 EventHub。这可以解释您看到从接收器传来的最初一连串消息。您的应用程序退出的事实可能是因为您让主线程退出。设置 EventPosition=Latest 以仅读取传入的新消息。
选项 #1 - 使用 EventHubConsumerClient
using Azure.Messaging.EventHubs.Consumer;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace DotnetService
{
class Program
{
private const string EventHubsCompatibleEndpoint = "TODO: az iot hub show --query properties.eventHubEndpoints.events.endpoint --name {hubname}";
private const string EventHubsCompatiblePath = "TODO: {hubname}";
private const string IotHubSasKey = "TODO: az iot hub policy show --name service --query primaryKey --hub-name {hubname}";
private const string ConsumerGroup = "$Default";
private static EventHubConsumerClient eventHubConsumerClient = null;
private async static Task Setup()
{
string eventHubConnectionString = $"Endpoint={EventHubsCompatibleEndpoint.Replace("sb://", "amqps://")};EntityPath={EventHubsCompatiblePath};SharedAccessKeyName=service;SharedAccessKey={IotHubSasKey};";
eventHubConsumerClient = new EventHubConsumerClient(ConsumerGroup, eventHubConnectionString);
var tasks = new List<Task>();
var partitions = await eventHubConsumerClient.GetPartitionIdsAsync();
foreach (string partition in partitions)
{
tasks.Add(ReceiveMessagesFromDeviceAsync(partition));
}
}
static async Task ReceiveMessagesFromDeviceAsync(string partitionId)
{
Console.WriteLine($"Starting listener thread for partition: {partitionId}");
while (true)
{
await foreach (PartitionEvent receivedEvent in eventHubConsumerClient.ReadEventsFromPartitionAsync(partitionId, EventPosition.Latest))
{
string msgSource;
string body = Encoding.UTF8.GetString(receivedEvent.Data.Body.ToArray());
if (receivedEvent.Data.SystemProperties.ContainsKey("iothub-message-source"))
{
msgSource = receivedEvent.Data.SystemProperties["iothub-message-source"].ToString();
Console.WriteLine($"{partitionId} {msgSource} {body}");
}
}
}
}
static async Task Main(string[] args)
{
await Setup();
Console.ReadLine();
}
}
}
选项 #2 使用 EventProcessorClient
using System;
using System.Text;
using System.Threading.Tasks;
using Azure.Storage.Blobs;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;
namespace DotnetService
{
class Program
{
private const string ehubNamespaceConnectionString = "Endpoint=sb://...";
private const string eventHubName = "{iothubname}";
private const string blobStorageConnectionString = "DefaultEndpointsProtocol=...";
private const string blobContainerName = "{storagename}";
private static Task initializeEventHandler(PartitionInitializingEventArgs arg)
{
arg.DefaultStartingPosition = EventPosition.Latest;
return Task.CompletedTask;
}
static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
{
Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()));
await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
}
static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
{
Console.WriteLine($"\tPartition '{ eventArgs.PartitionId}': an unhandled exception was encountered.");
Console.WriteLine(eventArgs.Exception.Message);
return Task.CompletedTask;
}
static async Task Main()
{
BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
EventProcessorClient processor = new EventProcessorClient(storageClient, EventHubConsumerClient.DefaultConsumerGroupName, ehubNamespaceConnectionString, eventHubName);
processor.PartitionInitializingAsync += initializeEventHandler;
processor.ProcessEventAsync += ProcessEventHandler;
processor.ProcessErrorAsync += ProcessErrorHandler;
await processor.StartProcessingAsync();
Console.ReadLine();
}
}
}