Spring Kafka Consumer如何实现高性能
How to achieve high performance of Spring Kafka Consumer
如何提高 Kafka 消费者的性能?我有(并且需要)至少一次 Kafka 消费者语义
我有下面的 configuration.The processInDB() 需要 2 分钟才能完成。所以处理 10 条消息(全部在单个分区中)需要 20 分钟(假设每条消息 2 分钟)。我可以在不同的线程中调用 processInDB,但我可能会丢失消息!如何在 2 到 4 分钟内处理所有 10 条消息 window?
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "grpid-mytopic120112141");
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
ConcurrentKafkaListenerContainerFactory<String, ValidatedConsumerClass> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.RECORD);
factory.setErrorHandler(errorHandler());
下面是我的 Kafka 消费者代码。
@KafkaListener(id = "foo", topics = "mytopic-3", concurrency = "6", groupId = "mytopic-1-groupid")
public void consumeFromTopic1(@Payload @Valid ValidatedConsumerClass message, ConsumerRecordMetadata c) {
dbservice.processInDB(message);
}
使用批处理侦听器会有所帮助 - 您只需要在侦听器中暂停使用者线程,直到所有单独的记录都已完成处理。
在下一个版本(今天发布的 2.8.0-M1 里程碑)中,支持乱序手动确认,其中框架将提交推迟到“差距被填补”https://docs.spring.io/spring-kafka/docs/2.8.0-M1/reference/html/#x28-ooo-commits
另一个与 spring kafka 不完全相关的建议,正如你在标签中所述,你也在探索消费者 api 而不仅仅是 spring kafka,所以我允许我自己在这里建议,你可能想测试一下 api
https://www.confluent.io/blog/introducing-confluent-parallel-message-processing-client/
https://github.com/confluentinc/parallel-consumer
- 它处于 alpha 阶段,因此不推荐用于生产,但也可能会关注它
但正如我之前的评论所述,您可能只想创建更多分区
如何提高 Kafka 消费者的性能?我有(并且需要)至少一次 Kafka 消费者语义
我有下面的 configuration.The processInDB() 需要 2 分钟才能完成。所以处理 10 条消息(全部在单个分区中)需要 20 分钟(假设每条消息 2 分钟)。我可以在不同的线程中调用 processInDB,但我可能会丢失消息!如何在 2 到 4 分钟内处理所有 10 条消息 window?
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "grpid-mytopic120112141");
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
ConcurrentKafkaListenerContainerFactory<String, ValidatedConsumerClass> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.RECORD);
factory.setErrorHandler(errorHandler());
下面是我的 Kafka 消费者代码。
@KafkaListener(id = "foo", topics = "mytopic-3", concurrency = "6", groupId = "mytopic-1-groupid")
public void consumeFromTopic1(@Payload @Valid ValidatedConsumerClass message, ConsumerRecordMetadata c) {
dbservice.processInDB(message);
}
使用批处理侦听器会有所帮助 - 您只需要在侦听器中暂停使用者线程,直到所有单独的记录都已完成处理。
在下一个版本(今天发布的 2.8.0-M1 里程碑)中,支持乱序手动确认,其中框架将提交推迟到“差距被填补”https://docs.spring.io/spring-kafka/docs/2.8.0-M1/reference/html/#x28-ooo-commits
另一个与 spring kafka 不完全相关的建议,正如你在标签中所述,你也在探索消费者 api 而不仅仅是 spring kafka,所以我允许我自己在这里建议,你可能想测试一下 api
https://www.confluent.io/blog/introducing-confluent-parallel-message-processing-client/
https://github.com/confluentinc/parallel-consumer
- 它处于 alpha 阶段,因此不推荐用于生产,但也可能会关注它
但正如我之前的评论所述,您可能只想创建更多分区