Kafka RoundRobin 分区程序没有将消息分发到所有分区
Kafka RoundRobin partitioner not distributing messages to all the partitions
我正在尝试使用 Kafka 的 RoundRobinPartitioner class 在所有分区之间均匀分发消息。我的Kafka主题配置如下:
名称:multischemakafkatopicodd
分区数:16
复制因子:2
比如说,如果我要生成 100 条消息,那么每个分区应该有 6 或 7 条消息。但是,我得到了类似的东西:
sh /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.0.55.211:9092 --topic multischemakafkatopicodd --time -1
multischemakafkatopicodd:0:26
multischemakafkatopicodd:5:0
multischemakafkatopicodd:10:24
multischemakafkatopicodd:15:0
multischemakafkatopicodd:13:0
multischemakafkatopicodd:8:26
multischemakafkatopicodd:2:26
multischemakafkatopicodd:12:24
multischemakafkatopicodd:14:24
multischemakafkatopicodd:9:0
multischemakafkatopicodd:11:0
multischemakafkatopicodd:4:26
multischemakafkatopicodd:1:0
multischemakafkatopicodd:6:24
multischemakafkatopicodd:7:0
multischemakafkatopicodd:3:0
我想可能是我没有生成足够的消息,所以我尝试使用 1M 记录并将分区数设置为奇数:
主题:multischemakafkatopicodd
分区数:31
复制因子:2
...我明白了。这次每个分区的消息数量分布比较均匀。
sh /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.0.55.211:9092 --topic multischemakafkatopicodd --time -1
multischemakafkatopicodd:0:33845
multischemakafkatopicodd:5:34388
multischemakafkatopicodd:10:33837
multischemakafkatopicodd:20:33819
multischemakafkatopicodd:15:33890
multischemakafkatopicodd:25:34414
multischemakafkatopicodd:30:33862
multischemakafkatopicodd:26:34066
multischemakafkatopicodd:9:34088
multischemakafkatopicodd:11:34124
multischemakafkatopicodd:16:33802
multischemakafkatopicodd:4:34061
multischemakafkatopicodd:17:34977
multischemakafkatopicodd:3:34084
multischemakafkatopicodd:24:33849
multischemakafkatopicodd:23:34111
multischemakafkatopicodd:13:34062
multischemakafkatopicodd:28:33876
multischemakafkatopicodd:18:34098
multischemakafkatopicodd:22:34058
multischemakafkatopicodd:8:34079
multischemakafkatopicodd:2:33839
multischemakafkatopicodd:12:34075
multischemakafkatopicodd:29:34132
multischemakafkatopicodd:19:33924
multischemakafkatopicodd:14:34109
multischemakafkatopicodd:1:34088
multischemakafkatopicodd:6:33832
multischemakafkatopicodd:7:34080
multischemakafkatopicodd:27:34188
multischemakafkatopicodd:21:34684
我再次进行了相同的测试,但将分区数量减少到 8 个,我得到了这个结果,我们可以清楚地看到一些分区有接近 15K 条消息,而其他分区有大约 10K 条消息:
multischemakafkatopicodd:0:155927
multischemakafkatopicodd:5:105351
multischemakafkatopicodd:1:107382
multischemakafkatopicodd:4:160533
multischemakafkatopicodd:6:158007
multischemakafkatopicodd:7:105608
multischemakafkatopicodd:2:157934
multischemakafkatopicodd:3:105599
我是不是做错了什么,或者这是它应该如何工作的?
为什么消息分布如此不均衡?
如果有人能帮助我,那就太好了。谢谢
据我了解,分区程序运行良好。但是你必须知道生产者为了最大化性能所做的优化:
生产者不会为每个发送调用将每条消息生成到不同的分区,因为这会过大。
Round-Robin
保证类似的分布,但 可以发送批次 。这意味着,它将根据remainder
(不是模数!)操作缓冲一定数量的发往分区的消息在 RoundRobinPartitioner
的代码中:
int part = Utils.toPositive(nextValue) % availablePartitions.size();
nextValue
是一个 AtomicInteger
,每个 partition/send 调用都会增加 1。因此,余数将始终增加一个 (以循环方式,例如有 4 个分区:0-1-2-3-0-1-2-3-...
),假设在过程。如果发生这种情况,循环可能看起来像 0-1-2-(partition4fails)-0-1-2-(partition4OK)-3-0-...
例子
- 具有 4 个分区的主题
- 每个分区的生产者分区线程缓冲区持有3消息
(留言数计数器从0开始 - new AtomicInteger(0)
)
MsgN % Partitions Partition
0%4 0
1%4 1
2%4 2
3%4 3
4%4 0
5%4 1
6%4 2
7%4 3
8%4 0
... ...
当 第 9 条消息 产生时,第一个分区的缓冲区已满(因为它已经包含 3 条消息),因此可以准备发送到 kafka。如果您在那里停止进程,4 个分区将如下所示:
Partition Offset
0 3
1 0
2 0
3 0
当生成第 10 条消息时,第二个分区的缓冲区也将准备好发送出去,主题如下:
Partition Offset
0 3
1 3
2 0
3 0
在现实生活中,缓冲区通常会保存大量消息(这也可以调整)。比方说,存储了 1000 条消息。对于相同的场景,分区看起来像:
Partition Offset
0 1000
1 1000
2 0
3 0
因此增加分区之间的“视觉”差异。 batch size / buffer size越大越臭名昭著。
这与生产者的 partitioner
线程本身的性质有关:默认情况下,它不会独立发送每条消息,而是在每次代理调用时按顺序存储它们 send multiple messages , 优化系统性能。
Batching is one of the big drivers of efficiency, and to enable
batching the Kafka producer will attempt to accumulate data in memory
and to send out larger batches in a single request
如果生产者是 stopped/started,这种不平衡可能会更加臭名昭著,因为它会重新启动机制,而不管之前选择的分区如何(所以它可以开始发送到与之前选择的相同的分区在停止之前选择,因此增加与上次执行的其他非选择分区的差异)。
在新的执行中,缓冲区将全部为空,因此无论哪个分区接收最多,进程都会重新启动。
所以,你在这里停止进程:
Partition Offset
0 1000
1 1000
2 0
3 0
重新启动保存每个主题的消息数计数器的映射,因为它不是代理的一部分,but of the Partitioner class 来自生产者。如果生产者未正确关闭 and/or 刷新,那些缓存的消息也将丢失。所以,在这种情况下,你得到的是前面逻辑的重复:
MsgN % Partitions Partition
0%4 0
1%4 1
2%4 2
3%4 3
(...)
那会在某个时刻导致这个:
Partition Offset
0 2000
1 2000
2 0
3 0
这是发送过程的非连续执行产生的不平衡,但对于RoundRobinPartitioner
来说是越界的,其本质是基于连续过程(不间断)。
You can verify this behaviour by checking each partition's offset while sending the messages: Only when the selected partition stores n messages, the next elected partition will get its batch of n 封邮件。
注意:示例中显示的数字参考了“完美”场景;在现实生活中,消息也可以被撤销、压缩、失败、刷新,而不管缓冲区大小、分区不可用……导致偏移量,如您的问题中所示。
最后一个冲洗场景示例:
Partition Offset
0 1000
1 1000
2 0
3 0
进程已停止,但生产者已正确关闭并刷新其消息,因此主题如下:
Partition Offset
0 1997
1 1996
2 999
3 998
进程重新启动。刷新第一个分区的缓冲区后,看起来像:
Partition Offset
0 2997
1 1996
2 999
3 998
因此增加了对该机制“公平性”的混淆。但这不是它的错,因为在分区程序的映射、计数器和缓冲区中没有持久性。如果让该过程连续几天不间断地执行,您会发现它确实以“接近相等”的方式平衡消息。
RoundRobinPartitioner
的相关方法:
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster)
{
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions=cluster.availablePartitionsForTopic(topic);
if (!availablePartitions.isEmpty()) {
/*remainder calculus in order to select next partition*/
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
}
private int nextValue(String topic)
{
/*Counter of num messages sent. topicCounterMap is part of the producer
process, hence not persisted by default.
It will start by 0 for every topic with each new launch*/
AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> {
return new AtomicInteger(0); });
return counter.getAndIncrement();
}
我正在尝试使用 Kafka 的 RoundRobinPartitioner class 在所有分区之间均匀分发消息。我的Kafka主题配置如下:
名称:multischemakafkatopicodd
分区数:16
复制因子:2
比如说,如果我要生成 100 条消息,那么每个分区应该有 6 或 7 条消息。但是,我得到了类似的东西:
sh /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.0.55.211:9092 --topic multischemakafkatopicodd --time -1
multischemakafkatopicodd:0:26
multischemakafkatopicodd:5:0
multischemakafkatopicodd:10:24
multischemakafkatopicodd:15:0
multischemakafkatopicodd:13:0
multischemakafkatopicodd:8:26
multischemakafkatopicodd:2:26
multischemakafkatopicodd:12:24
multischemakafkatopicodd:14:24
multischemakafkatopicodd:9:0
multischemakafkatopicodd:11:0
multischemakafkatopicodd:4:26
multischemakafkatopicodd:1:0
multischemakafkatopicodd:6:24
multischemakafkatopicodd:7:0
multischemakafkatopicodd:3:0
我想可能是我没有生成足够的消息,所以我尝试使用 1M 记录并将分区数设置为奇数:
主题:multischemakafkatopicodd
分区数:31
复制因子:2
...我明白了。这次每个分区的消息数量分布比较均匀。
sh /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.0.55.211:9092 --topic multischemakafkatopicodd --time -1
multischemakafkatopicodd:0:33845
multischemakafkatopicodd:5:34388
multischemakafkatopicodd:10:33837
multischemakafkatopicodd:20:33819
multischemakafkatopicodd:15:33890
multischemakafkatopicodd:25:34414
multischemakafkatopicodd:30:33862
multischemakafkatopicodd:26:34066
multischemakafkatopicodd:9:34088
multischemakafkatopicodd:11:34124
multischemakafkatopicodd:16:33802
multischemakafkatopicodd:4:34061
multischemakafkatopicodd:17:34977
multischemakafkatopicodd:3:34084
multischemakafkatopicodd:24:33849
multischemakafkatopicodd:23:34111
multischemakafkatopicodd:13:34062
multischemakafkatopicodd:28:33876
multischemakafkatopicodd:18:34098
multischemakafkatopicodd:22:34058
multischemakafkatopicodd:8:34079
multischemakafkatopicodd:2:33839
multischemakafkatopicodd:12:34075
multischemakafkatopicodd:29:34132
multischemakafkatopicodd:19:33924
multischemakafkatopicodd:14:34109
multischemakafkatopicodd:1:34088
multischemakafkatopicodd:6:33832
multischemakafkatopicodd:7:34080
multischemakafkatopicodd:27:34188
multischemakafkatopicodd:21:34684
我再次进行了相同的测试,但将分区数量减少到 8 个,我得到了这个结果,我们可以清楚地看到一些分区有接近 15K 条消息,而其他分区有大约 10K 条消息:
multischemakafkatopicodd:0:155927
multischemakafkatopicodd:5:105351
multischemakafkatopicodd:1:107382
multischemakafkatopicodd:4:160533
multischemakafkatopicodd:6:158007
multischemakafkatopicodd:7:105608
multischemakafkatopicodd:2:157934
multischemakafkatopicodd:3:105599
我是不是做错了什么,或者这是它应该如何工作的? 为什么消息分布如此不均衡?
如果有人能帮助我,那就太好了。谢谢
据我了解,分区程序运行良好。但是你必须知道生产者为了最大化性能所做的优化:
生产者不会为每个发送调用将每条消息生成到不同的分区,因为这会过大。
Round-Robin
保证类似的分布,但 可以发送批次 。这意味着,它将根据remainder
(不是模数!)操作缓冲一定数量的发往分区的消息在RoundRobinPartitioner
的代码中:int part = Utils.toPositive(nextValue) % availablePartitions.size();
nextValue
是一个 AtomicInteger
,每个 partition/send 调用都会增加 1。因此,余数将始终增加一个 (以循环方式,例如有 4 个分区:0-1-2-3-0-1-2-3-...
),假设在过程。如果发生这种情况,循环可能看起来像 0-1-2-(partition4fails)-0-1-2-(partition4OK)-3-0-...
例子
- 具有 4 个分区的主题
- 每个分区的生产者分区线程缓冲区持有3消息
(留言数计数器从0开始 - new AtomicInteger(0)
)
MsgN % Partitions Partition
0%4 0
1%4 1
2%4 2
3%4 3
4%4 0
5%4 1
6%4 2
7%4 3
8%4 0
... ...
当 第 9 条消息 产生时,第一个分区的缓冲区已满(因为它已经包含 3 条消息),因此可以准备发送到 kafka。如果您在那里停止进程,4 个分区将如下所示:
Partition Offset
0 3
1 0
2 0
3 0
当生成第 10 条消息时,第二个分区的缓冲区也将准备好发送出去,主题如下:
Partition Offset
0 3
1 3
2 0
3 0
在现实生活中,缓冲区通常会保存大量消息(这也可以调整)。比方说,存储了 1000 条消息。对于相同的场景,分区看起来像:
Partition Offset
0 1000
1 1000
2 0
3 0
因此增加分区之间的“视觉”差异。 batch size / buffer size越大越臭名昭著。
这与生产者的 partitioner
线程本身的性质有关:默认情况下,它不会独立发送每条消息,而是在每次代理调用时按顺序存储它们 send multiple messages , 优化系统性能。
Batching is one of the big drivers of efficiency, and to enable batching the Kafka producer will attempt to accumulate data in memory and to send out larger batches in a single request
如果生产者是 stopped/started,这种不平衡可能会更加臭名昭著,因为它会重新启动机制,而不管之前选择的分区如何(所以它可以开始发送到与之前选择的相同的分区在停止之前选择,因此增加与上次执行的其他非选择分区的差异)。
在新的执行中,缓冲区将全部为空,因此无论哪个分区接收最多,进程都会重新启动。
所以,你在这里停止进程:
Partition Offset
0 1000
1 1000
2 0
3 0
重新启动保存每个主题的消息数计数器的映射,因为它不是代理的一部分,but of the Partitioner class 来自生产者。如果生产者未正确关闭 and/or 刷新,那些缓存的消息也将丢失。所以,在这种情况下,你得到的是前面逻辑的重复:
MsgN % Partitions Partition
0%4 0
1%4 1
2%4 2
3%4 3
(...)
那会在某个时刻导致这个:
Partition Offset
0 2000
1 2000
2 0
3 0
这是发送过程的非连续执行产生的不平衡,但对于RoundRobinPartitioner
来说是越界的,其本质是基于连续过程(不间断)。
You can verify this behaviour by checking each partition's offset while sending the messages: Only when the selected partition stores n messages, the next elected partition will get its batch of n 封邮件。
注意:示例中显示的数字参考了“完美”场景;在现实生活中,消息也可以被撤销、压缩、失败、刷新,而不管缓冲区大小、分区不可用……导致偏移量,如您的问题中所示。
最后一个冲洗场景示例:
Partition Offset
0 1000
1 1000
2 0
3 0
进程已停止,但生产者已正确关闭并刷新其消息,因此主题如下:
Partition Offset
0 1997
1 1996
2 999
3 998
进程重新启动。刷新第一个分区的缓冲区后,看起来像:
Partition Offset
0 2997
1 1996
2 999
3 998
因此增加了对该机制“公平性”的混淆。但这不是它的错,因为在分区程序的映射、计数器和缓冲区中没有持久性。如果让该过程连续几天不间断地执行,您会发现它确实以“接近相等”的方式平衡消息。
RoundRobinPartitioner
的相关方法:
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster)
{
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions=cluster.availablePartitionsForTopic(topic);
if (!availablePartitions.isEmpty()) {
/*remainder calculus in order to select next partition*/
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
}
private int nextValue(String topic)
{
/*Counter of num messages sent. topicCounterMap is part of the producer
process, hence not persisted by default.
It will start by 0 for every topic with each new launch*/
AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> {
return new AtomicInteger(0); });
return counter.getAndIncrement();
}