需要 Spring kafka 自定义日志记录

Need Spring kafka custom logging

我有一个标准的 spring kafka 设置如下,

@Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> feedbackStreamListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
@KafkaListener(topics = ("my-topic"),groupId = ("groupId"),containerFactory = "listenerContainerFactory")
    public void myListener(@Payload String message) {
        System.out.println("Received Message : " + message);
        // do some heavy processing
    }

现在我需要在 3 个场景中进行自定义日志记录(从我的应用程序生成日志),

  1. 当我启动消费者时,假设 kafka 集群已关闭或者我提供了错误的 bootstrap 服务器,我需要自定义记录它。
  2. 当消费者成功启动时,就在轮询之前提供自定义日志。
  3. 当有自定义消息转换器时,如果有反序列化问题,自定义记录它。

参见 KafkaEvent 实现:https://docs.spring.io/spring-kafka/docs/current/reference/html/#events

因此,当您在 @EventListener 中捕获 ConsumerFailedToStartEvent 时,您可以使用 ConsumerStartedEvent.

记录您想要的任何内容等等

甚至没有反序列化问题,但您绝对可以使用 GenericErrorHandler 来记录来自消费者的错误:https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-error-handling