如何在不导致组重新平衡的情况下为 Kafka 消费者实现始终重试 policy/Custom 重试策略
How to achieve Always retry policy/Custom Retry Policy for a Kafka Consumer without causing a rebalance in the group
我正在尝试编写一个有弹性的 Kafka 消费者。如果在 listener 方法中处理消息时发生异常,我想重试。我想为某些异常重试几次,总是为某些异常重试,而从不为其余异常重试几次。我已经阅读了关于 SeekToCurrentErrorHandler 的 Spring 文档,但不是 100% 确定如何实现它。
我已经对 ExceptionClassifierRetryPolicy 进行了子class,并且正在根据 Listener 方法中发生的异常返回适当的重试策略。
我已经创建了 RetryTemplate 并在子 class.
中使用我的自定义实现设置了它的 RetryPolicy
我已经在 Kafka 容器上设置了 retryTemplate。我已将错误处理程序设置为新的 SeekToCurrentHandler,并将状态重试 属性 设置为 true。
侦听器方法
@KafkaListener(topics = "topicName", containerFactory = "containerFactory")
public void listenToKafkaTopic(@Payload Message<SomeAvroGeneratedClass> message, Acknowledgement ack){
SomeAvroGeneratedClass obj = message.getPayLoad();
processIncomingMessage(obj);
ack.acknowledge();
}
自定义重试策略class
@Component
public class MyRetryPolicy extends ExceptionClassifierRetryPolicy
{
@PostConstruct
public void init(){
final SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
simpleRetryPolicy.setMaxAttempts(8);
this.setExceptionClassifier( classifiable ->
{
// Always Retry when instanceOf TransientDataAccessException
if( classifiable.getCause() instanceof TransientDataAccessException)
{
return new AlwaysRetryPolicy();
}
else if(classifiable.getCause() instanceOf NonTransientDataAccessException)
{
return new NeverRetryPolicy();
}
else
{
return simpleRetryPolicy;
}
} );
}}
重试模板和容器配置
@Configuration
public class RetryConfig{
@Bean
public RetryTemplate retryTemplate(@Autowired ConcurrentKafkaListenerContainerFactory factory){
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(new MyRetryPolicy());
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy()
fixedBackOffPolicy.setBackOffPeriod(2000l);
retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
factory.setRetryTemplate(retryTemplate);
factory.setAckOnError(false);
factory.setErrorHandler(new SeekToCurrentErrorHandler());
factory.setStateFulRetry(true);
factory.setRecoveryCallback(//configure recovery after retries are exhausted and commit offset
);
}
}
侦听器属性:
- AckMode = 手动
- auto.offset.commit = 假
问题:
使用我当前的代码,我能否实现在 MyRetryPolicy 中定义的重试逻辑,而不会在返回 AlwaysRetryPolicy 时导致消费者重新平衡?如果不是,请指导我正确的路径。
我的错误处理程序和重试方法是否正确?
有状态重试与 SeekToCurrentErrorHandler
相结合是正确的方法。
但是,如果您使用的是最新版本(2.2 或更高版本),错误处理程序将在尝试 10 次后放弃;您可以通过将 maxFailures
设置为 -1 (2.2) 或将 backOff
设置为 Long.MAX_VALUE
(2.3 或更高版本)来实现无穷大。
我正在尝试编写一个有弹性的 Kafka 消费者。如果在 listener 方法中处理消息时发生异常,我想重试。我想为某些异常重试几次,总是为某些异常重试,而从不为其余异常重试几次。我已经阅读了关于 SeekToCurrentErrorHandler 的 Spring 文档,但不是 100% 确定如何实现它。
我已经对 ExceptionClassifierRetryPolicy 进行了子class,并且正在根据 Listener 方法中发生的异常返回适当的重试策略。
我已经创建了 RetryTemplate 并在子 class.
中使用我的自定义实现设置了它的 RetryPolicy我已经在 Kafka 容器上设置了 retryTemplate。我已将错误处理程序设置为新的 SeekToCurrentHandler,并将状态重试 属性 设置为 true。
侦听器方法
@KafkaListener(topics = "topicName", containerFactory = "containerFactory")
public void listenToKafkaTopic(@Payload Message<SomeAvroGeneratedClass> message, Acknowledgement ack){
SomeAvroGeneratedClass obj = message.getPayLoad();
processIncomingMessage(obj);
ack.acknowledge();
}
自定义重试策略class
@Component
public class MyRetryPolicy extends ExceptionClassifierRetryPolicy
{
@PostConstruct
public void init(){
final SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
simpleRetryPolicy.setMaxAttempts(8);
this.setExceptionClassifier( classifiable ->
{
// Always Retry when instanceOf TransientDataAccessException
if( classifiable.getCause() instanceof TransientDataAccessException)
{
return new AlwaysRetryPolicy();
}
else if(classifiable.getCause() instanceOf NonTransientDataAccessException)
{
return new NeverRetryPolicy();
}
else
{
return simpleRetryPolicy;
}
} );
}}
重试模板和容器配置
@Configuration
public class RetryConfig{
@Bean
public RetryTemplate retryTemplate(@Autowired ConcurrentKafkaListenerContainerFactory factory){
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(new MyRetryPolicy());
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy()
fixedBackOffPolicy.setBackOffPeriod(2000l);
retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
factory.setRetryTemplate(retryTemplate);
factory.setAckOnError(false);
factory.setErrorHandler(new SeekToCurrentErrorHandler());
factory.setStateFulRetry(true);
factory.setRecoveryCallback(//configure recovery after retries are exhausted and commit offset
);
}
}
侦听器属性:
- AckMode = 手动
- auto.offset.commit = 假
问题:
使用我当前的代码,我能否实现在 MyRetryPolicy 中定义的重试逻辑,而不会在返回 AlwaysRetryPolicy 时导致消费者重新平衡?如果不是,请指导我正确的路径。
我的错误处理程序和重试方法是否正确?
有状态重试与 SeekToCurrentErrorHandler
相结合是正确的方法。
但是,如果您使用的是最新版本(2.2 或更高版本),错误处理程序将在尝试 10 次后放弃;您可以通过将 maxFailures
设置为 -1 (2.2) 或将 backOff
设置为 Long.MAX_VALUE
(2.3 或更高版本)来实现无穷大。