在 ASP.Net Core 中处理 EventHub 消费者组的事件时忽略旧事件
Ignore old events when processing events of an EventHub consumer group in ASP.Net Core
我有一个 API 和 ASP.Net Core 3.1,它使用 Azure.Messaging.EventHubs
和 Azure.Messaging.EventHubs.Processor
从消费者组获取事件,然后将它们发送到 SignalR 集线器.处理器仅在有用户连接到集线器时运行,并在最后一个用户断开连接时停止,更新其在 BlobStorage 中的检查点。
每个事件的当前处理逻辑:如果 DateTime.UtcNow 和事件时间戳之间的时间差(以分钟为单位)小于 2,它会将事件发送到 SignalR 集线器,仅此而已。
问题如下:有时候EventProcessorClient
停了很长一段时间,EventHub中保留了很多事件,导致等待时间很长,慢慢追上最近的事件,直到 SignalR Hub 再次开始接收它们。处理器赶上最新事件的时间太长了,特别是在考虑每分钟接收数百个事件时。
有没有办法,例如,在启动处理器之前手动移动检查点?或者只获取最后 X 分钟的事件?也许另一个 idea/solution?
PS:我不关心这个消费者组超过 2 到 5 分钟的事件。
PS2:EventHub中配置的保留时间为1天。
代码:
/* properties and stuff */
// Constructor
public BusEventHub(ILogger<BusEventHub> logger, IConfiguration configuration, IHubContext<BusHub> hubContext) {
_logger = logger;
Configuration = configuration;
_busExcessHub = hubContext;
/* Connection strings and stuff */
// Create a blob container client that the event processor will use
storageClient = new BlobContainerClient(this.blobStorageConnectionString, this.blobContainerName);
// Create an event processor client to process events in the event hub
processor = new EventProcessorClient(storageClient, consumerGroup, this.ehubNamespaceConnectionString, this.eventHubName);
// Register handlers for processing events and handling errors
processor.ProcessEventAsync += ProcessEventHandler;
processor.ProcessErrorAsync += ProcessErrorHandler;
}
public async Task Start() {
_logger.LogInformation($"Starting event processing for EventHub {eventHubName}");
await processor.StartProcessingAsync();
}
public async Task Stop() {
if (BusHubUserHandler.ConnectedIds.Count < 2) {
_logger.LogInformation($"Stopping event processing for EventHub {eventHubName}");
await processor.StopProcessingAsync();
} else {
_logger.LogDebug("There are still other users connected");
}
}
private async Task ProcessEventHandler(ProcessEventArgs eventArgs) {
try {
string receivedEvent = Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray());
_logger.LogDebug($"Received event: {receivedEvent}");
BusExcessMinified busExcess = BusExcessMinified.FromJson(receivedEvent);
double timeDiff = (DateTime.UtcNow - busExcess.Timestamp).TotalMinutes;
if (timeDiff < 2) {
string responseEvent = busExcess.ToJson();
_logger.LogDebug($"Sending message to BusExcess Hub: {responseEvent}");
await _busExcessHub.Clients.All.SendAsync("UpdateBuses", responseEvent);
}
_logger.LogDebug("Update checkpoint in the blob storage"); // So that the service receives only new events the next time it's run
await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
} catch (TaskCanceledException) {
_logger.LogInformation("The EventHub event processing was stopped");
} catch (Exception e) {
_logger.LogError($"Exception: {e}");
}
}
/* ProcessErrorHandler */
可以在分区初始化时请求分区的初始位置,这样您就可以将排队时间指定为起点。这个sample说明了细节。需要注意的是,初始位置仅在分区没有检查点时使用;检查点将始终优先。
从您所描述的场景来看,检查点似乎对您没有用,并且妨碍了您的首选使用模式。如果没有其他缓解因素,我建议不要检查点,而是覆盖默认起始位置以动态重置为您感兴趣的时间。
如果出于某种原因,除此之外还需要检查点,那么最好的办法是删除检查点数据,因为检查点是基于偏移量的,不会识别定位的排队时间。
我有一个 API 和 ASP.Net Core 3.1,它使用 Azure.Messaging.EventHubs
和 Azure.Messaging.EventHubs.Processor
从消费者组获取事件,然后将它们发送到 SignalR 集线器.处理器仅在有用户连接到集线器时运行,并在最后一个用户断开连接时停止,更新其在 BlobStorage 中的检查点。
每个事件的当前处理逻辑:如果 DateTime.UtcNow 和事件时间戳之间的时间差(以分钟为单位)小于 2,它会将事件发送到 SignalR 集线器,仅此而已。
问题如下:有时候EventProcessorClient
停了很长一段时间,EventHub中保留了很多事件,导致等待时间很长,慢慢追上最近的事件,直到 SignalR Hub 再次开始接收它们。处理器赶上最新事件的时间太长了,特别是在考虑每分钟接收数百个事件时。
有没有办法,例如,在启动处理器之前手动移动检查点?或者只获取最后 X 分钟的事件?也许另一个 idea/solution?
PS:我不关心这个消费者组超过 2 到 5 分钟的事件。
PS2:EventHub中配置的保留时间为1天。
代码:
/* properties and stuff */
// Constructor
public BusEventHub(ILogger<BusEventHub> logger, IConfiguration configuration, IHubContext<BusHub> hubContext) {
_logger = logger;
Configuration = configuration;
_busExcessHub = hubContext;
/* Connection strings and stuff */
// Create a blob container client that the event processor will use
storageClient = new BlobContainerClient(this.blobStorageConnectionString, this.blobContainerName);
// Create an event processor client to process events in the event hub
processor = new EventProcessorClient(storageClient, consumerGroup, this.ehubNamespaceConnectionString, this.eventHubName);
// Register handlers for processing events and handling errors
processor.ProcessEventAsync += ProcessEventHandler;
processor.ProcessErrorAsync += ProcessErrorHandler;
}
public async Task Start() {
_logger.LogInformation($"Starting event processing for EventHub {eventHubName}");
await processor.StartProcessingAsync();
}
public async Task Stop() {
if (BusHubUserHandler.ConnectedIds.Count < 2) {
_logger.LogInformation($"Stopping event processing for EventHub {eventHubName}");
await processor.StopProcessingAsync();
} else {
_logger.LogDebug("There are still other users connected");
}
}
private async Task ProcessEventHandler(ProcessEventArgs eventArgs) {
try {
string receivedEvent = Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray());
_logger.LogDebug($"Received event: {receivedEvent}");
BusExcessMinified busExcess = BusExcessMinified.FromJson(receivedEvent);
double timeDiff = (DateTime.UtcNow - busExcess.Timestamp).TotalMinutes;
if (timeDiff < 2) {
string responseEvent = busExcess.ToJson();
_logger.LogDebug($"Sending message to BusExcess Hub: {responseEvent}");
await _busExcessHub.Clients.All.SendAsync("UpdateBuses", responseEvent);
}
_logger.LogDebug("Update checkpoint in the blob storage"); // So that the service receives only new events the next time it's run
await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
} catch (TaskCanceledException) {
_logger.LogInformation("The EventHub event processing was stopped");
} catch (Exception e) {
_logger.LogError($"Exception: {e}");
}
}
/* ProcessErrorHandler */
可以在分区初始化时请求分区的初始位置,这样您就可以将排队时间指定为起点。这个sample说明了细节。需要注意的是,初始位置仅在分区没有检查点时使用;检查点将始终优先。
从您所描述的场景来看,检查点似乎对您没有用,并且妨碍了您的首选使用模式。如果没有其他缓解因素,我建议不要检查点,而是覆盖默认起始位置以动态重置为您感兴趣的时间。
如果出于某种原因,除此之外还需要检查点,那么最好的办法是删除检查点数据,因为检查点是基于偏移量的,不会识别定位的排队时间。