获取发送到 kafka 主题的最后一条消息
Getting the last message sent to a kafka topic
我是 Kafka 的新手,正在研究将专有流媒体服务连接到 Kafka 的原型。
我正在寻找关于主题的最后一条消息的密钥,因为我们的内部流消费者需要使用连接时收到的最后一条消息的 ID 登录。
是否可以使用 KafkaProducer 或 KafkaConsumer 来执行此操作?
我尝试使用消费者执行以下操作,但是当 运行 控制台消费者时,我看到消息重播。
// Poll so we know we're connected
consumer.poll(100);
// Get the assigned partitions
Set<TopicPartition> assignedPartitions = consumer.assignment();
// Seek to the end of those partitions
consumer.seekToEnd(assignedPartitions);
for(TopicPartition partition : assignedPartitions) {
final long offset = consumer.committed(partition).offset();
// Seek to the previous message
consumer.seek(partition,offset - 1);
}
// Now get the last message
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
lastKey = record.key();
}
consumer.close();
这是预期的行为还是我走错了路?
问题出在 final long offset = consumer.committed(partition).offset()
行,因为 link api 指的是 committed
方法是为给定分区获取 the last committed offset
,即:您的消费者告诉的最后一个偏移量它已经读取的kafka服务器。
所以,你肯定会得到 messages replayed
,因为你总是从特定的偏移量读取。
因为我认为我只需要删除第一个块。
检查记录数并获取最后一条消息:
// Poll so we know we're connected
consumer.poll(100);
// Get the assigned partitions
Set<TopicPartition> assignedPartitions = consumer.assignment();
// Seek to the end of those partitions
consumer.seekToEnd(assignedPartitions);
for (TopicPartition partition : assignedPartitions) {
final long offset = consumer.committed(partition).offset();
// Seek to the previous message
consumer.seek(partition, offset - 1);
}
// Now get the last message
ConsumerRecords<String, String> records = consumer.poll(100);
int size = records.count();
int index = 0;
for (ConsumerRecord<String, String> record : records) {
index = index + 1;
if (index == size) {
String value = record.value();
System.out.println("Last Message = " + value);
}
}
consumer.close();
我是 Kafka 的新手,正在研究将专有流媒体服务连接到 Kafka 的原型。
我正在寻找关于主题的最后一条消息的密钥,因为我们的内部流消费者需要使用连接时收到的最后一条消息的 ID 登录。
是否可以使用 KafkaProducer 或 KafkaConsumer 来执行此操作?
我尝试使用消费者执行以下操作,但是当 运行 控制台消费者时,我看到消息重播。
// Poll so we know we're connected
consumer.poll(100);
// Get the assigned partitions
Set<TopicPartition> assignedPartitions = consumer.assignment();
// Seek to the end of those partitions
consumer.seekToEnd(assignedPartitions);
for(TopicPartition partition : assignedPartitions) {
final long offset = consumer.committed(partition).offset();
// Seek to the previous message
consumer.seek(partition,offset - 1);
}
// Now get the last message
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
lastKey = record.key();
}
consumer.close();
这是预期的行为还是我走错了路?
问题出在 final long offset = consumer.committed(partition).offset()
行,因为 link api 指的是 committed
方法是为给定分区获取 the last committed offset
,即:您的消费者告诉的最后一个偏移量它已经读取的kafka服务器。
所以,你肯定会得到 messages replayed
,因为你总是从特定的偏移量读取。
因为我认为我只需要删除第一个块。
检查记录数并获取最后一条消息:
// Poll so we know we're connected
consumer.poll(100);
// Get the assigned partitions
Set<TopicPartition> assignedPartitions = consumer.assignment();
// Seek to the end of those partitions
consumer.seekToEnd(assignedPartitions);
for (TopicPartition partition : assignedPartitions) {
final long offset = consumer.committed(partition).offset();
// Seek to the previous message
consumer.seek(partition, offset - 1);
}
// Now get the last message
ConsumerRecords<String, String> records = consumer.poll(100);
int size = records.count();
int index = 0;
for (ConsumerRecord<String, String> record : records) {
index = index + 1;
if (index == size) {
String value = record.value();
System.out.println("Last Message = " + value);
}
}
consumer.close();