有没有办法从 Kafka 主题中获取最后一条消息?
Is there a way to get the last message from Kafka topic?
我有一个包含多个分区的 Kafka 主题,我想知道 Java 中是否有办法获取该主题的最后一条消息。我不关心分区我只想获取最新消息。
我试过@KafkaListener
,但它仅在主题更新时才获取消息。如果应用程序打开后没有发布任何内容,则不会返回任何内容。
也许倾听者根本不是解决问题的正确方法?
您必须使用来自每个分区的最新消息,然后在客户端进行比较(如果消息包含时间戳,则使用消息上的时间戳)。这是因为 Kafka 不保证分区间的顺序。在分区内,可以确定偏移量最大的消息是最新推送给它的消息。
以下代码片段对我有用。你可以试试这个。在评论中解释。
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(topic));
consumer.poll(Duration.ofSeconds(10));
consumer.assignment().forEach(System.out::println);
AtomicLong maxTimestamp = new AtomicLong();
AtomicReference<ConsumerRecord<String, String>> latestRecord = new AtomicReference<>();
// get the last offsets for each partition
consumer.endOffsets(consumer.assignment()).forEach((topicPartition, offset) -> {
System.out.println("offset: "+offset);
// seek to the last offset of each partition
consumer.seek(topicPartition, (offset==0) ? offset:offset - 1);
// poll to get the last record in each partition
consumer.poll(Duration.ofSeconds(10)).forEach(record -> {
// the latest record in the 'topic' is the one with the highest timestamp
if (record.timestamp() > maxTimestamp.get()) {
maxTimestamp.set(record.timestamp());
latestRecord.set(record);
}
});
});
System.out.println(latestRecord.get());
我有一个包含多个分区的 Kafka 主题,我想知道 Java 中是否有办法获取该主题的最后一条消息。我不关心分区我只想获取最新消息。
我试过@KafkaListener
,但它仅在主题更新时才获取消息。如果应用程序打开后没有发布任何内容,则不会返回任何内容。
也许倾听者根本不是解决问题的正确方法?
您必须使用来自每个分区的最新消息,然后在客户端进行比较(如果消息包含时间戳,则使用消息上的时间戳)。这是因为 Kafka 不保证分区间的顺序。在分区内,可以确定偏移量最大的消息是最新推送给它的消息。
以下代码片段对我有用。你可以试试这个。在评论中解释。
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(topic));
consumer.poll(Duration.ofSeconds(10));
consumer.assignment().forEach(System.out::println);
AtomicLong maxTimestamp = new AtomicLong();
AtomicReference<ConsumerRecord<String, String>> latestRecord = new AtomicReference<>();
// get the last offsets for each partition
consumer.endOffsets(consumer.assignment()).forEach((topicPartition, offset) -> {
System.out.println("offset: "+offset);
// seek to the last offset of each partition
consumer.seek(topicPartition, (offset==0) ? offset:offset - 1);
// poll to get the last record in each partition
consumer.poll(Duration.ofSeconds(10)).forEach(record -> {
// the latest record in the 'topic' is the one with the highest timestamp
if (record.timestamp() > maxTimestamp.get()) {
maxTimestamp.set(record.timestamp());
latestRecord.set(record);
}
});
});
System.out.println(latestRecord.get());