为卡夫卡消费者重置以前的偏移量
reset previous offset for a kafka consumer
我想重置对应于给定消费者的先前偏移量。
原因:我编写应用程序的 spring-boot 消费者代码使用 "earliest" 作为自动偏移重置的值。由于偏移量现在已存储在 __consumer_offsets 中,因此将自动偏移量重置的值更改为最新不起作用。
注意:我使用的kafka版本高于0.9。不确定删除消费者是否有帮助,因为我知道偏移量现在存储在主题 __consumer_offsets.
中
如果您的侦听器实现了 ConsumerSeekAware
,您可以寻找消费者。参见 the documentation。
void registerSeekCallback(ConsumerSeekCallback callback);
void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
回调提供:
void seek(String topic, int partition, long offset);
我想重置对应于给定消费者的先前偏移量。
原因:我编写应用程序的 spring-boot 消费者代码使用 "earliest" 作为自动偏移重置的值。由于偏移量现在已存储在 __consumer_offsets 中,因此将自动偏移量重置的值更改为最新不起作用。
注意:我使用的kafka版本高于0.9。不确定删除消费者是否有帮助,因为我知道偏移量现在存储在主题 __consumer_offsets.
中如果您的侦听器实现了 ConsumerSeekAware
,您可以寻找消费者。参见 the documentation。
void registerSeekCallback(ConsumerSeekCallback callback);
void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
回调提供:
void seek(String topic, int partition, long offset);