配置Spring 启动@KafkaListener 监听最新消息
Configure Spring Boot @KafkaListener for listening to the latest messages
我正在使用 Spring Boot 的 @KafkaListener 来监视一些服务器的心跳消息:
@KafkaListener(topics = "heartbeat-topic", groupId = "monitor")
public void listenToHeartbeatMsg(String message) {}
问题是侦听器订阅者应用程序启动时,即使服务器已关闭,订阅者应用程序仍会接收到那些以前的心跳消息。
如何解决这个问题并只监听实时心跳消息?
实施 ConsumerSeekAware
并在 onPartitionsAssigned
中调用回调中的 seekToBeginning。
见https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek
public class MyListener implements ConsumerSeekAware {
...
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
callback.seekToEnd(assignments.keySet());
}
}
我正在使用 Spring Boot 的 @KafkaListener 来监视一些服务器的心跳消息:
@KafkaListener(topics = "heartbeat-topic", groupId = "monitor")
public void listenToHeartbeatMsg(String message) {}
问题是侦听器订阅者应用程序启动时,即使服务器已关闭,订阅者应用程序仍会接收到那些以前的心跳消息。
如何解决这个问题并只监听实时心跳消息?
实施 ConsumerSeekAware
并在 onPartitionsAssigned
中调用回调中的 seekToBeginning。
见https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek
public class MyListener implements ConsumerSeekAware {
...
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
callback.seekToEnd(assignments.keySet());
}
}