如何在属于 AbstractConsumerSeekAware class 的 Spring Kafka 中实现 seekToEnd()?

how to implement seekToEnd() in Spring Kafka belonging to AbstractConsumerSeekAware class?

我有一个要求。我们有一个 spring boot kafka consumer 应用程序,它正在读取 kafka 主题。我们的要求是,每当应用程序出现故障和出现时,我都想开始使用最新的偏移量,而不是被旧值所困扰。是否有可能重置组的偏移量?我进行了一些研究,并使用 AbstractConsumerSeekAware 使用 seekToEnd() 将偏移量设置为末尾,如下面的代码所示。

public class KafkaConsumer extends AbstractConsumerSeekAware {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;



    @KafkaListener(topics = "${topic.consumer}")
    public void consume(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
        //consuming the message from consumer Record.
        seekToEnd();
        doSomething();
        ack.acknowledge();
    }

然而,当我们停止应用程序并重新启动它时,它开始从它离开的最后一个偏移量开始读取,但我们只想从应用程序启动时的偏移量读取。我们怎样才能做到这一点?

@KafkaListener 方法中这样做是不正确的。只有当消费者从分区传送记录时才真正调用这个。

出于这个原因,您必须实施 onPartitionsAssigned(),以便消费者在开始轮询之前寻找这些分区。

在文档中查看更多信息:https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek