Spring-Cloud-Stream-Kafka 自定义健康检查不工作

Spring-Cloud-Stream-Kafka Custom Health check not working

我在一个 spring-boot 应用程序(consumer)中使用 spring-cloud-stream-kafka。该应用程序的运行状况不准确:即使应用程序无法连接到 Kafka(Kafka 代理已关闭),状态仍然显示为 'UP'。我读过关于 Kafka 健康检查的文章,spring 执行器(actuator)的健康检查中似乎禁用了 kafka 健康检查。

因此,我设法编写了以下代码来为我的应用程序启用 kafka 健康检查。我想,我缺少应用程序配置和我的代码之间的一些联系,而且我没有看到 Kafka 健康检查正常工作。

(1) 我正在创建一个自定义健康指标 bean,如下所示:

      import java.util.HashMap;
      import java.util.Map;

      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.common.serialization.ByteArrayDeserializer;
      import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
      import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
      import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
      import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      import org.springframework.kafka.core.ConsumerFactory;
      import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
      import org.springframework.util.ObjectUtils;

      /**
       * Declares the {@link KafkaBinderHealthIndicator} bean for the Kafka binder.
       *
       * <p>Conditional on the Actuator {@code HealthIndicator} class being present on the classpath.
       */
      @Configuration
      @ConditionalOnClass(name = "org.springframework.boot.actuate.health.HealthIndicator")
      public class KafkaBinderHealthIndicatorConfiguration {

        /**
         * Creates the health indicator, backed by a dedicated consumer factory built from the binder's
         * consumer configuration.
         *
         * @param kafkaMessageChannelBinder the binder handed to the indicator
         * @param configurationProperties binder properties supplying the consumer configuration,
         *     connection string and health timeout
         * @return the configured indicator
         */
        @Bean
        KafkaBinderHealthIndicator healthIndicator(KafkaMessageChannelBinder kafkaMessageChannelBinder,
            KafkaBinderConfigurationProperties configurationProperties) {
          ConsumerFactory<?, ?> probeFactory =
              new DefaultKafkaConsumerFactory<>(consumerSettings(configurationProperties));
          KafkaBinderHealthIndicator kafkaHealth =
              new KafkaBinderHealthIndicator(kafkaMessageChannelBinder, probeFactory);
          kafkaHealth.setTimeout(configurationProperties.getHealthTimeout());
          return kafkaHealth;
        }

        /**
         * Assembles the consumer properties used by the health indicator's consumer factory.
         *
         * @param configurationProperties binder properties to read the consumer configuration from
         * @return a fresh, mutable property map
         */
        private static Map<String, Object> consumerSettings(
            KafkaBinderConfigurationProperties configurationProperties) {
          Map<String, Object> settings = new HashMap<>();

          // Defaults first: anything present in the binder's consumer configuration replaces them below.
          settings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
          settings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

          Map<String, Object> binderConsumerConfig = configurationProperties.getConsumerConfiguration();
          boolean nothingToMerge = ObjectUtils.isEmpty(binderConsumerConfig);
          if (!nothingToMerge) {
            settings.putAll(binderConsumerConfig);
          }

          // Fall back to the binder's connection string only when no bootstrap servers were supplied.
          boolean hasBootstrapServers = settings.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
          if (!hasBootstrapServers) {
            settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                configurationProperties.getKafkaConnectionString());
          }
          return settings;
        }
      }

