如果 auto.offset.reset=earliest 但主题没有消息,将设置什么消费者偏移量

What consumer offset will be set if auto.offset.reset=earliest but topic has no messages

我有 Kafka 服务器版本 2.4 并设置 log.retention.hours=168(这样主题中的消息将在 7 天后被删除)和 auto.offset.reset=earliest(这样如果消费者没有得到最后提交的偏移量,那么它应该从头开始处理)。因为我使用的是 Kafka 2.4 版本所以默认值 offsets.retention.minutes=10080 (因为我没有在我的应用程序中设置这个 属性)。

我的主题数据是:1,2,3,4,5,6,7,8,9,10

关闭消费者前的当前消费者偏移量:10

结束偏移量:10

消费者最后提交的偏移量:10

假设我的消费者在过去 7 天没有 运行,而我在第 8 天开始使用消费者。所以我最后一次提交的消费者抵消将过期(由于 offsets.retention.minutes=10080 属性)并且主题消息也将被删除(由于 log.retention.hours=168 属性).

所以想知道 auto.offset.reset=earliest 属性 现在将设置什么消费者偏移量?

一旦消费者组从日志中删除,auto.offset.reset将优先,消费者将从头开始消费数据。

My Topic data is : 1,2,3,4,5,6,7,8,9,10

如果topic有以上数据,则从头开始消费,1到10条记录全部消费

My Topic data is : 11,12,13,14,15,16,17,18,19,20

在这种情况下,如果旧数据由于保留而被清除,消费者会将偏移量重置为最早(当时可用的最早偏移量)并从那里开始消费,例如在这种情况下它将消费所有从 11到 20(因为清除了 1 到 10)

虽然 Kafka 主题中没有可用数据,但您的代理仍然知道该分区内的“下一个”偏移量。在您的情况下,该主题的第一个和最后一个偏移量是 10 而它不包含任何数据。

因此,已经提交偏移量 10 的消费者将在再次启动时尝试读取 11,与消费者配置无关 auto.offset.reset

当您的主题有偏移量时,例如,直到 15,而消费者在提交偏移量 10 后关闭时,您的示例将变得更加有趣。现在,假设由于保留策略,所有偏移量都从主题中删除。如果您随后仅启动消费者,那么消费者配置 auto.offset.reset 将生效,如文档中所述:

"What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted)"

只要 Kafka 主题为空,就不会为消费者“设置”偏移量。消费者只是试图找到下一个可用的偏移量,或者基于

  • 最后提交的偏移量或,
  • 如果最后提交的偏移量不再存在,则通过 auto.offset.reset.
  • 给出的配置

作为附加说明:即使消息似乎已被保留策略清除,您仍可能会在主题中看到一些数据,因为 即使在保留后数据仍保留在 Kafka 主题中 time/size