了解 Spring Cloud Stream Kafka 和 Spring 重试
Understanding Spring Cloud Stream Kafka and Spring Retry
我有一个使用 Kafka 绑定器的 Spring Cloud Stream 项目,我正在尝试了解并最终自定义 Cloud Stream 使用的 RetryTemplate。
我没有找到很多关于其工作原理的文档,但我阅读的内容使我得出以下假设:
- Cloud Stream 默认配置并启用Spring 重试,包括默认重试和退避策略。
- 默认情况下,
@StreamListener
中任何未捕获的异常都将触发 Spring 重试
- Cloud Stream 会以某种方式跟踪每条消息的 RetryContext 信息(如何?我不确定)
这些假设是否正确?
现在,在我的应用程序中,我有一个模式,其中一些消息可以立即处理,但其他消息必须推迟到以后再试(使用指数退避等)。
我应该抛出异常导致 Spring Cloud Stream 在绑定层重试这些消息,还是自己实现重试并跟踪我自己的重试上下文?
如果我应该依赖 Cloud Stream 的重试设置,我应该如何自定义退避策略等?
default retry configuration 尝试 3 次,初始延迟 1 秒,2.0 倍数,最大延迟 10 秒。
默认使用无状态重试,这意味着重试在内存中。
poll()
返回的所有记录的所有重试总延迟不得超过 max.poll.interval.ms
。
使用 Apache Kafka 的现代版本 Spring(由活页夹使用);最好禁用活页夹重试 (maxAttempts=1
) 并使用 SeekToCurrentErrorHandler
并配置适当的 BackOff
。
您可以使用 ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>>
@Bean
和 return (container, dest, grp) -> container.setErrorHandler(handler)
.
设置错误处理程序
这样就避免了上面提到的问题,只是一条记录的最大延迟间隔必须小于max.poll.interval.ms
。
您还可以分类哪些异常是可重试的,哪些不是,以及配置一个死信恢复器,在重试用尽后调用。
我有一个使用 Kafka 绑定器的 Spring Cloud Stream 项目,我正在尝试了解并最终自定义 Cloud Stream 使用的 RetryTemplate。
我没有找到很多关于其工作原理的文档,但我阅读的内容使我得出以下假设:
- Cloud Stream 默认配置并启用Spring 重试,包括默认重试和退避策略。
- 默认情况下,
@StreamListener
中任何未捕获的异常都将触发 Spring 重试 - Cloud Stream 会以某种方式跟踪每条消息的 RetryContext 信息(如何?我不确定)
这些假设是否正确?
现在,在我的应用程序中,我有一个模式,其中一些消息可以立即处理,但其他消息必须推迟到以后再试(使用指数退避等)。
我应该抛出异常导致 Spring Cloud Stream 在绑定层重试这些消息,还是自己实现重试并跟踪我自己的重试上下文?
如果我应该依赖 Cloud Stream 的重试设置,我应该如何自定义退避策略等?
default retry configuration 尝试 3 次,初始延迟 1 秒,2.0 倍数,最大延迟 10 秒。
默认使用无状态重试,这意味着重试在内存中。
poll()
返回的所有记录的所有重试总延迟不得超过 max.poll.interval.ms
。
使用 Apache Kafka 的现代版本 Spring(由活页夹使用);最好禁用活页夹重试 (maxAttempts=1
) 并使用 SeekToCurrentErrorHandler
并配置适当的 BackOff
。
您可以使用 ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>>
@Bean
和 return (container, dest, grp) -> container.setErrorHandler(handler)
.
这样就避免了上面提到的问题,只是一条记录的最大延迟间隔必须小于max.poll.interval.ms
。
您还可以分类哪些异常是可重试的,哪些不是,以及配置一个死信恢复器,在重试用尽后调用。