如何在 Spring Cloud Stream 中获取自动生成的 KafkaTemplate?

How to Grab Auto-Generated KafkaTemplate in Spring Cloud Stream?

实施 HealthIndicator 的几个示例需要 KafkaTemplate。我实际上并没有手动创建 KafkaTemplate,但 HealthIndicator 需要一个。有没有办法自动获取创建的 KafkaTemplate(使用 application.yml 配置)?这与在新创建的 consumerFactory 中手动创建已存在于 application.yml 中的重复配置不同。

参见

(S)他想要访问模板的 ProducerFactory 但您可以使用相同的技术来获取对模板的引用。

也就是说,活页夹自带健康指标。

https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/blob/a7299df63f495af3a798637551c2179c947af9cf/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderHealthIndicator.java#L52

我们也有创建自定义健康检查程序的相同要求Spring 云流。我们利用了内置健康检查器 (KafkaBinderHealthIndicator)。但是在注入 KafkaBinderHealthIndicator bean 时面临很多问题。因此,我们注入了健康检查器持有者 HealthContributorRegistry,并从中获取了 KafkaBinderHealthIndicator bean。

示例:

@Component
@Slf4j
@RequiredArgsConstructor
public class KafkaHealthChecker implements ComponentHealthChecker {

  private final HealthContributorRegistry registry;


  public String checkHealth() {
    String status;
    try {
      BindersHealthContributor bindersHealthContributor = (BindersHealthContributor)
          registry.getContributor("binders");
      KafkaBinderHealthIndicator kafkaBinderHealthIndicator = (KafkaBinderHealthIndicator)
          bindersHealthContributor.getContributor("kafka");

      Health health = kafkaBinderHealthIndicator.health();
      status = UP.equals(health.getStatus()) ? "OK" : "FAIL";
    } catch (Exception e) {
      log.error("Error occurred while checking the kafka health ", e);
      status = "DEGRADED";
    }
    return status;
  }

}