Kafka如何处理与分区相关的Keyed Message
How Kafka Handles Keyed Message Related to Partition
谁能解释一下:
- Kafka实际上是如何存储键控消息的?分区是否只分配给一个键?我的意思是,分区是否可以存储具有多个密钥的消息?
- 如果第一个问题的答案是肯定的,那么如果键的数量多于可用的分区怎么办?
我的用例是,我正在考虑将大量船舶数据发送给经纪人并通过 ship_id
(MMSI,如果您知道)作为密钥进行存储。问题是,我不知道那时会收到多少船。所以我不能提前定义分区号。
is it possible that a partition stores messages with multiple keys?
是的,murmur2 哈希(Kafka 使用的算法),mod 一个主题中的分区数可以得出相同的数字。例如,如果你只有一个分区,那么任何键显然都去同一个分区
how if the number of key is more than partition available?
散列是 modulo'd,所以它总是被分配一个有效的分区
现在,如果您有一个定义明确的键,就可以保证将消息按顺序放入分区中,因此分区数量的答案实际上取决于单个分区可以处理多少吞吐量,并且没有短的回答 - 您要发送多少数据,一个消费者在 "peak" 消费时从一个分区获取数据的速度有多快?进行适当的性能测试,然后在新主题上扩展分区数量以处理潜在的未来负载
您还需要考虑 "hot" / "cold" 数据。例如,如果您有 10 个分区映射到 ID 的第一位数字,那么您的所有数据都以偶数开头,您最终将有一半的分区为空
1. Kafka messages are form of key and value and it stored into in topics. Topics are partitioned into multiple partitioner and each
partition further divided into segment each segment has a log file to
store the actual message in key - value form and index or offset of
the message.
Key是可选的,用于标识要存储消息的分区,如果key为null,则消息以循环方式存储,而如果key不为null,则它将使用具有保证选择的模块分区大小的哈希键分区之一。
例如
hash(key)%num_partition
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
因此,自从它使用模块以来,它将始终将消息存储在可用分区范围内,这就是多个键可能进入同一分区的原因。消息键的主要好处是将相同的消息键存储到相同的分区。
2. 所以你不用担心分区的数量可以根据键的数量来定义。如上所述,密钥用于根据默认分区程序逻辑将消息分桶到不同的分区。分区号基本上有助于将进程并行化以实现高吞吐量。
Note:You also make sure by using key for partitioned data may cause
unequal distribution so if you don't worry just keep key null which select partition on round-robin
Other approach is to create custom partitioner to further refine partition selection logic.
here
谁能解释一下:
- Kafka实际上是如何存储键控消息的?分区是否只分配给一个键?我的意思是,分区是否可以存储具有多个密钥的消息?
- 如果第一个问题的答案是肯定的,那么如果键的数量多于可用的分区怎么办?
我的用例是,我正在考虑将大量船舶数据发送给经纪人并通过 ship_id
(MMSI,如果您知道)作为密钥进行存储。问题是,我不知道那时会收到多少船。所以我不能提前定义分区号。
is it possible that a partition stores messages with multiple keys?
是的,murmur2 哈希(Kafka 使用的算法),mod 一个主题中的分区数可以得出相同的数字。例如,如果你只有一个分区,那么任何键显然都去同一个分区
how if the number of key is more than partition available?
散列是 modulo'd,所以它总是被分配一个有效的分区
现在,如果您有一个定义明确的键,就可以保证将消息按顺序放入分区中,因此分区数量的答案实际上取决于单个分区可以处理多少吞吐量,并且没有短的回答 - 您要发送多少数据,一个消费者在 "peak" 消费时从一个分区获取数据的速度有多快?进行适当的性能测试,然后在新主题上扩展分区数量以处理潜在的未来负载
您还需要考虑 "hot" / "cold" 数据。例如,如果您有 10 个分区映射到 ID 的第一位数字,那么您的所有数据都以偶数开头,您最终将有一半的分区为空
1. Kafka messages are form of key and value and it stored into in topics. Topics are partitioned into multiple partitioner and each partition further divided into segment each segment has a log file to store the actual message in key - value form and index or offset of the message.
Key是可选的,用于标识要存储消息的分区,如果key为null,则消息以循环方式存储,而如果key不为null,则它将使用具有保证选择的模块分区大小的哈希键分区之一。 例如
hash(key)%num_partition
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
因此,自从它使用模块以来,它将始终将消息存储在可用分区范围内,这就是多个键可能进入同一分区的原因。消息键的主要好处是将相同的消息键存储到相同的分区。
2. 所以你不用担心分区的数量可以根据键的数量来定义。如上所述,密钥用于根据默认分区程序逻辑将消息分桶到不同的分区。分区号基本上有助于将进程并行化以实现高吞吐量。
Note:You also make sure by using key for partitioned data may cause unequal distribution so if you don't worry just keep key null which select partition on round-robin
Other approach is to create custom partitioner to further refine partition selection logic. here