如何在属于 AbstractConsumerSeekAware class 的 Spring Kafka 中实现 seekToEnd()?
how to implement seekToEnd() in Spring Kafka belonging to AbstractConsumerSeekAware class?
我有一个要求。我们有一个 spring boot kafka consumer 应用程序,它正在读取 kafka 主题。我们的要求是,每当应用程序出现故障和出现时,我都想开始使用最新的偏移量,而不是被旧值所困扰。是否有可能重置组的偏移量?我进行了一些研究,并使用 AbstractConsumerSeekAware 使用 seekToEnd() 将偏移量设置为末尾,如下面的代码所示。
public class KafkaConsumer extends AbstractConsumerSeekAware {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@KafkaListener(topics = "${topic.consumer}")
public void consume(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
//consuming the message from consumer Record.
seekToEnd();
doSomething();
ack.acknowledge();
}
然而,当我们停止应用程序并重新启动它时,它开始从它离开的最后一个偏移量开始读取,但我们只想从应用程序启动时的偏移量读取。我们怎样才能做到这一点?
在 @KafkaListener
方法中这样做是不正确的。只有当消费者从分区传送记录时才真正调用这个。
出于这个原因,您必须实施 onPartitionsAssigned()
,以便消费者在开始轮询之前寻找这些分区。
在文档中查看更多信息:https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek
我有一个要求。我们有一个 spring boot kafka consumer 应用程序,它正在读取 kafka 主题。我们的要求是,每当应用程序出现故障和出现时,我都想开始使用最新的偏移量,而不是被旧值所困扰。是否有可能重置组的偏移量?我进行了一些研究,并使用 AbstractConsumerSeekAware 使用 seekToEnd() 将偏移量设置为末尾,如下面的代码所示。
public class KafkaConsumer extends AbstractConsumerSeekAware {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@KafkaListener(topics = "${topic.consumer}")
public void consume(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
//consuming the message from consumer Record.
seekToEnd();
doSomething();
ack.acknowledge();
}
然而,当我们停止应用程序并重新启动它时,它开始从它离开的最后一个偏移量开始读取,但我们只想从应用程序启动时的偏移量读取。我们怎样才能做到这一点?
在 @KafkaListener
方法中这样做是不正确的。只有当消费者从分区传送记录时才真正调用这个。
出于这个原因,您必须实施 onPartitionsAssigned()
,以便消费者在开始轮询之前寻找这些分区。
在文档中查看更多信息:https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek