Azure 事件中心 - 如何使用官方 SDK 并行使用事件?
Azure event hub - How to consume events parallelly using the official SDK?
我设置了以下测试:
- 创建了一个包含 10 个分区的 azure 事件中心
- 创建了一个存储帐户
- 创建了一个消费者组
- 已用 10k 条消息填满中心
- 创建了 2 个容器(在 AKS 上),它们基本上会使用这些事件(使用相同的消费者组)并记录它们 azure 应用程序见解。
期望:
运行
traces
| where message == "Event received"
| summarize count() by bin(timestamp,1s), cloud_RoleInstance
| render timechart
并看到如下内容:
但我看到的是:
(这是一个 3x 运行,每个事件 10k,以消除“pod 未预热变量”)
请注意 pods activity 之间没有(或很少)重叠,就好像其中一个持有锁或其他东西一样,神秘地,在某些时候,锁是被另一个 pod 释放和使用。
相关消费代码:
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_processor = new EventProcessorClient(_storageClient, _consumerGroup, _hubConnection, _eventHubName);
_processor.ProcessEventAsync += ProcessEventHandler;
_processor.ProcessErrorAsync += ProcessErrorHandler;
// Start the processing
await _processor.StartProcessingAsync(stoppingToken);
}
internal async Task ProcessEventHandler(ProcessEventArgs eventArgs)
{
_logger.LogTelemetry("Event received");
await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
}
如上所述,您的高级场景已设置为并行使用。
每个 EventProcessorClient
独立工作,尽管它们通过存储进行协调以在它们之间分割分区的所有权。在这种情况下,每个处理器应该拥有 5 个分区,他们将使用默认配置在约 60-90 秒内声明这些分区,之后所有权应该是稳定的。
对于处理器拥有的每个分区,一个独立的后台任务用于从事件中心读取事件并将它们分派给您的处理程序。您的处理程序将被同时调用,但它保证对给定分区进行单个活动调用。
您看到的结果表明有问题,但推测原因的上下文有限。片段中的几个 observations/questions:
ExecuteAsync
一旦处理器启动就会立即退出;如果其他东西没有阻塞以保持主机进程处于活动状态,则它可能正在终止。
_logger
会被不同的线程同时调用
ProcessEventHandler
不考虑异常;如果它抛出,负责处理分区的任务就会出错。根据您的主机环境,它可能会重新启动或主机进程可能会崩溃。我们强烈建议遵循 guidance for processor handlers.
支持为每个事件添加检查点,但会对吞吐量产生负面影响。对于大多数情况,我们建议在 X 数量的事件或经过某个固定时间间隔后设置检查点,这些值由您的应用程序可以重新处理的事件数量决定。
我很乐意帮助您深入了解可能导致您所看到的集群行为的原因,但 Stack Overflow 可能不是这样做的最佳场所。您可能希望在 Azure SDK for .NET repository 中提出问题,我们可以在那里解决问题。
上面的代码其实并没有什么问题。 On this GitHub issue 我们进行了一些讨论,并注意到在处理较大的批次(500k 事件)时的预期行为。
截图如下:
我设置了以下测试:
- 创建了一个包含 10 个分区的 azure 事件中心
- 创建了一个存储帐户
- 创建了一个消费者组
- 已用 10k 条消息填满中心
- 创建了 2 个容器(在 AKS 上),它们基本上会使用这些事件(使用相同的消费者组)并记录它们 azure 应用程序见解。
期望:
运行
traces
| where message == "Event received"
| summarize count() by bin(timestamp,1s), cloud_RoleInstance
| render timechart
并看到如下内容:
但我看到的是:
(这是一个 3x 运行,每个事件 10k,以消除“pod 未预热变量”)
请注意 pods activity 之间没有(或很少)重叠,就好像其中一个持有锁或其他东西一样,神秘地,在某些时候,锁是被另一个 pod 释放和使用。
相关消费代码:
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_processor = new EventProcessorClient(_storageClient, _consumerGroup, _hubConnection, _eventHubName);
_processor.ProcessEventAsync += ProcessEventHandler;
_processor.ProcessErrorAsync += ProcessErrorHandler;
// Start the processing
await _processor.StartProcessingAsync(stoppingToken);
}
internal async Task ProcessEventHandler(ProcessEventArgs eventArgs)
{
_logger.LogTelemetry("Event received");
await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
}
如上所述,您的高级场景已设置为并行使用。
每个 EventProcessorClient
独立工作,尽管它们通过存储进行协调以在它们之间分割分区的所有权。在这种情况下,每个处理器应该拥有 5 个分区,他们将使用默认配置在约 60-90 秒内声明这些分区,之后所有权应该是稳定的。
对于处理器拥有的每个分区,一个独立的后台任务用于从事件中心读取事件并将它们分派给您的处理程序。您的处理程序将被同时调用,但它保证对给定分区进行单个活动调用。
您看到的结果表明有问题,但推测原因的上下文有限。片段中的几个 observations/questions:
ExecuteAsync
一旦处理器启动就会立即退出;如果其他东西没有阻塞以保持主机进程处于活动状态,则它可能正在终止。_logger
会被不同的线程同时调用ProcessEventHandler
不考虑异常;如果它抛出,负责处理分区的任务就会出错。根据您的主机环境,它可能会重新启动或主机进程可能会崩溃。我们强烈建议遵循 guidance for processor handlers.支持为每个事件添加检查点,但会对吞吐量产生负面影响。对于大多数情况,我们建议在 X 数量的事件或经过某个固定时间间隔后设置检查点,这些值由您的应用程序可以重新处理的事件数量决定。
我很乐意帮助您深入了解可能导致您所看到的集群行为的原因,但 Stack Overflow 可能不是这样做的最佳场所。您可能希望在 Azure SDK for .NET repository 中提出问题,我们可以在那里解决问题。
上面的代码其实并没有什么问题。 On this GitHub issue 我们进行了一些讨论,并注意到在处理较大的批次(500k 事件)时的预期行为。
截图如下: