从 kafka 消费者控制台脚本获取最后一条消息

Get last message from kafka consumer console script

我们可以通过以下方式获取来自 Kafka 的每条消息:

 bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

有没有办法仅获取最后一条消息

编辑:

如果您只想监控流中的一些消息 (--max-messages 10),一个方便的命令是:

watch -n5 "./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic auction --max-messages 10"

我从一些谷歌搜索中得到了可能正确的答案 http://grokbase.com/t/kafka/users/145x930s27/how-to-get-last-message

有人建议用 getOffsetBefore api 找到最后一个偏移量然后 使用该偏移量 - 1 来获取。

我不知道有任何自动操作,但使用这种简单的两步方法,它应该可以工作。 请注意,在我的例子中,它是一个分区主题,如果你有一个未分区的主题,你可以将它的参数保留在外面:

1) 获取主题的最大偏移量(+它们的分区):

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic mytopic

mytopic:2:11
mytopic:1:7
mytopic:0:15
mytopic:3:8

2) 选择一个主题(+分区)并提供 offset - n 作为参数:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopic --offset 10 --partition 0  

主题的最后 n 条消息将打印到控制台。 在我的示例中,它将显示 5 条消息 (= 15-10)。

使用 KafkaCat (https://docs.confluent.io/platform/current/app-development/kafkacat-usage.html) 可以读取 Apache Kafka 主题的最后 N 条消息。

使用 KafkaCat 偏移参数 -o 可以指定负值:

kcat -b localhost:9092 -t mytopic -o -1

对最后 5 条消息等使用 -o -5