如何使用 spring-cloud-stream-binder-kafka 和 RetryTemplate 启用状态重试?

How to enable Stateful Retry using spring-cloud-stream-binder-kafka and RetryTemplate?

我想知道是否有一种方法可以使用 spring-cloud-stream-binder-kafka 启用 Stateful RetryTemplate。

我注意到有一个构造函数

RetryingMessageListenerAdapter(MessageListener<K, V> messageListener, RetryTemplate retryTemplate, RecoveryCallback<? extends Object> recoveryCallback, boolean stateful)

通过这段代码进行调试,我注意到使用 spring-cloud-stream-binder-kafka.

将布尔值 false 传递到 stateful 参数中

相关链接:KafkaMessageDrivenChannelAdapter.java, RetryingMessageListenerAdapter.java

关于这个话题我有几个问题。

  1. 有没有办法通过application.yml或ListenerContainerCustomizerRetryingMessageListenerAdapter的成员stateful设置为true?
  2. 有没有办法使用 spring-cloud-stream-binder-kafka 禁用 RetryTemplate?我认为我发现最接近的是在我的 @StreamRetryTemplate.
  3. 中将 max-attempts 覆盖为 1
  4. 如果我正在使用 spring-kafka 提供的 SeekToCurrentErrorHandler,我应该将 stateful 设置为 true 是否有任何好处或原因,因为 SeekToCurrentErrorHandler 已经是一个状态错误处理机制?

请告诉我。

由于 SeekToCurrentErrorHandler 具有重试和退避功能,因此不再需要状态重试。

是的,要在活页夹中禁用重试,请设置 maxAttempts=1 并适当配置 STCEH。