配置Spring 启动@KafkaListener 监听最新消息

Configure Spring Boot @KafkaListener for listening to the latest messages

我正在使用 Spring Boot 的 @KafkaListener 来监视一些服务器的心跳消息:

@KafkaListener(topics = "heartbeat-topic", groupId = "monitor")
public void listenToHeartbeatMsg(String message) {}

问题是侦听器订阅者应用程序启动时,即使服务器已关闭,订阅者应用程序仍会接收到那些以前的心跳消息。

如何解决这个问题并只监听实时心跳消息?

实施 ConsumerSeekAware 并在 onPartitionsAssigned 中调用回调中的 seekToBeginning。

https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek

public class MyListener implements ConsumerSeekAware {

...

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        callback.seekToEnd(assignments.keySet());
    }

}