如何在 Spring Cloud Stream 中获取自动生成的 KafkaTemplate?
How to Grab Auto-Generated KafkaTemplate in Spring Cloud Stream?
实施 HealthIndicator 的几个示例需要 KafkaTemplate。我实际上并没有手动创建 KafkaTemplate,但 HealthIndicator 需要一个。有没有办法自动获取创建的 KafkaTemplate(使用 application.yml 配置)?这与在新创建的 consumerFactory 中手动创建已存在于 application.yml 中的重复配置不同。
参见。
(S)他想要访问模板的 ProducerFactory
但您可以使用相同的技术来获取对模板的引用。
也就是说,活页夹自带健康指标。
我们也有创建自定义健康检查程序的相同要求Spring 云流。我们利用了内置健康检查器 (KafkaBinderHealthIndicator
)。但是在注入 KafkaBinderHealthIndicator bean 时面临很多问题。因此,我们注入了健康检查器持有者 HealthContributorRegistry
,并从中获取了 KafkaBinderHealthIndicator bean。
示例:
@Component
@Slf4j
@RequiredArgsConstructor
public class KafkaHealthChecker implements ComponentHealthChecker {
private final HealthContributorRegistry registry;
public String checkHealth() {
String status;
try {
BindersHealthContributor bindersHealthContributor = (BindersHealthContributor)
registry.getContributor("binders");
KafkaBinderHealthIndicator kafkaBinderHealthIndicator = (KafkaBinderHealthIndicator)
bindersHealthContributor.getContributor("kafka");
Health health = kafkaBinderHealthIndicator.health();
status = UP.equals(health.getStatus()) ? "OK" : "FAIL";
} catch (Exception e) {
log.error("Error occurred while checking the kafka health ", e);
status = "DEGRADED";
}
return status;
}
}
实施 HealthIndicator 的几个示例需要 KafkaTemplate。我实际上并没有手动创建 KafkaTemplate,但 HealthIndicator 需要一个。有没有办法自动获取创建的 KafkaTemplate(使用 application.yml 配置)?这与在新创建的 consumerFactory 中手动创建已存在于 application.yml 中的重复配置不同。
参见
(S)他想要访问模板的 ProducerFactory
但您可以使用相同的技术来获取对模板的引用。
也就是说,活页夹自带健康指标。
我们也有创建自定义健康检查程序的相同要求Spring 云流。我们利用了内置健康检查器 (KafkaBinderHealthIndicator
)。但是在注入 KafkaBinderHealthIndicator bean 时面临很多问题。因此,我们注入了健康检查器持有者 HealthContributorRegistry
,并从中获取了 KafkaBinderHealthIndicator bean。
示例:
@Component
@Slf4j
@RequiredArgsConstructor
public class KafkaHealthChecker implements ComponentHealthChecker {
private final HealthContributorRegistry registry;
public String checkHealth() {
String status;
try {
BindersHealthContributor bindersHealthContributor = (BindersHealthContributor)
registry.getContributor("binders");
KafkaBinderHealthIndicator kafkaBinderHealthIndicator = (KafkaBinderHealthIndicator)
bindersHealthContributor.getContributor("kafka");
Health health = kafkaBinderHealthIndicator.health();
status = UP.equals(health.getStatus()) ? "OK" : "FAIL";
} catch (Exception e) {
log.error("Error occurred while checking the kafka health ", e);
status = "DEGRADED";
}
return status;
}
}