Spring-带有 Micrometer 的 Kafka 在应用程序关闭期间抛出 NullPointerException

Spring-Kafka with Micrometer throws NullPointerException during app shutdown

我们有 spring-boot 个具有多个节点的应用程序,并使用 KafkaTemplate 来生成 kafka 消息。在 class 路径上,我们有 org.springframework.kafka:spring-kafkaio.micrometer:micrometer-core。 在我们的例子中,消息连续产生,每个主题每秒大约几千条(具有多个分区的主题)。 在应用正常关闭期间(无论是在重新部署期间还是在自动缩减规模期间),我们经常遇到 NPE 错误:

[kafka-producer-network-thread | producer-1] ERROR org.apache.kafka.clients.producer.internals.ProducerBatch - Error executing user-provided callback on message for topic-partition 'xxx-1'
java.lang.NullPointerException
    at io.micrometer.core.instrument.Timer$Sample.stop(Timer.java:280)
    at org.springframework.kafka.support.micrometer.MicrometerHolder.success(MicrometerHolder.java:93)
    at org.springframework.kafka.core.KafkaTemplate.lambda$buildCallback(KafkaTemplate.java:587)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.onCompletion(DefaultKafkaProducerFactory.java:764)
    at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1348)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:227)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:196)
    at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:707)
    at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:688)
    at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:596)
    at org.apache.kafka.clients.producer.internals.Sender.access0(Sender.java:74)
    at org.apache.kafka.clients.producer.internals.Sender.onComplete(Sender.java:798)
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
    at java.base/java.lang.Thread.run(Thread.java:834)

我检查了 class org.springframework.kafka.support.micrometer.MicrometerHolder 的实现,发现它只有在 destroy 方法早于 success 调用时才会发生。所以看起来这里我们有一个竞争条件。我想 success 方法实现应该以某种方式处理 destroy 方法已经被调用的情况。 是 spring-kafka 的错误吗?

spring-kafka 版本 2.5.1; micrometer-core 版本 1.5.1.

如果是版本 2.5.1 中的错误 Spring-Kafka with Micrometer throws NullPointerException during app shutdown #1531。由于 spring-kafka 版本 2.6.0 问题已修复。

同时,如果您不需要 KafkaTemplate 的千分尺指标,我们可以禁用它:

kafkaTemplate.setMicrometerEnabled(false);