Spring Kafka 不尊重 max.poll.records 有奇怪的行为

Spring Kafka don't respect max.poll.records with strange behavior

好吧,我正在尝试以下场景:

  1. 在 application.properties 中将 max.poll.records 设置为 50。
  2. 在 application.properties 中将 enable-auto-commit=false 和 ack-mode 设置为手动。
  3. 在我的方法中添加了@KafkaListener,但不提交任何消息,只读取、记录但不进行ACK。

实际上,在我的 Kafka 主题中,我有 500 条消息要使用,所以我期待以下行为:

  1. Spring Kafka poll() 50 条消息(偏移量 0 到 50)。
  2. 正如我所说,我没有提交任何内容,只是记录了 50 条消息。
  3. 在接下来的 Spring Kafka poll() 调用中,获取与步骤 1 相同的 50 条消息(偏移量 0 到 50)。Spring Kafka,在我看来,应该继续这样做循环(步骤 1-3)始终读取相同的消息。

但是会发生以下情况:

  1. Spring Kafka poll() 50 条消息(偏移量 0 到 50)。
  2. 正如我所说,我没有提交任何内容,只是记录了 50 条消息。
  3. 在接下来的 Spring Kafka poll() 调用中,获取 NEXT 50 条消息,与步骤 1 不同(偏移量 50 到 100)。

Spring Kafka 以 50 条消息为单位读取 500 条消息,但不提交任何内容。如果我关闭应用程序并重新启动,则会再次收到 500 条消息。

所以,我的疑惑:

  1. 如果我将 max.poll.recors 配置为 50,如果我没有提交任何内容,spring Kafka 如何获取接下来的 50 条记录?我理解 poll() 方法应该 return 相同的记录。
  2. Spring Kafka 有缓存吗?如果是,如果我在没有提交的情况下在缓存中获得 100 万条记录,这可能是个问题。

消费者不提交偏移量只会在以下情况下产生影响:

  • 您的消费者在读取 200 条消息后崩溃,当您重新启动它时,它将重新从 0 开始。
  • 您的消费者不再分配分区。

所以在一个完美的世界里,你根本不需要提交,它会消耗所有的消息,因为消费者首先要求 1-50,然后是 51-100。

但是如果消费者崩溃了,没有人知道消费者读取的偏移量是多少。如果消费者提交了偏移量,当它重新启动时,它可以检查偏移量主题以查看崩溃的消费者离开的位置并从那里开始。

max.poll.records 定义一次性获取多少条记录,但未定义要获取哪些记录。

你的第一个问题:

If I configured the max.poll.recors to 50, how spring Kafka get the next 50 records if I didn't commit anything? I understand the poll() method should return the same records.

首先,为了确保你没有犯任何错误,你必须确保你理解以下3个参数,我相信你理解。

  • ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,设置为false(这也是推荐的默认值)。如果它设置为 false,请注意 auto.commit.interval.ms 变得无关紧要。查看 this 文档:

Because the listener container has it’s own mechanism for committing offsets, it prefers the Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to be false. Starting with version 2.3, it unconditionally sets it to false unless specifically set in the consumer factory or the container’s consumer property overrides.

  • factory.getContainerProperties().setAckMode(AckMode.MANUAL);您有责任承认。 (正在使用事务时忽略)并且ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG不能是true.

  • factory.getContainerProperties().setSyncCommits(true/false); 设置容器负责提交时是否调用consumer.commitSync()commitAsync()默认为真。这负责与 Kafka 同步,没有别的,如果设置为 true,该调用将阻塞直到 Kafka 响应。

其次,没有消费者poll()不会return相同的记录。对于当前运行消费者,它使用一些内部索引跟踪它在内存中的偏移量,我们不必关心提交偏移量。另请参阅@GaryRussell 的解释 here.

简而言之,他解释说:

Once the records have been returned by the poll (and offsets not committed), they won't be returned again unless you restart the consumer or perform seek() operations on the consumer to reset the offset to the unprocessed ones.


你的第二个问题:

Does Spring Kafka have some cache? If yes, this can be a problem if I get 1million records in cache without commit.

没有“缓存”,全是偏移量和提交,解释同上。



现在要实现你想做的事情,你可以考虑做两件事在获取前 50 条记录后,即下一个 poll():

  • 或者,以编程方式重新启动容器
  • 或致电consumer.seek(partition, offset);


奖金:
无论您选择何种配置,您始终可以查看 结果 ,方法是查看此输出的 LAG 列:

kafka-consumer-groups.bat --bootstrap-server localhost:9091 --describe --group your_group_name