Spring @StreamListener:无限重试指数退避
Spring @StreamListener: Infinite retries with exponential backoff
我正在尝试将我的消费者配置为使用指数退避,在该退避中消息将被处理固定次数的尝试,其中应用退避期。但是我没有得到预期的行为。
这是我的 Java 代码:
@EnableBinding({
MessagingConfiguration.EventTopic.class
})
public class MessagingConfiguration {
public interface EventTopic {
String INPUT = "events-channel";
@Input(INPUT)
@Nonnull
SubscribableChannel input();
}
}
@StreamListener(MessagingConfiguration.EventTopic.INPUT))
void handle(@Nonnull Message<Event> event) {
throw new RuntimeException("FAILING!");
}
如果我尝试下一个配置:
spring.cloud.stream:
bindings:
events-channel:
content-type: application/json
destination: event-develop
group: group-event-service
consumer:
max-attempts: 2
重试 (20*) 次后,我收到此消息:
Backoff FixedBackOff{interval=0, currentAttempts=10, maxAttempts=9} exhausted for ConsumerRecord(...
2 (consumer.max-attempts
) * 10 (FixedBackOff.currentAttempts
) = 20* 次重试
所有这些重试都有 1 秒的延迟(默认退避期)
如果我将配置更改为:
spring.cloud.stream:
bindings:
events-channel:
content-type: application/json
destination: event-develop
group: group-event-service
consumer:
max-attempts: 8
#Times in milliseconds
back-off-initial-interval: 1000
back-off-max-interval: 60000
back-off-multiplier: 2
退避期在 8 次重试期间应用得很好 (max-attempts
) 但是当 8 次重试完成时,新的重试周期开始无限期地阻塞主题。
在下一个版本中,也许我会实现更复杂的错误处理系统,但现在我只需要在重试后丢弃消息并获取下一个。
我做错了什么?
我看了很多questions/answers这里,官方文档和网上的一些教程,但我没有找到避免重试无限循环的解决方案。
P.S.: 我正在与 spring-cloud-stream (3.1.1)
和 spring-kafka (2.6.6)
合作
这是因为侦听器容器现在默认配置为 SeekToCurrentErrorHandler
尝试 10 次。
这意味着您正在复合重试。
您可以使用 ListenerContainerCustomizer
@Bean
.
注入适当配置的 SeekToCurrentErrorHandler
建议两处都不要配置重试;删除绑定配置并将其替换为适当配置的错误处理程序,或者将错误处理程序更改为不重试。
我正在尝试将我的消费者配置为使用指数退避,在该退避中消息将被处理固定次数的尝试,其中应用退避期。但是我没有得到预期的行为。
这是我的 Java 代码:
@EnableBinding({
MessagingConfiguration.EventTopic.class
})
public class MessagingConfiguration {
public interface EventTopic {
String INPUT = "events-channel";
@Input(INPUT)
@Nonnull
SubscribableChannel input();
}
}
@StreamListener(MessagingConfiguration.EventTopic.INPUT))
void handle(@Nonnull Message<Event> event) {
throw new RuntimeException("FAILING!");
}
如果我尝试下一个配置:
spring.cloud.stream:
bindings:
events-channel:
content-type: application/json
destination: event-develop
group: group-event-service
consumer:
max-attempts: 2
重试 (20*) 次后,我收到此消息:
Backoff FixedBackOff{interval=0, currentAttempts=10, maxAttempts=9} exhausted for ConsumerRecord(...
2 (consumer.max-attempts
) * 10 (FixedBackOff.currentAttempts
) = 20* 次重试
所有这些重试都有 1 秒的延迟(默认退避期)
如果我将配置更改为:
spring.cloud.stream:
bindings:
events-channel:
content-type: application/json
destination: event-develop
group: group-event-service
consumer:
max-attempts: 8
#Times in milliseconds
back-off-initial-interval: 1000
back-off-max-interval: 60000
back-off-multiplier: 2
退避期在 8 次重试期间应用得很好 (max-attempts
) 但是当 8 次重试完成时,新的重试周期开始无限期地阻塞主题。
在下一个版本中,也许我会实现更复杂的错误处理系统,但现在我只需要在重试后丢弃消息并获取下一个。
我做错了什么?
我看了很多questions/answers这里,官方文档和网上的一些教程,但我没有找到避免重试无限循环的解决方案。
P.S.: 我正在与 spring-cloud-stream (3.1.1)
和 spring-kafka (2.6.6)
这是因为侦听器容器现在默认配置为 SeekToCurrentErrorHandler
尝试 10 次。
这意味着您正在复合重试。
您可以使用 ListenerContainerCustomizer
@Bean
.
SeekToCurrentErrorHandler
建议两处都不要配置重试;删除绑定配置并将其替换为适当配置的错误处理程序,或者将错误处理程序更改为不重试。