Kafka 获取相应消息 ID 的分区 ID

Kafka get the partition id for a corresponding message ID

我正在编写一个 Kafka 消费者(在 python 中),它需要从特定时间戳开始流式传输给定密钥的所有消息。

集群使用默认的分区策略,每条消息都有一个键,所以我知道我想要的所有消息都在一个分区上。

问题是,我如何才能找到我的消息写入了哪个分区?

即我需要这样的东西...(伪代码)

import kafka
from kafka.structs import TopicPartition

target_msg_key = "3a11d08b-d635-490a-aa4e-16b282a599e6"

consumer = kafka.KafkaConsumer([...])

# This doesn't exist?
partition_id = consumer.getPartitionIDforKey(target_msg_key)

tp = TopicPartition("my-topic",partition_id)

consumer.assign([tp])

for msg in consumer:
    ....

基本上,我需要一种方法将消息密钥转换为分区 ID(在 python 中)。

您可以在 DefaultPartitioner

的源代码中找到该逻辑
# given: all_partitions as a sorted list of numbers for the topic's partitions
idx = murmur2(key)
idx &= 0x7fffffff
idx %= len(all_partitions)
return all_partitions[idx]