Log the exceptions thrown in a Spring Kafka listener

We are using the non-blocking retry mechanism in Spring Kafka 2.7. We want to log the errors thrown while consuming data in our @KafkaListener methods.

For example: https://github.com/spring-projects/spring-kafka/blob/main/samples/sample-04/src/main/java/com/example/Application.java

In the example above, a RuntimeException is thrown. However, the exception itself is not logged; instead we only get "Seek to current after exception ....."

// our configuration

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

@Bean
public RetryTopicConfiguration retryTopicConfiguration(KafkaTemplate<String, Object> template) {
    List<Class<? extends Throwable>> throwableList = Arrays.asList(
            IllegalArgumentException.class, IllegalAccessException.class);

    return RetryTopicConfigurationBuilder
            .newInstance()
            .dltHandlerMethod(XYZ.class, "xyz")
            .exponentialBackoff(delayMs, backoffMultiplier, maxIntervalInMs)
            .maxAttempts(retryAttempt)
            .notRetryOn(throwableList)
            .doNotAutoCreateRetryTopics()
            .listenerFactory(kafkaListenerContainerFactory())
            .setTopicSuffixingStrategy(TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
            .create(template);
}

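You can use a RecordInterceptor. Its failure() callback is called after the listener throws an exception, so you can log the error there.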

@Bean
RecordInterceptor<String, String> interceptor(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
    RecordInterceptor<String, String> inter = new RecordInterceptor<String, String>() {

        @Override
        @Nullable
        public ConsumerRecord<String, String> intercept(ConsumerRecord<String, String> record) {
            return record;
        }

        @Override
        public void failure(ConsumerRecord<String, String> record, Exception exception,
                Consumer<String, String> consumer) {

            logger.error("Record failed " + ListenerUtils.recordToString(record, true), exception);
        }

    };
    factory.setRecordInterceptor(inter);
    return inter;
}
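
To see why the failure() hook is a convenient place for logging, here is a minimal plain-Java sketch of the flow. It is only a simplified model, not Spring Kafka code: InterceptorFlowSketch, its nested Interceptor interface, dispatch and demo are all hypothetical names, and it assumes the container notifies the interceptor of the failure before handing the exception to its error handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified, hypothetical model of a listener container's per-record flow.
// None of these types are Spring Kafka classes.
final class InterceptorFlowSketch {

    // Stand-in for an interceptor that has a failure callback.
    interface Interceptor<R> {
        R intercept(R record);

        default void failure(R record, Exception exception) {
        }
    }

    // Delivers one record: intercept it, invoke the listener, and on an exception
    // notify the interceptor before the (simulated) error handler takes over.
    static <R> void dispatch(R record, Interceptor<R> interceptor,
            Consumer<R> listener, List<String> events) {
        R intercepted = interceptor.intercept(record);
        if (intercepted == null) {
            events.add("skipped");
            return;
        }
        try {
            listener.accept(intercepted);
            events.add("processed " + intercepted);
        } catch (RuntimeException e) {
            interceptor.failure(intercepted, e);
            events.add("error handler invoked for " + intercepted);
        }
    }

    // Runs the flow with a listener that fails on the second record.
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        Interceptor<String> logging = new Interceptor<String>() {
            @Override
            public String intercept(String record) {
                return record;
            }

            @Override
            public void failure(String record, Exception exception) {
                // In a real application this would be a logger.error(...) call.
                events.add("logged failure of " + record + ": " + exception.getMessage());
            }
        };
        Consumer<String> listener = value -> {
            if (value.startsWith("fail")) {
                throw new RuntimeException("failed");
            }
        };
        dispatch("ok-1", logging, listener, events);
        dispatch("fail-2", logging, listener, events);
        return events;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this model the failing record produces a "logged failure" event followed by the error-handler event, which mirrors what the interceptor above is meant to add: the original exception is logged in addition to the "Seek to current after exception" message.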