(2) 创建 binder 配置:

          import java.io.IOException;

          import org.springframework.beans.factory.annotation.Autowired;
          import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
          import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
          import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
          import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
          import org.springframework.boot.context.properties.EnableConfigurationProperties;
          import org.springframework.cloud.stream.binder.Binder;
          import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
          import org.springframework.cloud.stream.binder.kafka.admin.Kafka10AdminUtilsOperation;
          import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
          import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
          import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
          import org.springframework.context.annotation.Bean;
          import org.springframework.context.annotation.Configuration;
          import org.springframework.context.annotation.Import;
          import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;

          /**
           * Kafka binder configuration, applied only when no {@link Binder} bean is already defined.
           *
           * <p>Imports the Kafka auto-configuration, property placeholder support and the binder health
           * indicator configuration, and declares the binder together with its supporting beans.
           */
          @Configuration
          @ConditionalOnMissingBean(Binder.class)
          @Import({
              KafkaAutoConfiguration.class,
              PropertyPlaceholderAutoConfiguration.class,
              KafkaBinderHealthIndicatorConfiguration.class
          })
          @EnableConfigurationProperties(KafkaExtendedBindingProperties.class)
          public class KafkaBinderConfiguration {

            /** Extended binding properties handed to the binder when it is created. */
            @Autowired
            private KafkaExtendedBindingProperties kafkaExtendedBindingProperties;

            // NOTE: ProducerListener wiring is disabled in this configuration:
            //   @Autowired
            //   private ProducerListener producerListener;

            /**
             * Declares the binder-level configuration properties bean.
             *
             * <p>NOTE(review): {@code kafkaProperties} is accepted but not used by this method -
             * confirm whether it should be passed on to the properties object.
             *
             * @param kafkaProperties Spring Boot Kafka properties (currently unused here)
             * @return a new binder configuration properties instance
             */
            @Bean
            KafkaBinderConfigurationProperties configurationProperties(KafkaProperties kafkaProperties) {
              return new KafkaBinderConfigurationProperties();
            }

            /**
             * Declares the topic provisioner used by the binder.
             *
             * @param configurationProperties binder configuration properties
             * @return a provisioner built with a {@link Kafka10AdminUtilsOperation}
             */
            @Bean
            KafkaTopicProvisioner provisioningProvider(KafkaBinderConfigurationProperties configurationProperties) {
              Kafka10AdminUtilsOperation adminOperation = new Kafka10AdminUtilsOperation();
              return new KafkaTopicProvisioner(configurationProperties, adminOperation);
            }

            /**
             * Declares the Kafka message channel binder.
             *
             * @param configurationProperties binder configuration properties
             * @param provisioningProvider topic provisioner for the binder
             * @return the binder with the extended binding properties applied
             */
            @Bean
            KafkaMessageChannelBinder kafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties,
                KafkaTopicProvisioner provisioningProvider) {
              KafkaMessageChannelBinder binder =
                  new KafkaMessageChannelBinder(configurationProperties, provisioningProvider);
              // binder.setProducerListener(producerListener);  -- disabled, see the note on the field above
              binder.setExtendedBindingProperties(this.kafkaExtendedBindingProperties);
              return binder;
            }

            /**
             * Declares the JAAS login module initializer.
             *
             * @return a new initializer
             * @throws IOException declared by this bean method's signature
             */
            @Bean
            public KafkaJaasLoginModuleInitializer jaasInitializer() throws IOException {
              return new KafkaJaasLoginModuleInitializer();
            }

          }
(3) 我添加的应用属性:

    management.health.binders.enabled = true, management.health.kafka.enabled = true

===========输出============= 当我在本地启动我的应用程序并点击 /health 端点时,我看到了 kafka 的以下内容:

 "binders": {
     "status": "UNKNOWN",
     "kafka": {
     "status": "UNKNOWN"
     }
  },

问题已通过使用最新版本的 'spring-cloud-stream-binder-kafka' 得到解决。我最初使用的是旧版本(早于 1.3.0.RELEASE 的版本),kafka 的健康检查无法工作。正如 @Sobychacko 所建议的,我使用了最新版本 2.0.0.RELEASE,kafka binder 的健康检查工作正常 :) 无需自定义健康指标 bean。

"binders": { "status": "UP", "kafka":{ "status": "UP", "healthIndicator":{ "status": "UP" } } },

此检查也适用于 1.3.0.RELEASE 版本。