如何使用休息端点重置 Spring Kafka Offset

How do I reset Spring Kafka Offset with a rest end point

我想要一个休息端点,以便在出现任何问题时重播消息。

  1. 暂停 Kafka 容器。
  2. 重置特定 partition/topic 的偏移量。
  3. 恢复 Kafka 容器

如何在Consumer中调用seek()并传递partition/topic。下面是controller

    @GetMapping("/KafkaConsumerMaintainence")
        public void pause(  )
        {
            
    
            kafkaListenerEndpointRegistry.getListenerContainer("foo").pause();
            
            // seek the offset here 
            
            kafkaListenerEndpointRegistry.getListenerContainer("foo").resume();
            
            
        }

@Service
public class KafKaConsumerService extends  AbstractConsumerSeekAware
    @KafkaListener(id = "foo", topics = "mytopic-3", concurrency = "3", groupId = "mytopic-1-groupid")
        public void consumeFromTopic1(@Payload @Valid Foo message, ConsumerRecordMetadata c)  {
    
            dbservice.processInDB(message);
        }
//not able to pass offset
public void seekToOffset(TopicPartition topicPartition) {
        getSeekCallbackFor(topicPartition);
    }

不清楚您的问题是什么,只需这样做...

public void seekToOffset(TopicPartition tp, long offset) {
    getSeekCallbackFor(topicPartition).seek(tp.topic(), tp.partition(), offset);
}