Kafka:如何在Kafka中实现Round Robin Partition
Kafka: How to achieve Round Robin Partition in Kafka
我是卡夫卡的新手。我的要求是,我有两个分区,例如 Partition-0 和 Partition-1,并且我有值列表,其中也包含 KEY 值。我想根据我的密钥存储数据,例如 key-1 将转到 Partition-0,key-2 将转到 Partition-1。使用旧的 API 有办法实现,就像我们需要实现分区接口一样,但是我如何使用新的 API 来实现。谢谢
对于新的生产者,您还可以实现Partitioner
接口(https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java)以实现循环分配。
可以参考DefaultPartitioner
- https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
您可以通过覆盖 kafka 生产者的 default partitioner 以循环方式生产到 kafka。
伪实现
class RRPartitioner():
def __init__():
# Using topic metadata get total number of partitions
self.total_partitions = client[topic].get_number_partitions()
self.part_offset = 0
def partitioner(self, key, msg):
if self.part_offset > self.total_partitions:
self.part_offset = 0
return self.part_offset
else:
self.part_offset += 1
return self.part_offset
以上实现是纯循环法,如果您希望消息根据键排序并具有循环法,则必须在自定义分区程序中执行更多操作。
如果您想要 round-robin 行为,只需在写入 Producer 时不传递密钥,DefaultPartitioner
会为您完成这项工作。您不需要编写自定义实现。来自 javadocs:
/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a partition based on a hash of the key
* <li>If no partition or key is present choose a partition in a round-robin fashion
*/
从 Kafka 2.4.0 开始,您可以选择执行 "Always" Roundrobin。
我是卡夫卡的新手。我的要求是,我有两个分区,例如 Partition-0 和 Partition-1,并且我有值列表,其中也包含 KEY 值。我想根据我的密钥存储数据,例如 key-1 将转到 Partition-0,key-2 将转到 Partition-1。使用旧的 API 有办法实现,就像我们需要实现分区接口一样,但是我如何使用新的 API 来实现。谢谢
对于新的生产者,您还可以实现Partitioner
接口(https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java)以实现循环分配。
可以参考DefaultPartitioner
- https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
您可以通过覆盖 kafka 生产者的 default partitioner 以循环方式生产到 kafka。
伪实现
class RRPartitioner():
def __init__():
# Using topic metadata get total number of partitions
self.total_partitions = client[topic].get_number_partitions()
self.part_offset = 0
def partitioner(self, key, msg):
if self.part_offset > self.total_partitions:
self.part_offset = 0
return self.part_offset
else:
self.part_offset += 1
return self.part_offset
以上实现是纯循环法,如果您希望消息根据键排序并具有循环法,则必须在自定义分区程序中执行更多操作。
如果您想要 round-robin 行为,只需在写入 Producer 时不传递密钥,DefaultPartitioner
会为您完成这项工作。您不需要编写自定义实现。来自 javadocs:
/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a partition based on a hash of the key
* <li>If no partition or key is present choose a partition in a round-robin fashion
*/
从 Kafka 2.4.0 开始,您可以选择执行 "Always" Roundrobin。