如何以编程方式自动配置?
How to auto configure programmatically?
我有一个 spring 引导 kafka 应用程序。我的经纪人每隔几天就会被回收一次。取消配置旧代理并配置新代理。
我有一个调度程序,每隔几个小时就会检查一次代理。我想确保一旦我们有新的经纪人,
我们应该重新加载所有 Spring Kafka 相关 bean。与 KafkaAutoConfiguration 非常相似,除了我想要触发代理值更改并以编程方式加载自动配置。
当旧代理被新代理替换时,如何以编程方式调用自动配置?
您的要求听起来像 Config Server in Spring Cloud:https://cloud.spring.io/spring-cloud-static/Greenwich.SR2/multi/multi__spring_cloud_config_2.html#_spring_cloud_config_2 with its @RefreshScope
feature: https://cloud.spring.io/spring-cloud-static/Greenwich.SR2/multi/multi__spring_cloud_context_application_context_services.html#refresh-scope.
因此,您需要指定自己的 bean 并用该注解标记它们:
@Bean
@RefreshScope
public ConsumerFactory<?, ?> kafkaConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties());
}
@Bean
@RefreshScope
public ProducerFactory<?, ?> kafkaProducerFactory() {
DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(
this.properties.buildProducerProperties());
String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
if (transactionIdPrefix != null) {
factory.setTransactionIdPrefix(transactionIdPrefix);
}
return factory;
}
这两个 bean 依赖于连接到 Apache Kafka 代理的配置属性,这足以让它们刷新。每当 ContextRefreshedEvent
发生时,这些 bean 将 re-initialized 具有新的配置属性。
我认为 ConsumerFactory
消费者(MessageListenerContainer
和 KafkaListenerEndpointRegistry
)也必须在该事件上重新启动。重点是 MessageListenerContainer
启动了一个 long-living 进程,因此缓存了一个 KafkaConsumer
实例用于 poll
目的。
所有 ProducerFactory
消费者不需要重新启动。即使 KafkaProducer
缓存在 DefaultKafkaProducerFactory
中,它也会在 @RefreshScope
阶段重新初始化。
更新
I don’t use config server. I get the new hosts from consul catalog service.
对,我没说你用的是Config Server。那只是以类似的方式寻找我。因此,从高处来看,我真的会研究一下您的 Consul 目录解决方案的 Config Client 实现。
尽管如此,您仍然可以发出一个 RefreshEvent
,这将触发所有 @RefreshScope
的 bean 重新加载。为此,您需要实现一个 ApplicationEventPublisherAware
并在您从 Consul 获得更新时发出该事件。请记住:必须重新启动 Kafka 侦听器容器。为此,您可以监听 RefreshScopeRefreshedEvent
,因为只有当所有 @RefreshScope
都已刷新时,您才真正对重启感兴趣。
更多关于刷新范围:https://gist.github.com/dsyer/a43fe5f74427b371519af68c5c4904c7
我有一个 spring 引导 kafka 应用程序。我的经纪人每隔几天就会被回收一次。取消配置旧代理并配置新代理。
我有一个调度程序,每隔几个小时就会检查一次代理。我想确保一旦我们有新的经纪人, 我们应该重新加载所有 Spring Kafka 相关 bean。与 KafkaAutoConfiguration 非常相似,除了我想要触发代理值更改并以编程方式加载自动配置。
当旧代理被新代理替换时,如何以编程方式调用自动配置?
您的要求听起来像 Config Server in Spring Cloud:https://cloud.spring.io/spring-cloud-static/Greenwich.SR2/multi/multi__spring_cloud_config_2.html#_spring_cloud_config_2 with its @RefreshScope
feature: https://cloud.spring.io/spring-cloud-static/Greenwich.SR2/multi/multi__spring_cloud_context_application_context_services.html#refresh-scope.
因此,您需要指定自己的 bean 并用该注解标记它们:
@Bean
@RefreshScope
public ConsumerFactory<?, ?> kafkaConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties());
}
@Bean
@RefreshScope
public ProducerFactory<?, ?> kafkaProducerFactory() {
DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(
this.properties.buildProducerProperties());
String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
if (transactionIdPrefix != null) {
factory.setTransactionIdPrefix(transactionIdPrefix);
}
return factory;
}
这两个 bean 依赖于连接到 Apache Kafka 代理的配置属性,这足以让它们刷新。每当 ContextRefreshedEvent
发生时,这些 bean 将 re-initialized 具有新的配置属性。
我认为 ConsumerFactory
消费者(MessageListenerContainer
和 KafkaListenerEndpointRegistry
)也必须在该事件上重新启动。重点是 MessageListenerContainer
启动了一个 long-living 进程,因此缓存了一个 KafkaConsumer
实例用于 poll
目的。
所有 ProducerFactory
消费者不需要重新启动。即使 KafkaProducer
缓存在 DefaultKafkaProducerFactory
中,它也会在 @RefreshScope
阶段重新初始化。
更新
I don’t use config server. I get the new hosts from consul catalog service.
对,我没说你用的是Config Server。那只是以类似的方式寻找我。因此,从高处来看,我真的会研究一下您的 Consul 目录解决方案的 Config Client 实现。
尽管如此,您仍然可以发出一个 RefreshEvent
,这将触发所有 @RefreshScope
的 bean 重新加载。为此,您需要实现一个 ApplicationEventPublisherAware
并在您从 Consul 获得更新时发出该事件。请记住:必须重新启动 Kafka 侦听器容器。为此,您可以监听 RefreshScopeRefreshedEvent
,因为只有当所有 @RefreshScope
都已刷新时,您才真正对重启感兴趣。
更多关于刷新范围:https://gist.github.com/dsyer/a43fe5f74427b371519af68c5c4904c7