强制 EventProcessorHost 将失败的 Azure 事件中心 eventData 重新传送到 IEventProcessor.ProcessEvents 方法
Forcing EventProcessorHost to re-deliver failed Azure Event Hub eventData's to IEventProcessor.ProcessEvents method
该应用程序使用 .NET 4.6.1 和 Microsoft.Azure.ServiceBus.EventProcessorHost nuget package v2.0.2, along with it's dependency WindowsAzure.ServiceBus package v3.0.1 来处理 Azure 事件中心消息。
该应用程序实现了 IEventProcessor
。当 ProcessEventsAsync
方法抛出未处理的异常时,EventProcessorHost
永远不会将这些消息重新发送到 IEventProcessor
的 运行 实例。 (有趣的是,如果托管应用程序停止并重新启动,或者租约丢失并重新获得,它将重新发送。)
有没有办法强制 EventProcessorHost
将导致异常的事件消息重新发送到 IEventProcessor
实现?
此评论中针对几乎相同的问题提出了一种可能的解决方案:
Redeliver unprocessed EventHub messages in IEventProcessor.ProcessEventsAsync
该评论建议保留最后一次成功处理的事件消息的副本,并在 ProcessEventsAsync
中发生异常时使用该消息显式检查点。然而,在实施和测试这样的解决方案之后,EventProcessorHost
仍然没有重新发送。实现非常简单:
private EventData _lastSuccessfulEvent;
public async Task ProcessEventsAsync(
PartitionContext context,
IEnumerable<EventData> messages)
{
try
{
await ProcessEvents(context, messages); // does actual processing, may throw exception
_lastSuccessfulEvent = messages
.OrderByDescending(ed => ed.SequenceNumber)
.First();
}
catch(Exception ex)
{
await context.CheckpointAsync(_lastSuccessfulEvent);
}
}
实战分析:
此处提供了部分日志示例:https://gist.github.com/ttbjj/4781aa992941e00e4e15e0bf1c45f316#file-gistfile1-txt
简单回答:
你试过了吗EventProcessorHost.ResetConnection(string partiotionId)?
复杂答案:
可能是你这边需要解决的架构问题,为什么处理失败了?是暂时性错误吗?重试处理逻辑是一种可能的情况吗?等等...
TLDR:到re-play到IEventProcessor.ProcessEventsAsync
的一批失败事件的唯一可靠方法是-Shutdown
EventProcessorHost
(又名 EPH
)立即 - 通过使用 eph.UnregisterEventProcessorAsync()
或 terminating the process - 根据情况。这将让其他 EPH
个实例获取此分区的租约并从上一个检查点开始。
在解释之前 - 我想 call-out 这是一个 很好的问题 并且确实是我们必须为 EPH
做出的最艰难的设计选择之一。在我看来,它是 trade-off b/w: usability
/supportability
的 EPH
框架,对比 Technical-Correctness
。
理想情况 应该是:当 IEventProcessorImpl.ProcessEventsAsync
中的 user-code 抛出异常时 - EPH
库不应该捕捉到这个。它应该让这个 Exception
- 使进程崩溃并且 crash-dump
清楚地表明 callstack
负责。我仍然相信 - 这是最 technically-correct
的解决方案。
现状:IEventProcessorImpl.ProcessEventsAsync
API&EPH
的合同是,
- 只要可以从 EventHubs 服务接收到
EventData
- 继续使用 [=31] 调用 user-callback (IEventProcessorImplementation.ProcessEventsAsync
) =] & 如果 user-callback 在调用时抛出错误,通知 EventProcessorOptions.ExceptionReceived
.
IEventProcessorImpl.ProcessEventsAsync
中的 - User-code 应处理所有错误并在必要时合并
Retry's
。 EPH
没有在此 call-back 上设置任何超时以让用户完全控制 processing-time。
- 如果特定事件是麻烦的原因 - 用特殊的 属性 标记
EventData
- 对于 ex:type=poison-event
和 re-send相同的 EventHub
(包括指向实际事件的指针,将这些 EventData.Offset
和 SequenceNumber
复制到新的 EventData.ApplicationProperties
中)或将其转发到 SERVICEBUS 队列或将其存储在其他地方,基本上,识别并推迟处理 poison-event。
- 如果您处理了所有可能的情况并且仍然 运行 进入
Exceptions
- 捕捉并关闭 EPH
或 failfast
出现此异常的进程。当 EPH
恢复时 - 它将从 where-it-left 开始。
为什么 check-pointing 'the old event' 不起作用 (阅读 以了解 EPH
):
在幕后,EPH
是每个 EventHub Consumergroup 分区的接收器的 运行 泵 - 其工作是从给定的 checkpoint
(如果存在)启动接收器并创建 [= 的专用实例49=] 实施,然后 receive
来自检查点中指定 Offset
的指定 EventHub 分区(如果不存在 - EventProcessorOptions.initialOffsetProvider
)并最终调用 IEventProcessorImpl.ProcessEventsAsync
。 Checkpoint
的目的是在 EPH
进程关闭并且 Partition 的所有权转移到另一个 EPH
实例时能够可靠地开始处理消息。因此,checkpoint
将仅在启动 PUMP 时被消耗,一旦泵启动,将 NOT 被读取。
在我撰写本文时,EPH
的版本为 2.2.10。
该应用程序使用 .NET 4.6.1 和 Microsoft.Azure.ServiceBus.EventProcessorHost nuget package v2.0.2, along with it's dependency WindowsAzure.ServiceBus package v3.0.1 来处理 Azure 事件中心消息。
该应用程序实现了 IEventProcessor
。当 ProcessEventsAsync
方法抛出未处理的异常时,EventProcessorHost
永远不会将这些消息重新发送到 IEventProcessor
的 运行 实例。 (有趣的是,如果托管应用程序停止并重新启动,或者租约丢失并重新获得,它将重新发送。)
有没有办法强制 EventProcessorHost
将导致异常的事件消息重新发送到 IEventProcessor
实现?
此评论中针对几乎相同的问题提出了一种可能的解决方案: Redeliver unprocessed EventHub messages in IEventProcessor.ProcessEventsAsync
该评论建议保留最后一次成功处理的事件消息的副本,并在 ProcessEventsAsync
中发生异常时使用该消息显式检查点。然而,在实施和测试这样的解决方案之后,EventProcessorHost
仍然没有重新发送。实现非常简单:
private EventData _lastSuccessfulEvent;
public async Task ProcessEventsAsync(
PartitionContext context,
IEnumerable<EventData> messages)
{
try
{
await ProcessEvents(context, messages); // does actual processing, may throw exception
_lastSuccessfulEvent = messages
.OrderByDescending(ed => ed.SequenceNumber)
.First();
}
catch(Exception ex)
{
await context.CheckpointAsync(_lastSuccessfulEvent);
}
}
实战分析:
此处提供了部分日志示例:https://gist.github.com/ttbjj/4781aa992941e00e4e15e0bf1c45f316#file-gistfile1-txt
简单回答: 你试过了吗EventProcessorHost.ResetConnection(string partiotionId)?
复杂答案: 可能是你这边需要解决的架构问题,为什么处理失败了?是暂时性错误吗?重试处理逻辑是一种可能的情况吗?等等...
TLDR:到re-play到IEventProcessor.ProcessEventsAsync
的一批失败事件的唯一可靠方法是-Shutdown
EventProcessorHost
(又名 EPH
)立即 - 通过使用 eph.UnregisterEventProcessorAsync()
或 terminating the process - 根据情况。这将让其他 EPH
个实例获取此分区的租约并从上一个检查点开始。
在解释之前 - 我想 call-out 这是一个 很好的问题 并且确实是我们必须为 EPH
做出的最艰难的设计选择之一。在我看来,它是 trade-off b/w: usability
/supportability
的 EPH
框架,对比 Technical-Correctness
。
理想情况 应该是:当 IEventProcessorImpl.ProcessEventsAsync
中的 user-code 抛出异常时 - EPH
库不应该捕捉到这个。它应该让这个 Exception
- 使进程崩溃并且 crash-dump
清楚地表明 callstack
负责。我仍然相信 - 这是最 technically-correct
的解决方案。
现状:IEventProcessorImpl.ProcessEventsAsync
API&EPH
的合同是,
- 只要可以从 EventHubs 服务接收到
EventData
- 继续使用 [=31] 调用 user-callback (IEventProcessorImplementation.ProcessEventsAsync
) =] & 如果 user-callback 在调用时抛出错误,通知EventProcessorOptions.ExceptionReceived
. - User-code 应处理所有错误并在必要时合并
Retry's
。EPH
没有在此 call-back 上设置任何超时以让用户完全控制 processing-time。 - 如果特定事件是麻烦的原因 - 用特殊的 属性 标记
EventData
- 对于 ex:type=poison-event
和 re-send相同的EventHub
(包括指向实际事件的指针,将这些EventData.Offset
和SequenceNumber
复制到新的EventData.ApplicationProperties
中)或将其转发到 SERVICEBUS 队列或将其存储在其他地方,基本上,识别并推迟处理 poison-event。 - 如果您处理了所有可能的情况并且仍然 运行 进入
Exceptions
- 捕捉并关闭EPH
或failfast
出现此异常的进程。当EPH
恢复时 - 它将从 where-it-left 开始。
IEventProcessorImpl.ProcessEventsAsync
中的 为什么 check-pointing 'the old event' 不起作用 (阅读
在幕后,
在我撰写本文时,EPH
):
EPH
是每个 EventHub Consumergroup 分区的接收器的 运行 泵 - 其工作是从给定的 checkpoint
(如果存在)启动接收器并创建 [= 的专用实例49=] 实施,然后 receive
来自检查点中指定 Offset
的指定 EventHub 分区(如果不存在 - EventProcessorOptions.initialOffsetProvider
)并最终调用 IEventProcessorImpl.ProcessEventsAsync
。 Checkpoint
的目的是在 EPH
进程关闭并且 Partition 的所有权转移到另一个 EPH
实例时能够可靠地开始处理消息。因此,checkpoint
将仅在启动 PUMP 时被消耗,一旦泵启动,将 NOT 被读取。
EPH
的版本为 2.2.10。