当剩余的记录超过 `max.poll.records` 时,Kafka 如何决定哪些记录包含在消费者轮询循环中?
How does Kafka decide which records are contained in the consumer poll loop when there are more than `max.poll.records` records left?
我有一个 Kafka 消费者组消费多个主题(每个主题有多个分区)。所有主题在每个分区上都包含大量记录。
我目前正在尝试了解消费者最初开始消费时的行为。
特别是,我想知道经纪人如何决定哪些记录首先到达客户。
以下几个方面值得注意:
- 记录比消费者在一次往返中可以处理的记录多得多(即记录比消费者的
max.poll.records
配置多)
- 消费者必须阅读来自多个主题和多个分区的记录
- 我天真地假设代理 returns 在每个轮询循环中记录每个主题,以便消费者以相似的速度阅读所有主题。但这似乎并非如此。显然,它一次对单个主题的记录进行优先排序,在没有明显模式的情况下切换主题(至少这是我在消费者指标中看到的)。
我在消费者配置参数中找不到任何允许我更改此行为的内容。这不是真正的问题,因为所有消息最终都会被读取。但我想更详细地了解这种行为。
所以我的问题是:代理如何决定哪些记录最终出现在消费者轮询循环的结果中?
消费者使用 Fetch requests.
从 Kafka 获取记录
如果你看一下协议,这个 API 相当复杂并且有很多字段,但我们可以专注于与你的问题相关的几个字段:
max_wait_ms
:这表示如果有 no/not 足够的可用记录,代理应该等待多长时间。这可以使用 fetch.max.wait.ms
. 配置
min_bytes
:这表示代理需要响应多少数据(记录的大小)。这可以使用 fetch.min.bytes
. 配置
max_bytes
:这表示响应的最大大小。这可以使用 fetch.max.bytes
. 配置
一旦代理达到这些限制之一,它就会发回响应。
Fetch 请求还指示消费者想要读取哪些分区。对于每个分区,partition_max_bytes
指示该分区的最大大小 return。这可以使用 max.partition.fetch.bytes
.
配置
过去,Fetch 请求包含完整的分区列表。经纪人将按顺序迭代列表,直到达到上述限制之一。
从 1.1 (KIP-227) 开始,它有点复杂,因为消费者使用提取会话来避免在每个提取请求中发送完整列表。为了简单起见,代理使用 FetchSession
s 在分区列表上保留一个迭代器,以确保公平地从所有分区获取记录。
现在让我们看看客户端...
此时,您可能已经注意到我没有提到 max.poll.records
。此设置仅在客户端使用。消费者尝试有效地获取记录。因此,即使您设置了 max.poll.records=1
,消费者也可能会大批量获取记录,将它们保存在内存中,并且每次调用 poll()
时只有 return 1 条记录。这避免了不必要地发送许多小请求和超载代理。
消费者还会跟踪它在内存中的记录。如果它已经有分区的记录,则不能将其包含在下一个 Fetch 请求中。
因此,虽然每个 Fetch 响应可能不包括所有分区的数据,但在一段时间内,应该公平地提取所有分区。
我已经简化了过程以使其简短,但如果您想深入了解这个逻辑,我建议您检查以下内容 类:
- Fetcher.java:这是客户端逻辑,用于确定从代理获取什么以及 return 在
poll()
中获取什么。
- ReplicaManager.scala:这是服务器端逻辑,用于确定在 Fetch 响应中 return 的内容。参见
fetchMessages()
。
- FetchSession.scala: This is the session logic introduced by KIP-227
我有一个 Kafka 消费者组消费多个主题(每个主题有多个分区)。所有主题在每个分区上都包含大量记录。 我目前正在尝试了解消费者最初开始消费时的行为。 特别是,我想知道经纪人如何决定哪些记录首先到达客户。
以下几个方面值得注意:
- 记录比消费者在一次往返中可以处理的记录多得多(即记录比消费者的
max.poll.records
配置多) - 消费者必须阅读来自多个主题和多个分区的记录
- 我天真地假设代理 returns 在每个轮询循环中记录每个主题,以便消费者以相似的速度阅读所有主题。但这似乎并非如此。显然,它一次对单个主题的记录进行优先排序,在没有明显模式的情况下切换主题(至少这是我在消费者指标中看到的)。
我在消费者配置参数中找不到任何允许我更改此行为的内容。这不是真正的问题,因为所有消息最终都会被读取。但我想更详细地了解这种行为。
所以我的问题是:代理如何决定哪些记录最终出现在消费者轮询循环的结果中?
消费者使用 Fetch requests.
从 Kafka 获取记录如果你看一下协议,这个 API 相当复杂并且有很多字段,但我们可以专注于与你的问题相关的几个字段:
max_wait_ms
:这表示如果有 no/not 足够的可用记录,代理应该等待多长时间。这可以使用fetch.max.wait.ms
. 配置
min_bytes
:这表示代理需要响应多少数据(记录的大小)。这可以使用fetch.min.bytes
. 配置
max_bytes
:这表示响应的最大大小。这可以使用fetch.max.bytes
. 配置
一旦代理达到这些限制之一,它就会发回响应。
Fetch 请求还指示消费者想要读取哪些分区。对于每个分区,partition_max_bytes
指示该分区的最大大小 return。这可以使用 max.partition.fetch.bytes
.
过去,Fetch 请求包含完整的分区列表。经纪人将按顺序迭代列表,直到达到上述限制之一。
从 1.1 (KIP-227) 开始,它有点复杂,因为消费者使用提取会话来避免在每个提取请求中发送完整列表。为了简单起见,代理使用 FetchSession
s 在分区列表上保留一个迭代器,以确保公平地从所有分区获取记录。
现在让我们看看客户端...
此时,您可能已经注意到我没有提到 max.poll.records
。此设置仅在客户端使用。消费者尝试有效地获取记录。因此,即使您设置了 max.poll.records=1
,消费者也可能会大批量获取记录,将它们保存在内存中,并且每次调用 poll()
时只有 return 1 条记录。这避免了不必要地发送许多小请求和超载代理。
消费者还会跟踪它在内存中的记录。如果它已经有分区的记录,则不能将其包含在下一个 Fetch 请求中。
因此,虽然每个 Fetch 响应可能不包括所有分区的数据,但在一段时间内,应该公平地提取所有分区。
我已经简化了过程以使其简短,但如果您想深入了解这个逻辑,我建议您检查以下内容 类:
- Fetcher.java:这是客户端逻辑,用于确定从代理获取什么以及 return 在
poll()
中获取什么。 - ReplicaManager.scala:这是服务器端逻辑,用于确定在 Fetch 响应中 return 的内容。参见
fetchMessages()
。 - FetchSession.scala: This is the session logic introduced by KIP-227