Spring @StreamListener:无限重试指数退避

Spring @StreamListener: Infinite retries with exponential backoff

我正在尝试将我的消费者配置为使用指数退避,在该退避中消息将被处理固定次数的尝试,其中应用退避期。但是我没有得到预期的行为。

这是我的 Java 代码:

@EnableBinding({
        MessagingConfiguration.EventTopic.class
})
public class MessagingConfiguration {

    public interface EventTopic {
        String INPUT = "events-channel";

        @Input(INPUT)
        @Nonnull
        SubscribableChannel input();
    }
}
  
@StreamListener(MessagingConfiguration.EventTopic.INPUT))
void handle(@Nonnull Message<Event> event) {
    throw new RuntimeException("FAILING!");
}

如果我尝试下一个配置:

spring.cloud.stream:
  bindings:
    events-channel:
      content-type: application/json
      destination: event-develop
      group: group-event-service
      consumer:
        max-attempts: 2

重试 (20*) 次后,我收到此消息:

Backoff FixedBackOff{interval=0, currentAttempts=10, maxAttempts=9} exhausted for ConsumerRecord(...

2 (consumer.max-attempts) * 10 (FixedBackOff.currentAttempts) = 20* 次重试

所有这些重试都有 1 秒的延迟(默认退避期)

如果我将配置更改为:

spring.cloud.stream:
  bindings:
    events-channel:
      content-type: application/json
      destination: event-develop
      group: group-event-service
      consumer:
        max-attempts: 8
        #Times in milliseconds
        back-off-initial-interval: 1000
        back-off-max-interval: 60000
        back-off-multiplier: 2

退避期在 8 次重试期间应用得很好 (max-attempts) 但是当 8 次重试完成时,新的重试周期开始无限期地阻塞主题。

在下一个版本中,也许我会实现更复杂的错误处理系统,但现在我只需要在重试后丢弃消息并获取下一个。

我做错了什么?

我看了很多questions/answers这里,官方文档和网上的一些教程,但我没有找到避免重试无限循环的解决方案。

P.S.: 我正在与 spring-cloud-stream (3.1.1)spring-kafka (2.6.6)

合作

这是因为侦听器容器现在默认配置为 SeekToCurrentErrorHandler 尝试 10 次。

这意味着您正在复合重试。

您可以使用 ListenerContainerCustomizer @Bean.

注入适当配置的 SeekToCurrentErrorHandler

建议两处都不要配置重试;删除绑定配置并将其替换为适当配置的错误处理程序,或者将错误处理程序更改为不重试。