一个 Kafka 消费者如何从多个分区读取数据?
How does one Kafka consumer read from more than one partition?
我想知道一个消费者如何从多个分区消费,具体来说,从不同分区读取消息的顺序是什么?
我看了一眼源代码 (Consumer, Fetcher),但我无法完全理解它。
这是我认为会发生的事情:
按顺序读取分区。即:在继续下一个分区之前,将读取一个分区中的所有消息。如果我们达到 max.poll.records
而没有消耗整个分区,下一次提取将继续读取当前分区,直到它耗尽,然后再继续下一个。
我尝试将 max.poll.records
设置为相对较低的数字,然后看看会发生什么。
如果我将消息发送到一个主题然后启动一个消费者,那么在继续到下一个分区之前会从一个分区读取所有消息,即使该分区中的消息数量高于 max.poll.records
.
然后我尝试通过向该分区连续发送消息(使用 JMeter)来查看是否可以将消费者“锁定”在一个分区中。但是我做不到:来自其他分区的消息也在被读取。
消费者正在以贪婪的循环方式从其分配的分区中轮询消息。
例如如果 max.poll.records
设置为 100,并且有 2 个分区分配了 A,B。消费者将尝试从 A 中轮询 100 条消息。如果分区 A 没有 100 条可用消息,它将轮询分区 B 中剩余的 100 条消息。
虽然这并不理想,但这样就不会缺少分区。
这也解释了为什么不能保证分区之间的顺序。
我已经阅读了评论中链接的问题的答案中提到的KIP,我想我终于明白了消费者是如何工作的。
有两个主要的配置选项会影响数据的使用方式:
max.partition.fetch.bytes
:服务器将为给定分区return的最大数据量
max.poll.records
:消费者每次轮询returned 的最大记录数
从每个分区取数据的过程是贪婪的,以循环方式进行。贪婪意味着从每个分区中检索尽可能多的记录;如果一个partition中的所有记录占用小于max.partition.fetch.bytes
,则全部取出;否则,只会获取 max.partition.fetch.bytes
。
现在,并非所有获取的记录都会在轮询调用中 returned。只有 max.poll.records
会被 return 编辑。
剩余的记录将被保留以供下次调用轮询。
而且,如果保留的记录数小于max.poll.records
,poll方法会在returning之前开始新一轮的fetching(pre-fetching)。这意味着,通常情况下,消费者正在处理记录,同时正在获取新记录。
如果某些分区收到的消息比其他分区多得多,这可能会导致不太活跃的分区长时间得不到处理。
The only downside to this approach is that it could lead to some partitions going unconsumed for an extended amount of time when there is a large imbalance between the partition's respective message rates. For example, suppose that a consumer with max messages set to 1 fetches data from partitions A and B. If the returned fetch includes 1000 records from A and no records from B, the consumer will have to process all 1000 available records from A before fetching on partition B again.
为了防止这种情况,我们可以减少 max.partition.fetch.bytes
。
我想知道一个消费者如何从多个分区消费,具体来说,从不同分区读取消息的顺序是什么?
我看了一眼源代码 (Consumer, Fetcher),但我无法完全理解它。
这是我认为会发生的事情:
按顺序读取分区。即:在继续下一个分区之前,将读取一个分区中的所有消息。如果我们达到 max.poll.records
而没有消耗整个分区,下一次提取将继续读取当前分区,直到它耗尽,然后再继续下一个。
我尝试将 max.poll.records
设置为相对较低的数字,然后看看会发生什么。
如果我将消息发送到一个主题然后启动一个消费者,那么在继续到下一个分区之前会从一个分区读取所有消息,即使该分区中的消息数量高于 max.poll.records
.
然后我尝试通过向该分区连续发送消息(使用 JMeter)来查看是否可以将消费者“锁定”在一个分区中。但是我做不到:来自其他分区的消息也在被读取。
消费者正在以贪婪的循环方式从其分配的分区中轮询消息。
例如如果 max.poll.records
设置为 100,并且有 2 个分区分配了 A,B。消费者将尝试从 A 中轮询 100 条消息。如果分区 A 没有 100 条可用消息,它将轮询分区 B 中剩余的 100 条消息。
虽然这并不理想,但这样就不会缺少分区。
这也解释了为什么不能保证分区之间的顺序。
我已经阅读了评论中链接的问题的答案中提到的KIP,我想我终于明白了消费者是如何工作的。
有两个主要的配置选项会影响数据的使用方式:
max.partition.fetch.bytes
:服务器将为给定分区return的最大数据量max.poll.records
:消费者每次轮询returned 的最大记录数
从每个分区取数据的过程是贪婪的,以循环方式进行。贪婪意味着从每个分区中检索尽可能多的记录;如果一个partition中的所有记录占用小于max.partition.fetch.bytes
,则全部取出;否则,只会获取 max.partition.fetch.bytes
。
现在,并非所有获取的记录都会在轮询调用中 returned。只有 max.poll.records
会被 return 编辑。
剩余的记录将被保留以供下次调用轮询。
而且,如果保留的记录数小于max.poll.records
,poll方法会在returning之前开始新一轮的fetching(pre-fetching)。这意味着,通常情况下,消费者正在处理记录,同时正在获取新记录。
如果某些分区收到的消息比其他分区多得多,这可能会导致不太活跃的分区长时间得不到处理。
The only downside to this approach is that it could lead to some partitions going unconsumed for an extended amount of time when there is a large imbalance between the partition's respective message rates. For example, suppose that a consumer with max messages set to 1 fetches data from partitions A and B. If the returned fetch includes 1000 records from A and no records from B, the consumer will have to process all 1000 available records from A before fetching on partition B again.
为了防止这种情况,我们可以减少 max.partition.fetch.bytes
。