Spring-Kafka 2.6.5 通过状态重试和 SeekToCurrentErrorHandler 无限重试策略
Spring-Kafka 2.6.5 infinite retry policy via stateful retry and SeekToCurrentErrorHandler
如标题所示,我使用的是 spring-kafka 2.6.5 版。在我的架构中,我有一个主要主题,它有一个 SimpleRetryPolicy 和一个 ExponentialBackoffPolicy。如果重试次数用尽,我有一个 RecoveryCallback 将消息发送到错误主题。错误主题是我的问题所在。
在这个错误主题中,我需要能够执行无限重试并且不让任何消息丢失。 'dropped',我的意思是,如果 Spring 崩溃或发生其他同样糟糕的事情,我需要确保在恢复时,可以重新轮询任何正在处理的消息(顺序无关紧要)。基本上我认为我需要配置 ACK,以便在处理完成后确认它们。至于无限重试,我四处搜索并发现了一些来自 Gary Russell 等用户的有用建议。不幸的是,不同的 spring-kafka 版本和弃用使得为我的需求和版本拼凑出一个清晰的解决方案变得有点困难。
目前,我的设置如下:
@KafkaListener(topics = "my_topic",
groupId = "my_group_id",
containerFactory = "kafkaErrorListenerContainerFactory")
public void listenErrorTopic(String message) throws Exception {
processingLogic(message);
// Do I need to manually ACK afterwards (and thus also include additional params to access needed
// message components)?
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap();
...
// Basing the need for the below 2 props off of previously found posts
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// Unsure if the below prop is needed
// props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
...
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaErrorListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new
ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// A previous post said that infinite retries could only be achieved via state retry and STCEH,
// but there is an alternative in 2.6?
factory.setStatefulRetry(true);
// A previous post had '-1' passed to SeekToCurrentErrorHandler, but that is no longer possible.
// It was suggested instead to pass Long.MAX_VALUE to the backoff period for later versions, but the
// policy shown was a FixedBackOffPolicy.
factory.setErrorHandler(new SeekToCurrentErrorHandler());
RetryTemplate retryTemplate = new retryTemplate();
retryTemplate.setRetryPolicy(new AlwaysRetryPolicy());
// Do I need a recovery callback set in my RetryTemplate if I want it to be infinite?
ExponentialBackOffPolicy backoffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(<props file value insertion here>)
backOffPolicy.setMultiplier(<props file value insertion here>)
backOffPolicy.setMaxInterval(<props file value insertion here>)
retryTemplate.setBackOffPolicy(backoffPolicy);
factory.setRetryTemplate(retryTemplate);
return factory;
}
理想情况下,我更喜欢指数而不是固定,但我主要关心的是在不 max.interval.ms 触发重新平衡的情况下无限完成它的能力。我在不确定的代码块中留下了评论。如果有人能澄清一下,将不胜感激!
在 STCEH 支持回退之前,使用有状态重试专门用于与 STCEH 一起使用以避免重新平衡。
但是,现在 STCEH 支持退避,最好在重试模板上使用它。
如果两者都使用,则实际重试次数是 STCEH 和重试模板重试次数的倍数。
Now that the SeekToCurrentErrorHandler
can be configured with a BackOff
and has the ability to retry only certain exceptions (since version 2.3), the use of stateful retry, via the listener adapter retry configuration, is no longer necessary. You can provide the same functionality with appropriate configuration of the error handler and remove all retry configuration from the listener adapter. See Seek To Current Container Error Handlers for more information.
配置简单多了。
您不需要使用手动确认;容器将根据 AckMode
BATCH
(默认)或 RECORD
提交偏移量。后者成本更高,但重新交付的机会更少。
要无限重试,请在 maxAttempts
属性.
中使用 FixedBackOff
和 UNLIMITED_ATTEMPTS (Long.MAX_VALUE
)
默认情况下ExponentialBackOff
会无限重试。您只需要确保 maxInterval
小于 max.poll.interval.ms
即可避免重新平衡。
如标题所示,我使用的是 spring-kafka 2.6.5 版。在我的架构中,我有一个主要主题,它有一个 SimpleRetryPolicy 和一个 ExponentialBackoffPolicy。如果重试次数用尽,我有一个 RecoveryCallback 将消息发送到错误主题。错误主题是我的问题所在。
在这个错误主题中,我需要能够执行无限重试并且不让任何消息丢失。 'dropped',我的意思是,如果 Spring 崩溃或发生其他同样糟糕的事情,我需要确保在恢复时,可以重新轮询任何正在处理的消息(顺序无关紧要)。基本上我认为我需要配置 ACK,以便在处理完成后确认它们。至于无限重试,我四处搜索并发现了一些来自 Gary Russell 等用户的有用建议。不幸的是,不同的 spring-kafka 版本和弃用使得为我的需求和版本拼凑出一个清晰的解决方案变得有点困难。
目前,我的设置如下:
@KafkaListener(topics = "my_topic",
groupId = "my_group_id",
containerFactory = "kafkaErrorListenerContainerFactory")
public void listenErrorTopic(String message) throws Exception {
processingLogic(message);
// Do I need to manually ACK afterwards (and thus also include additional params to access needed
// message components)?
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap();
...
// Basing the need for the below 2 props off of previously found posts
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// Unsure if the below prop is needed
// props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
...
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaErrorListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new
ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// A previous post said that infinite retries could only be achieved via state retry and STCEH,
// but there is an alternative in 2.6?
factory.setStatefulRetry(true);
// A previous post had '-1' passed to SeekToCurrentErrorHandler, but that is no longer possible.
// It was suggested instead to pass Long.MAX_VALUE to the backoff period for later versions, but the
// policy shown was a FixedBackOffPolicy.
factory.setErrorHandler(new SeekToCurrentErrorHandler());
RetryTemplate retryTemplate = new retryTemplate();
retryTemplate.setRetryPolicy(new AlwaysRetryPolicy());
// Do I need a recovery callback set in my RetryTemplate if I want it to be infinite?
ExponentialBackOffPolicy backoffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(<props file value insertion here>)
backOffPolicy.setMultiplier(<props file value insertion here>)
backOffPolicy.setMaxInterval(<props file value insertion here>)
retryTemplate.setBackOffPolicy(backoffPolicy);
factory.setRetryTemplate(retryTemplate);
return factory;
}
理想情况下,我更喜欢指数而不是固定,但我主要关心的是在不 max.interval.ms 触发重新平衡的情况下无限完成它的能力。我在不确定的代码块中留下了评论。如果有人能澄清一下,将不胜感激!
在 STCEH 支持回退之前,使用有状态重试专门用于与 STCEH 一起使用以避免重新平衡。
但是,现在 STCEH 支持退避,最好在重试模板上使用它。
如果两者都使用,则实际重试次数是 STCEH 和重试模板重试次数的倍数。
Now that the
SeekToCurrentErrorHandler
can be configured with aBackOff
and has the ability to retry only certain exceptions (since version 2.3), the use of stateful retry, via the listener adapter retry configuration, is no longer necessary. You can provide the same functionality with appropriate configuration of the error handler and remove all retry configuration from the listener adapter. See Seek To Current Container Error Handlers for more information.
配置简单多了。
您不需要使用手动确认;容器将根据 AckMode
BATCH
(默认)或 RECORD
提交偏移量。后者成本更高,但重新交付的机会更少。
要无限重试,请在 maxAttempts
属性.
FixedBackOff
和 UNLIMITED_ATTEMPTS (Long.MAX_VALUE
)
默认情况下ExponentialBackOff
会无限重试。您只需要确保 maxInterval
小于 max.poll.interval.ms
即可避免重新平衡。