什么决定了 Kafka 消费者的偏移量?

What determines Kafka consumer offset?

我对 Kafka 比较陌生。我已经对它做了一些试验,但我不清楚关于消费者补偿的一些事情。到目前为止,据我了解,当消费者启动时,它将开始读取的偏移量由配置设置 auto.offset.reset 决定(如果我错了请纠正我)。

现在举例来说,主题中有 10 条消息(偏移量 0 到 9),并且消费者在它关闭之前(或者在我杀死消费者之前)恰好消费了其中的 5 条消息。然后说我重新启动那个消费者进程。我的问题是:

  1. 如果auto.offset.reset设置为earliest,是否总是从offset 0开始消费?

  2. 如果auto.offset.reset设置为latest,是否会从偏移量5开始消费?

  3. 关于这种场景的行为是否总是确定性的?

如果我的问题中有任何不清楚的地方,请随时发表评论。

比您描述的要复杂一些。
auto.offset.reset config 仅当您的消费者组没有在某处提交有效的偏移量时才会启动(2 个支持的偏移量存储现在是 Kafka 和 Zookeeper),并且它还取决于您使用的消费者类型。

如果您使用高级 java 消费者,请想象以下场景:

  1. 你的消费者组group1中有一个消费者已经消费了5条消息并死亡。下次你启动这个消费者时,它甚至不会使用那个 auto.offset.reset 配置,并且会从它死掉的地方继续,因为它只会从偏移存储(我提到的 Kafka 或 ZK)中获取存储的偏移量。

  2. 您在一个主题中有消息(如您​​所描述的),并且您在一个新的消费者组中启动了一个消费者 group2。任何地方都没有存储偏移量,这次 auto.offset.reset 配置将决定是从主题的开头 (earliest) 还是从主题的结尾 (latest)

影响偏移值对应 earliestlatest 配置的另一件事是日志保留策略。假设您有一个保留配置为 1 小时的主题。您生成了 5 条消息,然后一个小时后您 post 又生成了 5 条消息。 latest 偏移量仍将与前面的示例保持相同,但 earliest 将无法成为 0 因为 Kafka 已经删除了这些消息,因此最早可用的偏移量将是 5.

上面提到的所有内容都与 SimpleConsumer 无关,每次你 运行 它都会决定从哪里开始使用 auto.offset.reset 配置。

如果你使用的Kafka版本低于0.9,你必须将earliest, latest替换为smallest,largest.

只是一个更新:从 Kafka 0.9 开始,Kafka 使用新的 Java 版本的消费者并且 auto.offset.reset 参数名称已更改;来自手册:

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):

earliest: automatically reset the offset to the earliest offset

latest: automatically reset the offset to the latest offset

none: throw exception to the consumer if no previous offset is found for the consumer's group

anything else: throw exception to the consumer.

我在查看已接受的答案后花了一些时间才找到这个,所以我认为 post 它可能对社区有用。

此外还有offsets.retention.minutes。如果距离上次提交的时间 > offsets.retention.minutes,那么 auto.offset.reset 也会开始