获取发送到 kafka 主题的最后一条消息

Getting the last message sent to a kafka topic

我是 Kafka 的新手,正在研究将专有流媒体服务连接到 Kafka 的原型。

我正在寻找关于主题的最后一条消息的密钥,因为我们的内部流消费者需要使用连接时收到的最后一条消息的 ID 登录。

是否可以使用 KafkaProducer 或 KafkaConsumer 来执行此操作?

我尝试使用消费者执行以下操作,但是当 运行 控制台消费者时,我看到消息重播。

    // Poll so we know we're connected
    consumer.poll(100);
    // Get the assigned partitions
    Set<TopicPartition> assignedPartitions = consumer.assignment();
    // Seek to the end of those partitions
    consumer.seekToEnd(assignedPartitions);

    for(TopicPartition partition : assignedPartitions) {
        final long offset = consumer.committed(partition).offset();
        // Seek to the previous message
        consumer.seek(partition,offset - 1);
    }

    // Now get the last message
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        lastKey = record.key();
    }
    consumer.close();

这是预期的行为还是我走错了路?

问题出在 final long offset = consumer.committed(partition).offset() 行,因为 link api 指的是 committed 方法是为给定分区获取 the last committed offset,即:您的消费者告诉的最后一个偏移量它已经读取的kafka服务器。 所以,你肯定会得到 messages replayed,因为你总是从特定的偏移量读取。 因为我认为我只需要删除第一个块。

检查记录数并获取最后一条消息:

    // Poll so we know we're connected
    consumer.poll(100);
    // Get the assigned partitions
    Set<TopicPartition> assignedPartitions = consumer.assignment();
    // Seek to the end of those partitions
    consumer.seekToEnd(assignedPartitions);

    for (TopicPartition partition : assignedPartitions) {
        final long offset = consumer.committed(partition).offset();
        // Seek to the previous message
        consumer.seek(partition, offset - 1);
    }

    // Now get the last message
    ConsumerRecords<String, String> records = consumer.poll(100);
    int size = records.count();
    int index = 0;
    for (ConsumerRecord<String, String> record : records) {
        index = index + 1;
        if (index == size) {
            String value = record.value();
            System.out.println("Last Message = " + value);
        }
    }
    consumer.close();