添加分区 Spring boot kafka 需要一些时间才能生效

Adding partition on Spring boot kafka take some time to effective

我正在更改 Kafka 主题,为其添加分区(从 3 个分区到 4 个分区)。 在消费者上,我已经有 4 个并发,使用 @KafkaListener

@KafkaListener(topics = "t_multi_partitions", concurrency = "4")

生产者每隔几秒发送一次消息,但通常每条消息少于 5 秒。

有趣的是,当我改变分区时

kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic t_multi_partitions --partitions 4

不直接生效
生产者开始向第 4 个分区发送消息大约需要 5 分钟,重新平衡也需要 5 分钟,因此第 4 个消费者生效

这正常吗?我正在使用 Spring boot 2.1 我该如何调整这 5 分钟的时间(更长或更短)?

消费者代码(简体)

@KafkaListener(topics = "t_multi_partitions", concurrency = "4")
public void listen(ConsumerRecord<String, String> message) throws InterruptedException {
    log.info("Key : {}, Partition : {}, Message : {}", message.key(), message.partition(), message.value());
}

这是由 Apache-Kafka 而不是 Spring-Kafka 控制的。行为正常。

您可以通过调整metadata.max.age.ms来修改刷新率。这是生产者和消费者配置。生产者和消费者配置不需要具有相同的配置值。

引自 Apache-Kafka documentation.

The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.

Spring-Kafka 在您的示例中是 运行 四个 @KafkaListener 线程。根据 partition.assignment.strategy 定义的分配器策略,将主题和分区委托给每个 @KafkaListenerRangeAssignor 是默认策略。这是消费者配置。

引自 Apache-Kafka 文档。

The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used

基于@kkflf 指导,生产者配置

@Configuration
public class KafkaConfig {

    @Autowired
    private KafkaProperties kafkaProperties;

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        var props = kafkaProperties.buildProducerProperties();
        props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "600000");

        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

}