添加分区 Spring boot kafka 需要一些时间才能生效
Adding partition on Spring boot kafka take some time to effective
我正在更改 Kafka 主题,为其添加分区(从 3 个分区到 4 个分区)。
在消费者上,我已经有 4 个并发,使用 @KafkaListener
@KafkaListener(topics = "t_multi_partitions", concurrency = "4")
生产者每隔几秒发送一次消息,但通常每条消息少于 5 秒。
有趣的是,当我改变分区时
kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic t_multi_partitions --partitions 4
不直接生效
生产者开始向第 4 个分区发送消息大约需要 5 分钟,重新平衡也需要 5 分钟,因此第 4 个消费者生效
这正常吗?我正在使用 Spring boot 2.1
我该如何调整这 5 分钟的时间(更长或更短)?
消费者代码(简体)
@KafkaListener(topics = "t_multi_partitions", concurrency = "4")
public void listen(ConsumerRecord<String, String> message) throws InterruptedException {
log.info("Key : {}, Partition : {}, Message : {}", message.key(), message.partition(), message.value());
}
这是由 Apache-Kafka 而不是 Spring-Kafka 控制的。行为正常。
您可以通过调整metadata.max.age.ms
来修改刷新率。这是生产者和消费者配置。生产者和消费者配置不需要具有相同的配置值。
引自 Apache-Kafka documentation.
The period of time in milliseconds after which we force a refresh of
metadata even if we haven't seen any partition leadership changes to
proactively discover any new brokers or partitions.
Spring-Kafka 在您的示例中是 运行 四个 @KafkaListener
线程。根据 partition.assignment.strategy
定义的分配器策略,将主题和分区委托给每个 @KafkaListener
。 RangeAssignor
是默认策略。这是消费者配置。
引自 Apache-Kafka 文档。
The class name of the partition assignment strategy that the client
will use to distribute partition ownership amongst consumer instances
when group management is used
基于@kkflf 指导,生产者配置
@Configuration
public class KafkaConfig {
@Autowired
private KafkaProperties kafkaProperties;
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public Map<String, Object> producerConfigs() {
var props = kafkaProperties.buildProducerProperties();
props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "600000");
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
}
我正在更改 Kafka 主题,为其添加分区(从 3 个分区到 4 个分区)。
在消费者上,我已经有 4 个并发,使用 @KafkaListener
@KafkaListener(topics = "t_multi_partitions", concurrency = "4")
生产者每隔几秒发送一次消息,但通常每条消息少于 5 秒。
有趣的是,当我改变分区时
kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic t_multi_partitions --partitions 4
不直接生效
生产者开始向第 4 个分区发送消息大约需要 5 分钟,重新平衡也需要 5 分钟,因此第 4 个消费者生效
这正常吗?我正在使用 Spring boot 2.1 我该如何调整这 5 分钟的时间(更长或更短)?
消费者代码(简体)
@KafkaListener(topics = "t_multi_partitions", concurrency = "4")
public void listen(ConsumerRecord<String, String> message) throws InterruptedException {
log.info("Key : {}, Partition : {}, Message : {}", message.key(), message.partition(), message.value());
}
这是由 Apache-Kafka 而不是 Spring-Kafka 控制的。行为正常。
您可以通过调整metadata.max.age.ms
来修改刷新率。这是生产者和消费者配置。生产者和消费者配置不需要具有相同的配置值。
引自 Apache-Kafka documentation.
The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.
Spring-Kafka 在您的示例中是 运行 四个 @KafkaListener
线程。根据 partition.assignment.strategy
定义的分配器策略,将主题和分区委托给每个 @KafkaListener
。 RangeAssignor
是默认策略。这是消费者配置。
引自 Apache-Kafka 文档。
The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used
基于@kkflf 指导,生产者配置
@Configuration
public class KafkaConfig {
@Autowired
private KafkaProperties kafkaProperties;
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public Map<String, Object> producerConfigs() {
var props = kafkaProperties.buildProducerProperties();
props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "600000");
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
}