改进代码以根据上次消息日期从事件中心读取消息
Improve code to read messages from Event Hub, depending on last message date
以下代码连接到 Azure 事件中心,它遍历所有分区,然后读取要处理的消息并插入数据库(待完成),代码工作正常,但是每次它读取所有消息。
这将作为 Azure WebJob 安装,因此它将 运行 连续、实时、不间断。
- 如何改进此代码以仅读取未处理的消息?
while/for 部分是否有更好的编码方式,您会采用不同的方式吗?
static void Main(string[] args)
{
ServiceBusConnectionStringBuilder builder = new ServiceBusConnectionStringBuilder(ConfigurationManager.AppSettings["ConnectionString"].ToString());
builder.TransportType = TransportType.Amqp;
MessagingFactory factory = MessagingFactory.CreateFromConnectionString(ConfigurationManager.AppSettings["ConnectionString"].ToString());
EventHubClient client = factory.CreateEventHubClient(ConfigurationManager.AppSettings["eventHubEntity"].ToString());
EventHubConsumerGroup group = client.GetDefaultConsumerGroup();
CancellationTokenSource cts = new CancellationTokenSource();
System.Console.CancelKeyPress += (s, e) =>
{
e.Cancel = true;
cts.Cancel();
Console.WriteLine("Exiting...");
};
var d2cPartitions = client.GetRuntimeInformation().PartitionIds;
while (true)
{
foreach (string partition in d2cPartitions)
{
EventHubReceiver receiver = group.CreateReceiver(partition, DateTime.MinValue);
EventData data = receiver.Receive();
Console.WriteLine("{0} {1} {2}", data.PartitionKey, data.EnqueuedTimeUtc.ToLocalTime(), Encoding.UTF8.GetString(data.GetBytes()));
var dateLastMessage = data.EnqueuedTimeUtc.ToLocalTime();
receiver.Close();
client.Close();
factory.Close();
}
}
}
使用 EventHubReceiver
无法为您提供所需的控制。相反,您应该使用 EventProcessorHost
,它允许您可以使用检查点来恢复处理消息。
请参阅 http://blogs.biztalk360.com/understanding-consumer-side-of-azure-event-hubs-checkpoint-initialoffset-eventprocessorhost/ and https://blogs.msdn.microsoft.com/servicebus/2015/01/16/event-processor-host-best-practices-part-1/ 了解背景知识。
您可以轻松地在 WebJob 中托管 EventProcessor
。
以下代码连接到 Azure 事件中心,它遍历所有分区,然后读取要处理的消息并插入数据库(待完成),代码工作正常,但是每次它读取所有消息。
这将作为 Azure WebJob 安装,因此它将 运行 连续、实时、不间断。
- 如何改进此代码以仅读取未处理的消息?
while/for 部分是否有更好的编码方式,您会采用不同的方式吗?
static void Main(string[] args) { ServiceBusConnectionStringBuilder builder = new ServiceBusConnectionStringBuilder(ConfigurationManager.AppSettings["ConnectionString"].ToString()); builder.TransportType = TransportType.Amqp; MessagingFactory factory = MessagingFactory.CreateFromConnectionString(ConfigurationManager.AppSettings["ConnectionString"].ToString()); EventHubClient client = factory.CreateEventHubClient(ConfigurationManager.AppSettings["eventHubEntity"].ToString()); EventHubConsumerGroup group = client.GetDefaultConsumerGroup(); CancellationTokenSource cts = new CancellationTokenSource(); System.Console.CancelKeyPress += (s, e) => { e.Cancel = true; cts.Cancel(); Console.WriteLine("Exiting..."); }; var d2cPartitions = client.GetRuntimeInformation().PartitionIds; while (true) { foreach (string partition in d2cPartitions) { EventHubReceiver receiver = group.CreateReceiver(partition, DateTime.MinValue); EventData data = receiver.Receive(); Console.WriteLine("{0} {1} {2}", data.PartitionKey, data.EnqueuedTimeUtc.ToLocalTime(), Encoding.UTF8.GetString(data.GetBytes())); var dateLastMessage = data.EnqueuedTimeUtc.ToLocalTime(); receiver.Close(); client.Close(); factory.Close(); } } }
使用 EventHubReceiver
无法为您提供所需的控制。相反,您应该使用 EventProcessorHost
,它允许您可以使用检查点来恢复处理消息。
请参阅 http://blogs.biztalk360.com/understanding-consumer-side-of-azure-event-hubs-checkpoint-initialoffset-eventprocessorhost/ and https://blogs.msdn.microsoft.com/servicebus/2015/01/16/event-processor-host-best-practices-part-1/ 了解背景知识。
您可以轻松地在 WebJob 中托管 EventProcessor
。