Micrometer throws exception when creating a second kafka-consumer

After upgrading to spring-boot 2.3.0, the following exception is thrown:

java.lang.IllegalArgumentException: Prometheus requires that all meters with the same name have the same set of tag keys. There is already an existing meter named 'kafka_consumer_fetch_manager_records_consumed_total' containing tag keys [client_id, kafka_version, product, spring_id, topic]. The meter you are attempting to register has keys [client_id, kafka_version, product, spring_id].
    at io.micrometer.prometheus.PrometheusMeterRegistry.lambda$applyToCollector(PrometheusMeterRegistry.java:429)
    at java.base/java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1932)
    at io.micrometer.prometheus.PrometheusMeterRegistry.applyToCollector(PrometheusMeterRegistry.java:413)
    at io.micrometer.prometheus.PrometheusMeterRegistry.newFunctionCounter(PrometheusMeterRegistry.java:247)
    at io.micrometer.core.instrument.MeterRegistry$More.lambda$counter(MeterRegistry.java:884)
    at io.micrometer.core.instrument.MeterRegistry.lambda$registerMeterIfNecessary(MeterRegistry.java:559)
    at io.micrometer.core.instrument.MeterRegistry.getOrCreateMeter(MeterRegistry.java:612)
    at io.micrometer.core.instrument.MeterRegistry.registerMeterIfNecessary(MeterRegistry.java:566)
    at io.micrometer.core.instrument.MeterRegistry.registerMeterIfNecessary(MeterRegistry.java:559)
    at io.micrometer.core.instrument.MeterRegistry.access0(MeterRegistry.java:76)
    at io.micrometer.core.instrument.MeterRegistry$More.counter(MeterRegistry.java:884)
    at io.micrometer.core.instrument.FunctionCounter$Builder.register(FunctionCounter.java:122)
    at io.micrometer.core.instrument.binder.kafka.KafkaMetrics.registerCounter(KafkaMetrics.java:189)
    at io.micrometer.core.instrument.binder.kafka.KafkaMetrics.bindMeter(KafkaMetrics.java:174)
    at io.micrometer.core.instrument.binder.kafka.KafkaMetrics.lambda$checkAndBindMetrics(KafkaMetrics.java:161)
    at java.base/java.util.concurrent.ConcurrentHashMap.forEach(ConcurrentHashMap.java:1603)
    at java.base/java.util.Collections$UnmodifiableMap.forEach(Collections.java:1505)
    at io.micrometer.core.instrument.binder.kafka.KafkaMetrics.checkAndBindMetrics(KafkaMetrics.java:137)
    at io.micrometer.core.instrument.binder.kafka.KafkaMetrics.bindTo(KafkaMetrics.java:93)
    at io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics.bindTo(KafkaClientMetrics.java:39)
    at org.springframework.kafka.core.MicrometerConsumerListener.consumerAdded(MicrometerConsumerListener.java:74)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:301)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:242)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:212)
    at org.springframework.kafka.core.ConsumerFactory.createConsumer(ConsumerFactory.java:67)
    at org.springframework.kafka.core.ConsumerFactory.createConsumer(ConsumerFactory.java:54)
    at org.springframework.kafka.core.ConsumerFactory.createConsumer(ConsumerFactory.java:43)

This exception occurs when I try to create a consumer via ConsumerFactory.createConsumer.
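
To illustrate the rule quoted in the exception message, here is a simplified model of such a check. It is not Micrometer's code: the class TagKeyCheckSketch and its register method are hypothetical names, and the sketch assumes only what the message states, namely that all meters with the same name must have the same set of tag keys.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified model of a "same name, same tag keys" check.
// This is NOT the PrometheusMeterRegistry implementation.
public class TagKeyCheckSketch {
    // Meter name -> tag keys seen at first registration.
    private final Map<String, Set<String>> tagKeysByName = new HashMap<>();

    public void register(String name, Set<String> tagKeys) {
        Set<String> sorted = new TreeSet<>(tagKeys);
        Set<String> existing = tagKeysByName.putIfAbsent(name, sorted);
        // A later meter with the same name must carry exactly the same keys.
        if (existing != null && !existing.equals(sorted)) {
            throw new IllegalArgumentException("Meter '" + name
                    + "' already has tag keys " + existing
                    + ", attempted to register keys " + sorted);
        }
    }

    public static void main(String[] args) {
        TagKeyCheckSketch registry = new TagKeyCheckSketch();
        String name = "kafka_consumer_fetch_manager_records_consumed_total";

        // First meter: includes the topic tag.
        registry.register(name, new TreeSet<>(Arrays.asList(
                "client_id", "kafka_version", "product", "spring_id", "topic")));

        // Second meter: same name, but no topic tag, so the check fails.
        try {
            registry.register(name, new TreeSet<>(Arrays.asList(
                    "client_id", "kafka_version", "product", "spring_id")));
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

In this model the order of registration decides which tag set wins, which matches the message above: the meter with the topic tag was already present when the one without it arrived.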

The application also has another consumer, created by spring-kafka on a method annotated with @KafkaListener(topics = TOPICS, groupId = GROUP_ID).

In io.micrometer.core.instrument.binder.kafka.KafkaMetrics, lines 146-147, I read:

//Kafka has metrics with lower number of tags (e.g. with/without topic or partition tag)
//Remove meters with lower number of tags

This means the new meter will be discarded because it lacks the topic tag.

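Is there any reason why creating the consumer in a different way would lead to different tags? If so, is it possible to attach the topic tag to the metrics of consumers created via ConsumerFactory.createConsumer?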

After some debugging, we found:

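I'll keep looking around, but it seems that when the consumer starts (@KafkaListener) it also adds some metrics for the topic it is assigned to? Just a hypothesis so far.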

Another example with a shorter stack: the scheduled task in KafkaMetrics.bindTo -> scheduler.scheduleAtFixedRate(() -> checkAndBindMetrics(registry), getRefreshIntervalInMillis(), getRefreshIntervalInMillis(), TimeUnit.MILLISECONDS);

It seems the topic gets registered at startup.
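
The hypothesis above, that topic-tagged metrics appear after the consumer starts and are picked up by a periodic re-check, can be sketched roughly as follows. This is a toy model under stated assumptions, not the KafkaMetrics implementation: RefreshSketch, clientMetrics, bound and checkAndBind are hypothetical names, metric identities are plain strings, and the 100 ms interval is arbitrary. Only ScheduledExecutorService.scheduleAtFixedRate is taken from the call quoted above.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical toy model of "bind once, then re-check on a schedule".
public class RefreshSketch {
    // Stands in for the client's live set of metrics (assumption).
    public final Set<String> clientMetrics = ConcurrentHashMap.newKeySet();
    // Metrics that have already been bound to the registry.
    public final Set<String> bound = ConcurrentHashMap.newKeySet();

    // Binds every metric not seen before; returns how many were newly bound.
    public int checkAndBind() {
        int added = 0;
        for (String id : clientMetrics) {
            if (bound.add(id)) {
                added++;
            }
        }
        return added;
    }

    public static void main(String[] args) throws InterruptedException {
        RefreshSketch sketch = new RefreshSketch();

        // At creation time only the metric without a topic tag exists.
        sketch.clientMetrics.add("records-consumed-total{client-id}");
        sketch.checkAndBind();

        ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(
                sketch::checkAndBind, 100, 100, TimeUnit.MILLISECONDS);

        // Later, once a topic is assigned, a topic-tagged variant appears.
        sketch.clientMetrics.add("records-consumed-total{client-id,topic}");

        Thread.sleep(350); // give the scheduled re-check time to run
        scheduler.shutdownNow();
        System.out.println("Bound metrics: " + sketch.bound);
    }
}
```

If the real binder behaves like this model, a consumer that has already been assigned a topic would expose the topic-tagged meter, while a freshly created one would at first expose only the variant without it.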