如何让kafka消费者从上次消费的偏移量而不是从头开始读取

How to make kafka consumer to read from last consumed offset but not from beginning

我是 kafka 的新手,正在尝试了解是否有办法从上次使用的偏移量而不是从头开始读取消息。

我正在写一个例子,这样我的意图就不会偏离。

Eg:
1) I produced 5 messages at 7:00 PM and console consumer consumed those.
2) I stopped consumer at 7:10 PM
3) I produced 10 message at 7:20 PM. No consumer had read those messages.
4) Now, i have started console consumer at 7:30 PM, without from-beginning.
5) Now, it Will read the messages produced after it has started. Not the earlier ones, which were produced at 7.20 PM

有没有办法获取上次消耗的偏移量产生的消息。?

您应该在 largest 上的消费者配置中设置 auto.offset.reset 参数,这样它将读取上次提交偏移后的所有消息。

I am new to kafka and trying to understand if there is a way to read messages from last consumed offset, but not from beginning.

是的,可以使用控制台使用者从上次使用的偏移量中读取。您必须在调用 kafka-console-consumer 时添加 consumer.config 标志。

示例:-

[root@sandbox bin]# ./kafka-console-consumer.sh --topic test1 --zookeeper localhost:2181 --consumer.config /home/mrnakumar/consumer.properties

这里/home/mrnakumar/consumer.properties是一个包含group.id的文件。 这是 /home/mrnakumar/consumer.properties 的样子:-

group.id=consoleGroup

Withoug 使用 consumer.config,可以通过使用 --from-beginning] 从开头 [ 或仅从日志结尾读取。 End of the Log表示消费者启动后发布的所有消息。

在消费者配置中设置 auto.offset.reset=earliest 和固定的 group.id=something 将在最后提交的偏移量处启动消费者。在您的情况下,它应该在 7:20 处的第一条消息处开始使用。如果您希望它在启动后开始阅读发布的消息,那么 auto.offset.reset=latest 将忽略在 7:20 发送的 10 条消息并阅读它启动后收到的任何消息。

如果您希望它从头开始,您必须在第一个 consumer.poll() 之后调用 seekToBeginning,或者将消费者组 ID 更改为唯一的。