空闲时在 Azure.Messaging.EventHubs.EventProcessorClient 上更新事件中心分区偏移检查点

Update EventHub Partition Offsett Checkpoint on Azure.Messaging.EventHubs.EventProcessorClient When Idle

在我的场景中,我将同时接收大量事件,然后在 EventHub 空闲时等待很长时间。在我的处理器客户端中,我想每 N 个事件或 N 分钟(以先到者为准)检查一次。

这是我设置 Azure.Messaging.EventHubs.EventProcessorClient 的方式:

EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName);
processor.ProcessEventAsync += ProcessEventHandler;
processor.ProcessErrorAsync += ProcessErrorHandler;

//Start Stopwatch
_checkpointStopWatch = new Stopwatch();
_checkpointStopWatch.Start();

// Start the processing
await processor.StartProcessingAsync();

while (true)
{
    await Task.Delay(TimeSpan.FromSeconds(10));
    Console.WriteLine($"{eventsProcessed} events have been processed");
}

在我的 ProcessEventHandler 中,我检查了 eventsProcessedSinceLastCheckpoint 以及秒表上经过的时间。当其中一个达到最大值时,我将两者都重置并在我的控制台中记录下来 window:

static async Task<Task> ProcessEventHandler(ProcessEventArgs eventArgs)
{
   ++eventsProcessed;
   ++eventsProcessedSinceLastCheckpoint;

   Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()));

    // After every 100 events or 2 minutes we add a checkpoint. Whichever occurs first
    if(eventsProcessedSinceLastCheckpoint >= 100 || _checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(2))
    {
        eventsProcessedSinceLastCheckpoint = 0;
        _checkpointStopWatch.Restart();

        await eventArgs.UpdateCheckpointAsync();
        Console.WriteLine("> Checkpoint Set. Count Reset. Stopwatch Reset.");
    }
    return Task.CompletedTask;

}

对 eventsProcessedSinceLastCheckpoint 变量的检查工作完美,因为只要有新事件进入,就会触发 ProcessEventHandler。但是,当 EventHub 空闲时,不会调用 ProcessEventHandler,因此在 EventHub 安静几分钟或几小时的情况下,我永远不会检查点关于过去的时间。

我知道我可以删除计时器,并且如果检查点之间发生崩溃,我的处理器应该能够处理重复事件。但是在我的场景中(因为我会有很长的空闲时间)我想利用我有的时间赶上来,以避免在可能的时候出现额外的重复。因此,在空闲期间添加定时器作为回退。

我的问题是: 如何在 ProcessEventHandler 之外调用 UpdateCheckpointAsync()?该方法似乎只存在于 ProcessEventArgs 上。我不能直接在 EventProcessorClient 上调用它,这将是理想的,因为我可以将计时器检查移到 ProcessEventHandler 之外并进入我的 while 循环....

在创建处理器实例时设置 EventHubProcessorClientOptions.MaximumWaitTime 属性 将允许在未读取任何事件时调用处理程序。当设置为非空时,等待时间基本就是"give me events as soon as you get them, but ping my handler if no events have been read during this interval."

关于在这种情况下更新检查点,推荐的方法是缓存发送到处理程序的最后一个事件的参数,并使用它来调用 UpdateCheckpointAsyncThis sample 演示了该方法,确保它在处理分区停止时创建检查点。