如何从 Kafka 有线格式的 avro 事件中解码模式 ID?

How do I decode the schema id from avro event in Kafka wire format?

祝大家一切顺利

我正在从 Kafka 主题中提取数据,其中我有多种事件类型和每种类型的模式。由于 Kafka 使用 wire format,第一个字节是魔法字节,从字节 1 到 4 我们有模式 ID,在第 5 个字节之后我们有数据本身。

我想解码架构 ID,以便能够从架构注册表中获取架构。我怎样才能用 Python 做到这一点?

例如,如果我有 b'\x00\x00\x00\x04' 作为 schema-id 二进制文件,我如何解码这个二进制文件以便获得 schema-id 的实际值?

鉴于完整的五个字节 header,您应该能够执行以下操作:

from struct import unpack
magic, schema_id = unpack('>bI', header_bytes)

这取自 confluent-kafka-python 库获取 schema_id 的方式:https://github.com/confluentinc/confluent-kafka-python/blob/e671bccb8a4f98302748ccf60d5d579f68c6613d/src/confluent_kafka/schema_registry/avro.py#L315