如何使用休息端点重置 Spring Kafka Offset
How do I reset Spring Kafka Offset with a rest end point
我想要一个休息端点,以便在出现任何问题时重播消息。
- 暂停 Kafka 容器。
- 重置特定 partition/topic 的偏移量。
- 恢复 Kafka 容器
如何在Consumer中调用seek()并传递partition/topic。下面是controller
@GetMapping("/KafkaConsumerMaintainence")
public void pause( )
{
kafkaListenerEndpointRegistry.getListenerContainer("foo").pause();
// seek the offset here
kafkaListenerEndpointRegistry.getListenerContainer("foo").resume();
}
@Service
public class KafKaConsumerService extends AbstractConsumerSeekAware
@KafkaListener(id = "foo", topics = "mytopic-3", concurrency = "3", groupId = "mytopic-1-groupid")
public void consumeFromTopic1(@Payload @Valid Foo message, ConsumerRecordMetadata c) {
dbservice.processInDB(message);
}
//not able to pass offset
public void seekToOffset(TopicPartition topicPartition) {
getSeekCallbackFor(topicPartition);
}
不清楚您的问题是什么,只需这样做...
public void seekToOffset(TopicPartition tp, long offset) {
getSeekCallbackFor(topicPartition).seek(tp.topic(), tp.partition(), offset);
}
我想要一个休息端点,以便在出现任何问题时重播消息。
- 暂停 Kafka 容器。
- 重置特定 partition/topic 的偏移量。
- 恢复 Kafka 容器
如何在Consumer中调用seek()并传递partition/topic。下面是controller
@GetMapping("/KafkaConsumerMaintainence")
public void pause( )
{
kafkaListenerEndpointRegistry.getListenerContainer("foo").pause();
// seek the offset here
kafkaListenerEndpointRegistry.getListenerContainer("foo").resume();
}
@Service
public class KafKaConsumerService extends AbstractConsumerSeekAware
@KafkaListener(id = "foo", topics = "mytopic-3", concurrency = "3", groupId = "mytopic-1-groupid")
public void consumeFromTopic1(@Payload @Valid Foo message, ConsumerRecordMetadata c) {
dbservice.processInDB(message);
}
//not able to pass offset
public void seekToOffset(TopicPartition topicPartition) {
getSeekCallbackFor(topicPartition);
}
不清楚您的问题是什么,只需这样做...
public void seekToOffset(TopicPartition tp, long offset) {
getSeekCallbackFor(topicPartition).seek(tp.topic(), tp.partition(), offset);
}