使用@KafkaListener 注释时如何知道何时调用 consumer.poll()?
How to know when consumer.poll() is called while using @KafkaListener annotation?
我知道如果我使用@KafkaListener,我无法控制何时进行轮询,我从中了解到
the next poll()
is performed after the last message from the previous poll has been processed by the listener.
所以我想知道如何知道每个 poll()
何时执行?或者等价地,处理每个 poll()
调用中收到的所有消息需要多长时间?
我问是因为我的程序出现“偏移量提交失败...请求超时”异常,我想调整我的消费者配置,即 max.poll.interval.ms
和 max.poll.records
,但是我要先知道现在的表现。
如果有帮助,这是我的@KafkaListener 方法的一部分:
@KafkaListener(id = "dataListener", topics ="${spring.kafka.topic}", containerFactory = "kafkaListenerContainerFactory")
public void listen(@Payload(required = false) ConsumerRecord payload, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) String partition,
@Header(KafkaHeaders.OFFSET)Long offset, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY)String messageKey){
// processing messages
}
您可以通过打开 DEBUG 日志记录来查看轮询 activity。
this.logger.debug(() -> "Received: " + records.count() + " records");
在每个 poll()
后记录。
我知道如果我使用@KafkaListener,我无法控制何时进行轮询,我从
the next
poll()
is performed after the last message from the previous poll has been processed by the listener.
所以我想知道如何知道每个 poll()
何时执行?或者等价地,处理每个 poll()
调用中收到的所有消息需要多长时间?
我问是因为我的程序出现“偏移量提交失败...请求超时”异常,我想调整我的消费者配置,即 max.poll.interval.ms
和 max.poll.records
,但是我要先知道现在的表现。
如果有帮助,这是我的@KafkaListener 方法的一部分:
@KafkaListener(id = "dataListener", topics ="${spring.kafka.topic}", containerFactory = "kafkaListenerContainerFactory")
public void listen(@Payload(required = false) ConsumerRecord payload, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) String partition,
@Header(KafkaHeaders.OFFSET)Long offset, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY)String messageKey){
// processing messages
}
您可以通过打开 DEBUG 日志记录来查看轮询 activity。
this.logger.debug(() -> "Received: " + records.count() + " records");
在每个 poll()
后记录。