使用 spring kafka 重新加载消费者属性

Reload consumer properties with spring kafka

我们开发了使用 spring 引导应用程序实时重新加载配置属性。我有一个 spring-kafka 消费者,我想利用实时重新加载,如果我更改消费者 属性 我应该能够在不重新启动应用程序的情况下启动容器。我用过:

KafkaListenerEndpointRegistry.stop()
KafkaListenerEndpointRegistry.start()

我以为上面实际上创建了一个新容器,但事实并非如此。所以我想知道我是否必须启动一个具有新配置属性的容器,我该怎么做

 @Bean
    @ConfigurationProperties(prefix = "container.config.properties")
    @ConditionalOnMissingBean
    @RefreshScope
    ContainerConfigProperties containerConfigProperties() {
        return new ContainerConfigProperties();
    }

    @Bean
    @ConditionalOnMissingBean
    @ConditionalOnBean(value = {ContainerConfigProperties.class})
    @RefreshScope
    <K, V> ConcurrentKafkaListenerContainerFactory<K, ValueDeserializerContainer<V>> kafkaListenerContainerFactory(final ConsumerFactory<K, ValueDeserializerContainer<V>> consumerFactory,
                                                                                                                   final ContainerConfigProperties containerConfigProperties,
                                                                                                                   final Optional<IAMIdentity> iamIdentity) {
        val factory = new ConcurrentKafkaListenerContainerFactory<K, ValueDeserializerContainer<V>>();
        factory.setBatchListener(true);
        factory.setBatchErrorHandler(new SeekToCurrentBatchErrorHandler());
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(containerConfigProperties.getAckMode());
        factory.setConcurrency(containerConfigProperties.getConcurrency());
        factory.getContainerProperties().setConsumerRebalanceListener(simpleConsumerRebalanceListener());

        // update kafka consumer properties. Default is taken from the config file
        iamIdentity.ifPresent(identity -> consumerFactory.updateConfigs(addIAMIdentity(identity)));
        log.info("kafkaListenerContainerFactory");

        return factory;
    }

您具体要更改哪些属性?当 stopping/starting 父容器时确实会重新创建子容器,因此任何 ContainerProperties 更改都将被拾取。

如果您谈论的是 kafka 消费者属性,则需要重新配置消费者工厂,或者通过 ContainerProperties.kafkaConsumerProperties 设置更改的属性以覆盖消费者工厂设置。

编辑

类似这样的方法可能有效:

@Bean
@RefreshScope
Object containerReconfigurer(KafkaListenerEndpointRegistry registry) {
    registry.getListenerContainers().forEach(container -> {
        container.stop();
        // reconfigure container
        container.start();
    });
    return null;
}