Kafka 获取相应消息 ID 的分区 ID
Kafka get the partition id for a corresponding message ID
我正在编写一个 Kafka 消费者(在 python 中),它需要从特定时间戳开始流式传输给定密钥的所有消息。
集群使用默认的分区策略,每条消息都有一个键,所以我知道我想要的所有消息都在一个分区上。
问题是,我如何才能找到我的消息写入了哪个分区?
即我需要这样的东西...(伪代码)
import kafka
from kafka.structs import TopicPartition
target_msg_key = "3a11d08b-d635-490a-aa4e-16b282a599e6"
consumer = kafka.KafkaConsumer([...])
# This doesn't exist?
partition_id = consumer.getPartitionIDforKey(target_msg_key)
tp = TopicPartition("my-topic",partition_id)
consumer.assign([tp])
for msg in consumer:
....
基本上,我需要一种方法将消息密钥转换为分区 ID(在 python 中)。
您可以在 DefaultPartitioner
的源代码中找到该逻辑
# given: all_partitions as a sorted list of numbers for the topic's partitions
idx = murmur2(key)
idx &= 0x7fffffff
idx %= len(all_partitions)
return all_partitions[idx]
我正在编写一个 Kafka 消费者(在 python 中),它需要从特定时间戳开始流式传输给定密钥的所有消息。
集群使用默认的分区策略,每条消息都有一个键,所以我知道我想要的所有消息都在一个分区上。
问题是,我如何才能找到我的消息写入了哪个分区?
即我需要这样的东西...(伪代码)
import kafka
from kafka.structs import TopicPartition
target_msg_key = "3a11d08b-d635-490a-aa4e-16b282a599e6"
consumer = kafka.KafkaConsumer([...])
# This doesn't exist?
partition_id = consumer.getPartitionIDforKey(target_msg_key)
tp = TopicPartition("my-topic",partition_id)
consumer.assign([tp])
for msg in consumer:
....
基本上,我需要一种方法将消息密钥转换为分区 ID(在 python 中)。
您可以在 DefaultPartitioner
的源代码中找到该逻辑# given: all_partitions as a sorted list of numbers for the topic's partitions
idx = murmur2(key)
idx &= 0x7fffffff
idx %= len(all_partitions)
return all_partitions[idx]