强制 EventProcessorHost 将失败的 Azure 事件中心 eventData 重新传送到 IEventProcessor.ProcessEvents 方法

Forcing EventProcessorHost to re-deliver failed Azure Event Hub eventData's to IEventProcessor.ProcessEvents method

该应用程序使用 .NET 4.6.1 和 Microsoft.Azure.ServiceBus.EventProcessorHost nuget package v2.0.2, along with it's dependency WindowsAzure.ServiceBus package v3.0.1 来处理 Azure 事件中心消息。

该应用程序实现了 IEventProcessor。当 ProcessEventsAsync 方法抛出未处理的异常时,EventProcessorHost 永远不会将这些消息重新发送到 IEventProcessor 的 运行 实例。 (有趣的是,如果托管应用程序停止并重新启动,或者租约丢失并重新获得,它将重新发送。)

有没有办法强制 EventProcessorHost 将导致异常的事件消息重新发送到 IEventProcessor 实现?

此评论中针对几乎相同的问题提出了一种可能的解决方案: Redeliver unprocessed EventHub messages in IEventProcessor.ProcessEventsAsync

该评论建议保留最后一次成功处理的事件消息的副本,并在 ProcessEventsAsync 中发生异常时使用该消息显式检查点。然而,在实施和测试这样的解决方案之后,EventProcessorHost 仍然没有重新发送。实现非常简单:

private EventData _lastSuccessfulEvent;

public async Task ProcessEventsAsync(
    PartitionContext context,
    IEnumerable<EventData> messages)
{
    try
    {
        await ProcessEvents(context, messages);     // does actual processing, may throw exception
        _lastSuccessfulEvent = messages
            .OrderByDescending(ed => ed.SequenceNumber)
            .First();
    }
    catch(Exception ex)
    {
        await context.CheckpointAsync(_lastSuccessfulEvent);
    }
}

实战分析:

此处提供了部分日志示例:https://gist.github.com/ttbjj/4781aa992941e00e4e15e0bf1c45f316#file-gistfile1-txt

简单回答: 你试过了吗EventProcessorHost.ResetConnection(string partiotionId)

复杂答案: 可能是你这边需要解决的架构问题,为什么处理失败了?是暂时性错误吗?重试处理逻辑是一种可能的情况吗?等等...

TLDR到re-play到IEventProcessor.ProcessEventsAsync的一批失败事件的唯一可靠方法是-Shutdown EventProcessorHost(又名 EPH)立即 - 通过使用 eph.UnregisterEventProcessorAsync()terminating the process - 根据情况。这将让其他 EPH 个实例获取此分区的租约并从上一个检查点开始。

在解释之前 - 我想 call-out 这是一个 很好的问题 并且确实是我们必须为 EPH 做出的最艰难的设计选择之一。在我看来,它是 trade-off b/w: usability/supportabilityEPH 框架,对比 Technical-Correctness

理想情况 应该是:当 IEventProcessorImpl.ProcessEventsAsync 中的 user-code 抛出异常时 - EPH 库不应该捕捉到这个。它应该让这个 Exception - 使进程崩溃并且 crash-dump 清楚地表明 callstack 负责。我仍然相信 - 这是最 technically-correct 的解决方案。

现状IEventProcessorImpl.ProcessEventsAsyncAPI&EPH的合同是,

  1. 只要可以从 EventHubs 服务接收到 EventData - 继续使用 [=31] 调用 user-callback (IEventProcessorImplementation.ProcessEventsAsync) =] & 如果 user-callback 在调用时抛出错误,通知 EventProcessorOptions.ExceptionReceived.
  2. IEventProcessorImpl.ProcessEventsAsync 中的
  3. User-code 应处理所有错误并在必要时合并 Retry'sEPH 没有在此 call-back 上设置任何超时以让用户完全控制 processing-time。
  4. 如果特定事件是麻烦的原因 - 用特殊的 属性 标记 EventData - 对于 ex:type=poison-event 和 re-send相同的 EventHub(包括指向实际事件的指针,将这些 EventData.OffsetSequenceNumber 复制到新的 EventData.ApplicationProperties 中)或将其转发到 SERVICEBUS 队列或将其存储在其他地方,基本上,识别并推迟处理 poison-event
  5. 如果您处理了所有可能的情况并且仍然 运行 进入 Exceptions - 捕捉并关闭 EPHfailfast 出现此异常的进程。当 EPH 恢复时 - 它将从 where-it-left 开始。

为什么 check-pointing 'the old event' 不起作用 (阅读 以了解 EPH):

在幕后,EPH 是每个 EventHub Consumergroup 分区的接收器的 运行 泵 - 其工作是从给定的 checkpoint(如果存在)启动接收器并创建 [= 的专用实例49=] 实施,然后 receive 来自检查点中指定 Offset 的指定 EventHub 分区(如果不存在 - EventProcessorOptions.initialOffsetProvider)并最终调用 IEventProcessorImpl.ProcessEventsAsyncCheckpoint 的目的是在 EPH 进程关闭并且 Partition 的所有权转移到另一个 EPH 实例时能够可靠地开始处理消息。因此,checkpoint 将仅在启动 PUMP 时被消耗,一旦泵启动,将 NOT 被读取。

在我撰写本文时,EPH 的版本为 2.2.10

more general reading on Event Hubs...