使用 Spring Kafka 降低处理速度的选项

Options to reduce processing rate using Spring Kafka

我正在使用 Spring Boot 2.2.7 和 Spring Kafka。我有一个 KafkaListener,它持续处理来自主题的统计数据并将数据写入 MongoDB 和 Elasticsearch(使用 Spring 数据)。

我的配置如下:

@Configuration
public class StatListenerConfig {

    @Autowired
    private KafkaConfig kafkaConfig;
  
    @Bean
    public ConsumerFactory<String, StatsRequestDto> statsConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(kafkaConfig.statsConsumerConfigs());
    }
  
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, StatsRequestDto> kafkaStatsListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, StatsRequestDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(statsConsumerFactory());
        factory.getContainerProperties().setAckMode(AckMode.RECORD);
        return factory;
    }
}

@Service
public class StatListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(StatListener.class);

    @Autowired
    private StatsService statsService;

    @KafkaListener(topics = "${kafka.topic.stats}", containerFactory = "kafkaStatsListenerContainerFactory")
    public void receive(@Payload StatsRequestDto data) {
      
        Stat stats = statsService.convertToStats(data);
        statsService.save(stats).get();   
    }
}

save 方法是一个异步方法。

我遇到的问题是,在处理队列时,Elastisearch CPU 消耗约为 250%。这会导致整个应用程序偶尔出现超时错误。我正在研究如何优化 Elasticsearch,因为索引会导致 CPU 峰值。

我想检查一下,如果我使用异步方法(如上),主题中的下一条消息在上一条消息完成之前不会被处理。如果这是正确的,我可以使用 Spring Kafka 中的哪些选项来减轻可能需要时间才能完成的下游操作的压力。

如有任何建议,我们将不胜感激。

在 2.3 版中,我们添加了 idleBetweenPolls 容器 属性。

对于早期版本,您可以模拟这种情况,例如,在收到一定数量的记录后在消费者中休眠一段时间。

您只需确保轮询返回的记录的睡眠+处理时间不超过 max.poll.intervsl.ms,以避免重新平衡。