处理不成功后的 ChangeFeedProcessorBuilder 检查点
ChangeFeedProcessorBuilder checkpointing after unsuccessful processing
我正在调查 ChangeFeedProcessorBuilder processor1
在处理特定更改时引发异常或崩溃的行为。恢复后,将不再拾取相同的更改。有什么办法只有在通知处理成功后才去checkpoint吗?
委托人如下:
var builder = container.GetChangeFeedProcessorBuilder("migrationProcessor",
(IReadOnlyCollection<object> input, CancellationToken cancellationToken) =>
{
Console.WriteLine(input.Count + " Changes Received by " + a);
// just first try will fail (static variable)
if (a++ == 0)
{
throw new Exception();
}
return Task.CompletedTask;
});
谢谢!
Change Feed Processor 的默认行为是在成功执行委托后检查点:https://docs.microsoft.com/azure/cosmos-db/change-feed-processor#processing-life-cycle
主机实例的正常生命周期是:
- 阅读更改提要。
- 如果没有变化,休眠一段预定义的时间(可在构建器中使用 WithPollInterval 自定义)并转到#1。
- 如有更改,请将其发送给代表。
- 当委托成功完成更改处理后,使用最新处理的时间点更新租赁存储并转到#1。
如果您的委托处理程序抛出 未处理的异常,则没有检查点。
从评论中添加:可能不会重试批处理的唯一情况是抛出的批处理是第一个(租约没有延续)。因为当主机再次拿起租约重新处理时,它没有时间点可以重试。根据官方文档,一个租约由单个实例拥有,因此其他实例不可能选择相同的租约并并行处理它(在相同的 Deployment Unit 上下文中)。
我正在调查 ChangeFeedProcessorBuilder processor1
在处理特定更改时引发异常或崩溃的行为。恢复后,将不再拾取相同的更改。有什么办法只有在通知处理成功后才去checkpoint吗?
委托人如下:
var builder = container.GetChangeFeedProcessorBuilder("migrationProcessor",
(IReadOnlyCollection<object> input, CancellationToken cancellationToken) =>
{
Console.WriteLine(input.Count + " Changes Received by " + a);
// just first try will fail (static variable)
if (a++ == 0)
{
throw new Exception();
}
return Task.CompletedTask;
});
谢谢!
Change Feed Processor 的默认行为是在成功执行委托后检查点:https://docs.microsoft.com/azure/cosmos-db/change-feed-processor#processing-life-cycle
主机实例的正常生命周期是:
- 阅读更改提要。
- 如果没有变化,休眠一段预定义的时间(可在构建器中使用 WithPollInterval 自定义)并转到#1。
- 如有更改,请将其发送给代表。
- 当委托成功完成更改处理后,使用最新处理的时间点更新租赁存储并转到#1。
如果您的委托处理程序抛出 未处理的异常,则没有检查点。
从评论中添加:可能不会重试批处理的唯一情况是抛出的批处理是第一个(租约没有延续)。因为当主机再次拿起租约重新处理时,它没有时间点可以重试。根据官方文档,一个租约由单个实例拥有,因此其他实例不可能选择相同的租约并并行处理它(在相同的 Deployment Unit 上下文中)。