ConsumerAwareRebalanceListener 实现

ConsumerAwareRebalanceListener implementation

我是第一次在生产中使用 spring-kafka,所以这可能是一个幼稚的问题。我正在使用 @KafkaListener 来使用我的消息。在 ConcurrentKafkaListenerContainerFactory 中,我添加了 ConsumerAwareRebalanceListener 的实现。我的用例非常简单,我想提交已处理记录的偏移量,但由于会话超时而发生的重新分区,我无法提交它们。现在的问题是 onPartitionsRevokedBeforeCommit 即使在 onPartitionsLost 上也会被调用,所以我得到了异常,因为消费者不再有分区了。现在我可以忍受它,因为我可以记录异常并发出警报并重新处理消息。但在考虑是否有更好的方法来处理这个问题。

您可以在侦听器中覆盖 onPartitionsLost 并设置一个布尔值以忽略这次对 onPartitionsRevoked 的调用(并重置布尔值)。

如果布尔值是 false,则表示它是正常的 onPartitionsRevoked)。