为卡夫卡消费者重置以前的偏移量

reset previous offset for a kafka consumer

我想重置对应于给定消费者的先前偏移量。

原因:我编写应用程序的 spring-boot 消费者代码使用 "earliest" 作为自动偏移重置的值。由于偏移量现在已存储在 __consumer_offsets 中,因此将自动偏移量重置的值更改为最新不起作用。

注意:我使用的kafka版本高于0.9。不确定删除消费者是否有帮助,因为我知道偏移量现在存储在主题 __consumer_offsets.

如果您的侦听器实现了 ConsumerSeekAware,您可以寻找消费者。参见 the documentation

void registerSeekCallback(ConsumerSeekCallback callback);

void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

回调提供:

void seek(String topic, int partition, long offset);