Spring Kafka setErrorHandler deprecated: what is the replacement? (Spring Boot 2.6.4)

On Spring Boot 2.6.4, this method is deprecated.

public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
    var factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
    configurer.configure(factory, consumerFactory());

    // deprecated
    factory.setErrorHandler(new GlobalErrorHandler());

    return factory;
}

The GlobalErrorHandler class:

public class GlobalErrorHandler implements ConsumerAwareErrorHandler {

    private static final Logger log = LoggerFactory.getLogger(GlobalErrorHandler.class);

    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> data, Consumer<?, ?> consumer) {
        // my custom global logic (e.g. notify ops team via slack)
    }

}

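What would a sample replacement for this look like? The documentation says I should use setCommonErrorHandler, but how do I implement the CommonErrorHandler interface, given that it has no methods that must be overridden?

The point is that I have to send a Slack notification to the ops team based on a specific condition (the message type, which is available in the Kafka message headers).

This is not a blocker, just an annoying deprecation warning. Thanks.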

See the Spring for Apache Kafka documentation; the legacy error handlers have been replaced by CommonErrorHandler implementations.

What's New?

https://docs.spring.io/spring-kafka/docs/current/reference/html/#x28-eh

The legacy GenericErrorHandler and its sub-interface hierarchies for record and batch listeners have been replaced by a new single interface CommonErrorHandler with implementations corresponding to most legacy implementations of GenericErrorHandler. See Container Error Handlers for more information.

Container Error Handlers

https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handlers

Starting with version 2.8, the legacy ErrorHandler and BatchErrorHandler interfaces have been superseded by a new CommonErrorHandler. These error handlers can handle errors for both record and batch listeners, allowing a single listener container factory to create containers for both types of listener. CommonErrorHandler implementations to replace most legacy framework error handler implementations are provided and the legacy error handlers deprecated. The legacy interfaces are still supported by listener containers and listener container factories; they will be deprecated in a future release.

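I ran into exactly the same problem, so I changed the class from implementing ConsumerAwareErrorHandler to implementing CommonErrorHandler and overrode handleRecord, as described in the documentation, and it works!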

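import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;

public class GlobalErrorHandler implements CommonErrorHandler {

  private static final Logger log = LoggerFactory.getLogger(GlobalErrorHandler.class);

  @Override
  public void handleRecord(
      Exception thrownException,
      ConsumerRecord<?, ?> record,
      Consumer<?, ?> consumer,
      MessageListenerContainer container) {
    // Pass the value through the {} placeholder (safe for null-valued records) and
    // the exception as the last argument so SLF4J also logs the stack trace.
    log.warn("Global error handler for message: {}", record.value(), thrownException);
  }
}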

In KafkaConfig:

  @Bean(value = "kafkaListenerContainerFactory")
  public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
      ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
    var factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, consumerFactory());

    factory.setCommonErrorHandler(new GlobalErrorHandler());

    return factory;
  }
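
For the condition mentioned in the question (notify the ops team only for certain message types carried in a Kafka header), the decision itself can be kept in a small helper that does not depend on Spring. The following is only a sketch under assumptions: the class name NotificationPolicy, the header name "messageType", and the types "PAYMENT" and "ORDER" are all hypothetical placeholders, and the header value is assumed to be UTF-8 text.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: decides whether a failed record warrants an ops notification.
final class NotificationPolicy {

    // Assumed header name and message types; replace them with the real ones.
    static final String TYPE_HEADER = "messageType";
    static final Set<String> NOTIFIABLE_TYPES = Set.of("PAYMENT", "ORDER");

    private NotificationPolicy() {
    }

    // headers maps a header name to its raw value, mirroring the fact that
    // Kafka exposes header values as byte arrays.
    static boolean shouldNotify(Map<String, byte[]> headers) {
        byte[] raw = headers.get(TYPE_HEADER);
        if (raw == null) {
            return false; // header missing: nothing to decide on
        }
        String type = new String(raw, StandardCharsets.UTF_8);
        return NOTIFIABLE_TYPES.contains(type);
    }

    public static void main(String[] args) {
        Map<String, byte[]> headers =
                Map.of(TYPE_HEADER, "PAYMENT".getBytes(StandardCharsets.UTF_8));
        System.out.println(shouldNotify(headers));
    }
}
```

Inside handleRecord, the raw value could be read with record.headers().lastHeader("messageType") (which returns null when the header is absent) followed by value(), and the Slack call would then run only when the check passes. Keeping the check separate from the handler makes it easy to unit test without a broker.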