需要 Spring kafka 自定义日志记录
Need Spring kafka custom logging
我有一个标准的 spring kafka 设置如下,
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> feedbackStreamListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@KafkaListener(topics = ("my-topic"),groupId = ("groupId"),containerFactory = "listenerContainerFactory")
public void myListener(@Payload String message) {
System.out.println("Received Message : " + message);
// do some heavy processing
}
现在我需要在 3 个场景中进行自定义日志记录(从我的应用程序生成日志),
- 当我启动消费者时,假设 kafka 集群已关闭或者我提供了错误的 bootstrap 服务器,我需要自定义记录它。
- 当消费者成功启动时,就在轮询之前提供自定义日志。
- 当有自定义消息转换器时,如果有反序列化问题,自定义记录它。
参见 KafkaEvent
实现:https://docs.spring.io/spring-kafka/docs/current/reference/html/#events。
因此,当您在 @EventListener
中捕获 ConsumerFailedToStartEvent
时,您可以使用 ConsumerStartedEvent
.
记录您想要的任何内容等等
甚至没有反序列化问题,但您绝对可以使用 GenericErrorHandler
来记录来自消费者的错误:https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-error-handling
我有一个标准的 spring kafka 设置如下,
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> feedbackStreamListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@KafkaListener(topics = ("my-topic"),groupId = ("groupId"),containerFactory = "listenerContainerFactory")
public void myListener(@Payload String message) {
System.out.println("Received Message : " + message);
// do some heavy processing
}
现在我需要在 3 个场景中进行自定义日志记录(从我的应用程序生成日志),
- 当我启动消费者时,假设 kafka 集群已关闭或者我提供了错误的 bootstrap 服务器,我需要自定义记录它。
- 当消费者成功启动时,就在轮询之前提供自定义日志。
- 当有自定义消息转换器时,如果有反序列化问题,自定义记录它。
参见 KafkaEvent
实现:https://docs.spring.io/spring-kafka/docs/current/reference/html/#events。
因此,当您在 @EventListener
中捕获 ConsumerFailedToStartEvent
时,您可以使用 ConsumerStartedEvent
.
甚至没有反序列化问题,但您绝对可以使用 GenericErrorHandler
来记录来自消费者的错误:https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-error-handling