使用 Spring Kafka 降低处理速度的选项
Options to reduce processing rate using Spring Kafka
我正在使用 Spring Boot 2.2.7 和 Spring Kafka。我有一个 KafkaListener,它持续处理来自主题的统计数据并将数据写入 MongoDB 和 Elasticsearch(使用 Spring 数据)。
我的配置如下:
@Configuration
public class StatListenerConfig {
@Autowired
private KafkaConfig kafkaConfig;
@Bean
public ConsumerFactory<String, StatsRequestDto> statsConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(kafkaConfig.statsConsumerConfigs());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, StatsRequestDto> kafkaStatsListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, StatsRequestDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(statsConsumerFactory());
factory.getContainerProperties().setAckMode(AckMode.RECORD);
return factory;
}
}
@Service
public class StatListener {
private static final Logger LOGGER = LoggerFactory.getLogger(StatListener.class);
@Autowired
private StatsService statsService;
@KafkaListener(topics = "${kafka.topic.stats}", containerFactory = "kafkaStatsListenerContainerFactory")
public void receive(@Payload StatsRequestDto data) {
Stat stats = statsService.convertToStats(data);
statsService.save(stats).get();
}
}
save
方法是一个异步方法。
我遇到的问题是,在处理队列时,Elastisearch CPU 消耗约为 250%。这会导致整个应用程序偶尔出现超时错误。我正在研究如何优化 Elasticsearch,因为索引会导致 CPU 峰值。
我想检查一下,如果我使用异步方法(如上),主题中的下一条消息在上一条消息完成之前不会被处理。如果这是正确的,我可以使用 Spring Kafka 中的哪些选项来减轻可能需要时间才能完成的下游操作的压力。
如有任何建议,我们将不胜感激。
在 2.3 版中,我们添加了 idleBetweenPolls
容器 属性。
对于早期版本,您可以模拟这种情况,例如,在收到一定数量的记录后在消费者中休眠一段时间。
您只需确保轮询返回的记录的睡眠+处理时间不超过 max.poll.intervsl.ms
,以避免重新平衡。
我正在使用 Spring Boot 2.2.7 和 Spring Kafka。我有一个 KafkaListener,它持续处理来自主题的统计数据并将数据写入 MongoDB 和 Elasticsearch(使用 Spring 数据)。
我的配置如下:
@Configuration
public class StatListenerConfig {
@Autowired
private KafkaConfig kafkaConfig;
@Bean
public ConsumerFactory<String, StatsRequestDto> statsConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(kafkaConfig.statsConsumerConfigs());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, StatsRequestDto> kafkaStatsListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, StatsRequestDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(statsConsumerFactory());
factory.getContainerProperties().setAckMode(AckMode.RECORD);
return factory;
}
}
@Service
public class StatListener {
private static final Logger LOGGER = LoggerFactory.getLogger(StatListener.class);
@Autowired
private StatsService statsService;
@KafkaListener(topics = "${kafka.topic.stats}", containerFactory = "kafkaStatsListenerContainerFactory")
public void receive(@Payload StatsRequestDto data) {
Stat stats = statsService.convertToStats(data);
statsService.save(stats).get();
}
}
save
方法是一个异步方法。
我遇到的问题是,在处理队列时,Elastisearch CPU 消耗约为 250%。这会导致整个应用程序偶尔出现超时错误。我正在研究如何优化 Elasticsearch,因为索引会导致 CPU 峰值。
我想检查一下,如果我使用异步方法(如上),主题中的下一条消息在上一条消息完成之前不会被处理。如果这是正确的,我可以使用 Spring Kafka 中的哪些选项来减轻可能需要时间才能完成的下游操作的压力。
如有任何建议,我们将不胜感激。
在 2.3 版中,我们添加了 idleBetweenPolls
容器 属性。
对于早期版本,您可以模拟这种情况,例如,在收到一定数量的记录后在消费者中休眠一段时间。
您只需确保轮询返回的记录的睡眠+处理时间不超过 max.poll.intervsl.ms
,以避免重新平衡。