Kafka中的分区选择
Partition selection in Kafka
我很好奇如果我有主题 A 和 B,它们具有相同数量的分区,如果我用键 x
向主题 A 发送消息,它会进入分区 0
比方说。当我对主题 B 使用完全相同的密钥但它们是独立的时,在主题 B 中的密钥 x
,在 kafka 流处理期间消息是否仍会转到主题 B 上的分区?
默认情况下,Kafka 使用 DefaultPartitioner
(org.apache.kafka.clients.producer.internals.DefaultPartitioner
) 来跨主题分区分发消息:
/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes serialized key to partition on (or null if no key)
* @param value The value to partition on or null
* @param valueBytes serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if (keyBytes == null) {
return stickyPartitionCache.partition(topic, cluster);
}
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
本质上,DefaultPartitioner
利用了 MurmurHash,一种非加密哈希函数,通常用于基于哈希的查找。然后在模运算 (% numPartitions
) 中使用此哈希,以确保返回的分区在 [0, N]
范围内,其中 N
是主题的分区数。
现在回答您的问题,只要分区的序列化密钥相同且不为空,那么两条消息将被放置到同一分区(假设两个主题具有相同的分区)。
我很好奇如果我有主题 A 和 B,它们具有相同数量的分区,如果我用键 x
向主题 A 发送消息,它会进入分区 0
比方说。当我对主题 B 使用完全相同的密钥但它们是独立的时,在主题 B 中的密钥 x
,在 kafka 流处理期间消息是否仍会转到主题 B 上的分区?
默认情况下,Kafka 使用 DefaultPartitioner
(org.apache.kafka.clients.producer.internals.DefaultPartitioner
) 来跨主题分区分发消息:
/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes serialized key to partition on (or null if no key)
* @param value The value to partition on or null
* @param valueBytes serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if (keyBytes == null) {
return stickyPartitionCache.partition(topic, cluster);
}
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
本质上,DefaultPartitioner
利用了 MurmurHash,一种非加密哈希函数,通常用于基于哈希的查找。然后在模运算 (% numPartitions
) 中使用此哈希,以确保返回的分区在 [0, N]
范围内,其中 N
是主题的分区数。
现在回答您的问题,只要分区的序列化密钥相同且不为空,那么两条消息将被放置到同一分区(假设两个主题具有相同的分区)。