如何在不导致组重新平衡的情况下为 Kafka 消费者实现始终重试 policy/Custom 重试策略

How to achieve Always retry policy/Custom Retry Policy for a Kafka Consumer without causing a rebalance in the group

我正在尝试编写一个有弹性的 Kafka 消费者。如果在 listener 方法中处理消息时发生异常,我想重试。我想为某些异常重试几次,总是为某些异常重试,而从不为其余异常重试几次。我已经阅读了关于 SeekToCurrentErrorHandler 的 Spring 文档,但不是 100% 确定如何实现它。

我已经对 ExceptionClassifierRetryPolicy 进行了子class,并且正在根据 Listener 方法中发生的异常返回适当的重试策略。

我已经创建了 RetryTemplate 并在子 class.

中使用我的自定义实现设置了它的 RetryPolicy

我已经在 Kafka 容器上设置了 retryTemplate。我已将错误处理程序设置为新的 SeekToCurrentHandler,并将状态重试 属性 设置为 true。

侦听器方法

@KafkaListener(topics = "topicName", containerFactory = "containerFactory")
   public void listenToKafkaTopic(@Payload Message<SomeAvroGeneratedClass> message, Acknowledgement ack){
      SomeAvroGeneratedClass obj = message.getPayLoad();
      processIncomingMessage(obj);
      ack.acknowledge();
   }

自定义重试策略class

 @Component
 public class MyRetryPolicy extends ExceptionClassifierRetryPolicy
   {
      @PostConstruct
       public void init(){
             final SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
             simpleRetryPolicy.setMaxAttempts(8);

     this.setExceptionClassifier( classifiable ->
           {
        // Always Retry when instanceOf   TransientDataAccessException
       if( classifiable.getCause() instanceof TransientDataAccessException)
            {
               return new AlwaysRetryPolicy();                                         

            }
      else if(classifiable.getCause() instanceOf NonTransientDataAccessException)
           {
              return new NeverRetryPolicy();
           }
          else
            {
            return simpleRetryPolicy;
            }

      } );
 }}

重试模板和容器配置

@Configuration
public class RetryConfig{


 @Bean
 public RetryTemplate retryTemplate(@Autowired ConcurrentKafkaListenerContainerFactory factory){

   RetryTemplate retryTemplate = new RetryTemplate();
   retryTemplate.setRetryPolicy(new MyRetryPolicy());
   FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy()
   fixedBackOffPolicy.setBackOffPeriod(2000l);
   retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

   factory.setRetryTemplate(retryTemplate);
   factory.setAckOnError(false);
   factory.setErrorHandler(new SeekToCurrentErrorHandler());
   factory.setStateFulRetry(true);
   factory.setRecoveryCallback(//configure recovery after retries are exhausted and commit offset
   );

  }
}

侦听器属性:

  1. AckMode = 手动
  2. auto.offset.commit = 假

问题:

  1. 使用我当前的代码,我能否实现在 MyRetryPolicy 中定义的重试逻辑,而不会在返回 AlwaysRetryPolicy 时导致消费者重新平衡?如果不是,请指导我正确的路径。

  2. 我的错误处理程序和重试方法是否正确?

有状态重试与 SeekToCurrentErrorHandler 相结合是正确的方法。

但是,如果您使用的是最新版本(2.2 或更高版本),错误处理程序将在尝试 10 次后放弃;您可以通过将 maxFailures 设置为 -1 (2.2) 或将 backOff 设置为 Long.MAX_VALUE (2.3 或更高版本)来实现无穷大。