当剩余的记录超过 `max.poll.records` 时,Kafka 如何决定哪些记录包含在消费者轮询循环中?

How does Kafka decide which records are contained in the consumer poll loop when there are more than `max.poll.records` records left?

我有一个 Kafka 消费者组消费多个主题(每个主题有多个分区)。所有主题在每个分区上都包含大量记录。 我目前正在尝试了解消费者最初开始消费时的行为。 特别是,我想知道经纪人如何决定哪些记录首先到达客户。

以下几个方面值得注意:

我在消费者配置参数中找不到任何允许我更改此行为的内容。这不是真正的问题,因为所有消息最终都会被读取。但我想更详细地了解这种行为。

所以我的问题是:代理如何决定哪些记录最终出现在消费者轮询循环的结果中?

消费者使用 Fetch requests.

从 Kafka 获取记录

如果你看一下协议,这个 API 相当复杂并且有很多字段,但我们可以专注于与你的问题相关的几个字段:

  • max_wait_ms:这表示如果有 no/not 足够的可用记录,代理应该等待多长时间。这可以使用 fetch.max.wait.ms.
  • 配置
  • min_bytes:这表示代理需要响应多少数据(记录的大小)。这可以使用 fetch.min.bytes.
  • 配置
  • max_bytes:这表示响应的最大大小。这可以使用 fetch.max.bytes.
  • 配置

一旦代理达到这些限制之一,它就会发回响应。

Fetch 请求还指示消费者想要读取哪些分区。对于每个分区,partition_max_bytes 指示该分区的最大大小 return。这可以使用 max.partition.fetch.bytes.

配置

过去,Fetch 请求包含完整的分区列表。经纪人将按顺序迭代列表,直到达到上述限制之一。

从 1.1 (KIP-227) 开始,它有点复杂,因为消费者使用提取会话来避免在每个提取请求中发送完整列表。为了简单起见,代理使用 FetchSessions 在分区列表上保留一个迭代器,以确保公平地从所有分区获取记录。

现在让我们看看客户端...

此时,您可能已经注意到我没有提到 max.poll.records。此设置仅在客户端使用。消费者尝试有效地获取记录。因此,即使您设置了 max.poll.records=1,消费者也可能会大批量获取记录,将它们保存在内存中,并且每次调用 poll() 时只有 return 1 条记录。这避免了不必要地发送许多小请求和超载代理。

消费者还会跟踪它在内存中的记录。如果它已经有分区的记录,则不能将其包含在下一个 Fetch 请求中。

因此,虽然每个 Fetch 响应可能不包括所有分区的数据,但在一段时间内,应该公平地提取所有分区。


我已经简化了过程以使其简短,但如果您想深入了解这个逻辑,我建议您检查以下内容 类:

  • Fetcher.java:这是客户端逻辑,用于确定从代理获取什么以及 return 在 poll() 中获取什么。
  • ReplicaManager.scala:这是服务器端逻辑,用于确定在 Fetch 响应中 return 的内容。参见 fetchMessages()
  • FetchSession.scala: This is the session logic introduced by KIP-227