使用 spring kafka 重新加载消费者属性
Reload consumer properties with spring kafka
我们开发了使用 spring 引导应用程序实时重新加载配置属性。我有一个 spring-kafka 消费者,我想利用实时重新加载,如果我更改消费者 属性 我应该能够在不重新启动应用程序的情况下启动容器。我用过:
KafkaListenerEndpointRegistry.stop()
KafkaListenerEndpointRegistry.start()
我以为上面实际上创建了一个新容器,但事实并非如此。所以我想知道我是否必须启动一个具有新配置属性的容器,我该怎么做
@Bean
@ConfigurationProperties(prefix = "container.config.properties")
@ConditionalOnMissingBean
@RefreshScope
ContainerConfigProperties containerConfigProperties() {
return new ContainerConfigProperties();
}
@Bean
@ConditionalOnMissingBean
@ConditionalOnBean(value = {ContainerConfigProperties.class})
@RefreshScope
<K, V> ConcurrentKafkaListenerContainerFactory<K, ValueDeserializerContainer<V>> kafkaListenerContainerFactory(final ConsumerFactory<K, ValueDeserializerContainer<V>> consumerFactory,
final ContainerConfigProperties containerConfigProperties,
final Optional<IAMIdentity> iamIdentity) {
val factory = new ConcurrentKafkaListenerContainerFactory<K, ValueDeserializerContainer<V>>();
factory.setBatchListener(true);
factory.setBatchErrorHandler(new SeekToCurrentBatchErrorHandler());
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties().setAckMode(containerConfigProperties.getAckMode());
factory.setConcurrency(containerConfigProperties.getConcurrency());
factory.getContainerProperties().setConsumerRebalanceListener(simpleConsumerRebalanceListener());
// update kafka consumer properties. Default is taken from the config file
iamIdentity.ifPresent(identity -> consumerFactory.updateConfigs(addIAMIdentity(identity)));
log.info("kafkaListenerContainerFactory");
return factory;
}
您具体要更改哪些属性?当 stopping/starting 父容器时确实会重新创建子容器,因此任何 ContainerProperties
更改都将被拾取。
如果您谈论的是 kafka 消费者属性,则需要重新配置消费者工厂,或者通过 ContainerProperties.kafkaConsumerProperties
设置更改的属性以覆盖消费者工厂设置。
编辑
类似这样的方法可能有效:
@Bean
@RefreshScope
Object containerReconfigurer(KafkaListenerEndpointRegistry registry) {
registry.getListenerContainers().forEach(container -> {
container.stop();
// reconfigure container
container.start();
});
return null;
}
我们开发了使用 spring 引导应用程序实时重新加载配置属性。我有一个 spring-kafka 消费者,我想利用实时重新加载,如果我更改消费者 属性 我应该能够在不重新启动应用程序的情况下启动容器。我用过:
KafkaListenerEndpointRegistry.stop()
KafkaListenerEndpointRegistry.start()
我以为上面实际上创建了一个新容器,但事实并非如此。所以我想知道我是否必须启动一个具有新配置属性的容器,我该怎么做
@Bean
@ConfigurationProperties(prefix = "container.config.properties")
@ConditionalOnMissingBean
@RefreshScope
ContainerConfigProperties containerConfigProperties() {
return new ContainerConfigProperties();
}
@Bean
@ConditionalOnMissingBean
@ConditionalOnBean(value = {ContainerConfigProperties.class})
@RefreshScope
<K, V> ConcurrentKafkaListenerContainerFactory<K, ValueDeserializerContainer<V>> kafkaListenerContainerFactory(final ConsumerFactory<K, ValueDeserializerContainer<V>> consumerFactory,
final ContainerConfigProperties containerConfigProperties,
final Optional<IAMIdentity> iamIdentity) {
val factory = new ConcurrentKafkaListenerContainerFactory<K, ValueDeserializerContainer<V>>();
factory.setBatchListener(true);
factory.setBatchErrorHandler(new SeekToCurrentBatchErrorHandler());
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties().setAckMode(containerConfigProperties.getAckMode());
factory.setConcurrency(containerConfigProperties.getConcurrency());
factory.getContainerProperties().setConsumerRebalanceListener(simpleConsumerRebalanceListener());
// update kafka consumer properties. Default is taken from the config file
iamIdentity.ifPresent(identity -> consumerFactory.updateConfigs(addIAMIdentity(identity)));
log.info("kafkaListenerContainerFactory");
return factory;
}
您具体要更改哪些属性?当 stopping/starting 父容器时确实会重新创建子容器,因此任何 ContainerProperties
更改都将被拾取。
如果您谈论的是 kafka 消费者属性,则需要重新配置消费者工厂,或者通过 ContainerProperties.kafkaConsumerProperties
设置更改的属性以覆盖消费者工厂设置。
编辑
类似这样的方法可能有效:
@Bean
@RefreshScope
Object containerReconfigurer(KafkaListenerEndpointRegistry registry) {
registry.getListenerContainers().forEach(container -> {
container.stop();
// reconfigure container
container.start();
});
return null;
}