Kafka 消费者无效负载错误处理程序

Kafka Consumer Invalid Payload Error Handler

我有以下配置。当消息无效时,我想发送一封电子邮件,对于错误,我想将其保存在数据库中。我如何在 errorHandler() 中处理这个问题?

    @Configuration
    @EnableKafka
    public class KafkaConsumerConfig implements KafkaListenerConfigurer{
    @Bean
        ErrorHandler errorHandler() {
            return new SeekToCurrentErrorHandler((rec, ex) ->
            
            { dbService.saveErrorMsg(rec); }
          ,new FixedBackOff(5000, 3)) ;
            
        }
        
      
        @Override
        public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
              registrar.setValidator(this.validator);
            
        }

@KafkaListener(topics = "mytopic", concurrency = "3", groupId = "mytopic-1-groupid")
    public void consumeFromTopic1(@Payload @Valid ValidatedClass val, ConsumerRecordMetadata meta) throws Exception
    {

        dbservice.callDB(val,"t");
    }

我猜你的 EMAI 代码在 dbService.saveErrorMsg.

Spring 引导应自动检测 ErrorHandler @Bean 并将其连接到容器工厂。

参见 Boot 的 KafkaAnnotationDrivenConfiguration class 和 ConcurrentKafkaListenerContainerFactoryConfigurer