How to add a Micrometer Timer in a KafkaListener class created by KafkaListenerContainerFactory?
When I try to autowire the Micrometer MeterRegistry into the class that contains the KafkaListener, I get the following error:
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'kafkaConsumer' defined in file [/apps/flux-mariadb-pipeline/build/classes/java/main/com/processor/consumer/KafkaConsumer.class]: Bean instantiation via constructor failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [com.processor.consumer.KafkaConsumer]: Constructor threw exception; nested exception is java.lang.NullPointerException
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:313)
at org.springframework.beans.factory.support.ConstructorResolver.autowireConstructor(ConstructorResolver.java:294)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.autowireConstructor(AbstractAutowireCapableBeanFactory.java:1358)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1204)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:557)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:517)
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:323)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:226)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:321)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:202)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:893)
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:879)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:551)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:143)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:315)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)
at com.processor.App.main(App.java:10)
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [com.processor.consumer.KafkaConsumer]: Constructor threw exception; nested exception is java.lang.NullPointerException
at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:217)
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:117)
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:309)
... 20 common frames omitted
Caused by: java.lang.NullPointerException: null
at java.util.Objects.requireNonNull(Objects.java:203)
at io.micrometer.core.instrument.ImmutableTag.<init>(ImmutableTag.java:35)
at io.micrometer.core.instrument.Tag.of(Tag.java:29)
at io.micrometer.core.instrument.Tags.and(Tags.java:74)
at io.micrometer.core.instrument.Timer$Builder.tag(Timer.java:364)
at com.processor.consumer.KafkaConsumer.<init>(KafkaConsumer.java:47)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:204)
... 22 common frames omitted
The KafkaConsumer class:
public KafkaConsumer(MeterRegistry meterRegistry) {
    eventTimer =
        Timer.builder("travel.time")
            .description("The time it takes for the event to travel.")
            .tag("topic", topic)
            .publishPercentiles(.30, .65, .99)
            .publishPercentileHistogram()
            .minimumExpectedValue(Duration.ofMillis(1))
            .maximumExpectedValue(Duration.ofMillis(5000))
            .register(meterRegistry);
}

@KafkaListener(
    topics = "${spring.kafka.consumer.properties.topic}",
    groupId = "${spring.kafka.consumer.group-id}")
public void consume(
    ConsumerRecord<String, DataRecord> record,
    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
    @Header(KafkaHeaders.OFFSET) Long offset,
    Acknowledgment ack)
    throws IOException {
    .
    .
}
I am creating the KafkaListener container with the following factory:
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
    kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(8);
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    factory.setErrorHandler(getErrorHandler());
    return factory;
}
The only way I have found to add the Micrometer registry is the following:
@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = kafkaProperties.buildConsumerProperties();
    DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(props);
    cf.addListener(new MicrometerConsumerListener<>(meterRegistry(),
        Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
    return cf;
}
Now I don't know how to access this registry in the @KafkaListener method.
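at io.micrometer.core.instrument.ImmutableTag.<init>(ImmutableTag.java:35)
.tag("topic", topic)
The topic variable is null. If it is a field annotated with @Value, it cannot be used in the constructor, because Spring injects annotated fields only after the constructor has returned. Create the meter in a @PostConstruct method instead.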
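To make the ordering concrete, here is a minimal plain-Java sketch that imitates the sequence a container follows for field injection: construct the object, set the fields, then call the init callback. It deliberately uses neither Spring nor Micrometer. `ListenerBean`, `InjectionOrderDemo`, and `createBean` are hypothetical names made up for illustration; the `topic` field stands in for a field annotated with @Value, and `init()` stands in for a @PostConstruct method.

```java
class InjectionOrderDemo {
    // Imitates the container's order of operations (an illustration, not Spring's actual code).
    static ListenerBean createBean(String topicValue) {
        ListenerBean bean = new ListenerBean(); // 1. constructor runs first
        bean.topic = topicValue;                // 2. field injection happens afterwards
        bean.init();                            // 3. init callback runs last
        return bean;
    }

    public static void main(String[] args) {
        ListenerBean bean = createBean("events");
        System.out.println("constructor saw: " + bean.tagSeenInConstructor);
        System.out.println("init saw: " + bean.tagSeenInInit);
    }
}

// Hypothetical stand-in for the listener class; no Spring annotations involved.
class ListenerBean {
    // Stands in for a field annotated with @Value: assigned by the container, not by the constructor.
    String topic;
    String tagSeenInConstructor;
    String tagSeenInInit;

    ListenerBean() {
        // The field has not been injected yet, so this records null.
        tagSeenInConstructor = topic;
    }

    // Stands in for a @PostConstruct method: called only after injection has finished.
    void init() {
        tagSeenInInit = topic;
    }
}
```

Running `main` prints `constructor saw: null` followed by `init saw: events`. Applied to the question's class, this suggests moving the `Timer.builder(...)` chain out of the constructor and into the @PostConstruct method, where `topic` already holds its value.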