存在偏移量时如何使用最大偏移量的kafka?

How to consume kafka frome the max offset when the offset is exist?

我已阅读文档并找到配置 "auto.offset.reset" :

ZooKeeper 中没有初始 偏移量或偏移量超出范围时该怎么办:

问题是我曾经使用组ID消费kafka,我想保留组ID但放弃旧消息。

我该怎么做?

您可以尝试在 ConsumerRebalanceListener.onPartitionsAssigned 中的消费者中执行此操作:

public void onPartitionsAssigned(Collection<TopicPartition> partitions) {   
    kafkaConsumer.seekToEnd(partitions);
}