关于事件中心处理器中的检查点策略
About checkpoint strategy in event hub processor
我使用事件中心处理器主机来接收和处理来自事件中心的事件。为了获得更好的性能,我每 3 分钟调用一次检查点,而不是每次接收事件时调用:
public async Task ProcessEventAsync(context, messages)
{
foreach (var eventData in messages)
{
// do something
}
if (checkpointStopWatth.Elapsed > TimeSpan.FromMinutes(3);
{
await context.CheckpointAsync();
}
}
但问题是,如果没有新事件发送到事件中心,可能会有一些事件永远不会成为检查点,因为如果没有新消息,ProcessEventAsync 将不会被调用。
有什么建议可以确保所有已处理的事件都是检查点,但仍然每隔几分钟检查一次吗?
更新:根据 Sreeram 的建议,我将代码更新如下:
public async Task ProcessEventAsync(context, messages)
{
foreach (var eventData in messages)
{
// do something
}
this.lastProcessedEventsCount += messages.Count();
if (this.checkpointStopWatth.Elapsed > TimeSpan.FromMinutes(3);
{
this.checkpointStopWatch.Restart();
if (this.lastProcessedEventsCount > 0)
{
await context.CheckpointAsync();
this.lastProcessedEventsCount = 0;
}
}
}
很棒的案例 - 你在报道!
在以下 2 种情况下,您可能会丢失 event checkpoints
(结果是 event replay
):
当您的数据流稀疏时(例如:每 5 分钟一批消息,您的检查点间隔为 3 分钟)并且 EventProcessorHost
实例由于某种原因关闭 - 您可以看到2 min
个 EventData
- 重新处理。为了处理这种情况,
在完成 IEventProcessor.onEvents
/IEventProcessor.ProcessEventsAsync
和检查点后跟踪 lastProcessedEvent
并在关闭时收到通知 - IEventProcessor.onClose
/IEventProcessor.CloseAsync
.
可能只是这样一种情况 - 特定 EventHubs partition
没有更多事件。在这种情况下,您永远不会看到最后一个事件被检查点 - 使用您的 Checkpointing strategy
。但是,当您有连续的 EventData
流并且您没有发送到特定的 EventHubs 分区 (EventHubClient.send(EventData_Without_PartitionKey)
) 时,这种情况并不常见。如果您认为 - 您可以 运行 遇到这种情况,请使用:
EventProcessorOptions.setInvokeProcessorAfterReceiveTimeout(true); // in java or
EventProcessorOptions.InvokeProcessorAfterReceiveTimeout = true; // in C#
标记以每隔一段时间唤醒 processEventsAsync
。然后,跟踪LastProcessedEventData
和LastCheckpointedEventData
,并根据这些事件的EventData.SequenceNumber
属性判断在没有收到Events
时是否进行检查点。
我使用事件中心处理器主机来接收和处理来自事件中心的事件。为了获得更好的性能,我每 3 分钟调用一次检查点,而不是每次接收事件时调用:
public async Task ProcessEventAsync(context, messages)
{
foreach (var eventData in messages)
{
// do something
}
if (checkpointStopWatth.Elapsed > TimeSpan.FromMinutes(3);
{
await context.CheckpointAsync();
}
}
但问题是,如果没有新事件发送到事件中心,可能会有一些事件永远不会成为检查点,因为如果没有新消息,ProcessEventAsync 将不会被调用。
有什么建议可以确保所有已处理的事件都是检查点,但仍然每隔几分钟检查一次吗?
更新:根据 Sreeram 的建议,我将代码更新如下:
public async Task ProcessEventAsync(context, messages)
{
foreach (var eventData in messages)
{
// do something
}
this.lastProcessedEventsCount += messages.Count();
if (this.checkpointStopWatth.Elapsed > TimeSpan.FromMinutes(3);
{
this.checkpointStopWatch.Restart();
if (this.lastProcessedEventsCount > 0)
{
await context.CheckpointAsync();
this.lastProcessedEventsCount = 0;
}
}
}
很棒的案例 - 你在报道!
在以下 2 种情况下,您可能会丢失 event checkpoints
(结果是 event replay
):
当您的数据流稀疏时(例如:每 5 分钟一批消息,您的检查点间隔为 3 分钟)并且
EventProcessorHost
实例由于某种原因关闭 - 您可以看到2 min
个EventData
- 重新处理。为了处理这种情况, 在完成IEventProcessor.onEvents
/IEventProcessor.ProcessEventsAsync
和检查点后跟踪lastProcessedEvent
并在关闭时收到通知 -IEventProcessor.onClose
/IEventProcessor.CloseAsync
.可能只是这样一种情况 - 特定
EventHubs partition
没有更多事件。在这种情况下,您永远不会看到最后一个事件被检查点 - 使用您的Checkpointing strategy
。但是,当您有连续的EventData
流并且您没有发送到特定的 EventHubs 分区 (EventHubClient.send(EventData_Without_PartitionKey)
) 时,这种情况并不常见。如果您认为 - 您可以 运行 遇到这种情况,请使用:EventProcessorOptions.setInvokeProcessorAfterReceiveTimeout(true); // in java or EventProcessorOptions.InvokeProcessorAfterReceiveTimeout = true; // in C#
标记以每隔一段时间唤醒 processEventsAsync
。然后,跟踪LastProcessedEventData
和LastCheckpointedEventData
,并根据这些事件的EventData.SequenceNumber
属性判断在没有收到Events
时是否进行检查点。