带有 Avro 和 Schema Repo 的 Apache Kafka - 模式 ID 在消息中的什么位置?
Apache Kafka with Avro and Schema Repo - where in the message does the schema Id go?
我想使用 Avro 来序列化我的 Kafka 消息的数据,并希望将它与 Avro 模式存储库一起使用,这样我就不必在每条消息中都包含该模式。
将 Avro 与 Kafka 一起使用似乎是一件很受欢迎的事情,很多博客/Stack Overflow 问题/用户组等都参考了发送带有消息的架构 ID,但我找不到它应该去哪里的实际示例。
我认为它应该放在 Kafka 消息头的某个地方,但我找不到明显的地方。如果它在 Avro 消息中,您必须根据模式对其进行解码以获取消息内容并显示您需要解码的模式,这有明显的问题。
我正在使用 C# 客户端,但任何语言的示例都很棒。消息 class 具有以下字段:
public MessageMetadata Meta { get; set; }
public byte MagicNumber { get; set; }
public byte Attribute { get; set; }
public byte[] Key { get; set; }
public byte[] Value { get; set; }
但这些似乎都不正确。 MessageMetaData 只有 Offset 和 PartitionId。
那么,Avro Schema Id 应该去哪里呢?
模式 ID 实际上是在 avro 消息本身中编码的。查看 this 了解 encoders/decoders 是如何实现的。
一般情况下,当您向 Kafka 发送 Avro 消息时会发生什么:
- 编码器从要编码的对象中获取模式。
- 编码器向模式注册表询问此模式的 ID。如果架构已经注册,您将获得一个现有 ID,如果没有 - 注册表将注册架构和 return 新 ID。
- 对象编码如下:[magic byte][schema id][actual message] 其中 magic byte 只是一个
0x0
字节,用于区分那种消息,schema id 是一个4 字节整数值,其余为实际编码的消息。
当您解码返回的消息时,会发生以下情况:
- 解码器读取第一个字节并确保它是
0x0
。
- 解码器读取接下来的 4 个字节并将它们转换为整数值。这就是模式 ID 的解码方式。
- 现在,当解码器有模式 ID 时,它可能会向模式注册表询问此 ID 的实际模式。瞧!
如果您的密钥是 Avro 编码的,那么您的密钥将采用上述格式。这同样适用于价值。这样您的键和值可能都是 Avro 值并使用不同的模式。
编辑回答评论中的问题:
实际模式存储在模式存储库中(这实际上是模式存储库的全部要点 - 存储模式:))。 Avro 对象容器文件格式与上述格式无关。 KafkaAvroEncoder/Decoder 使用略有不同的消息格式(但实际消息的编码方式肯定完全相同)。
这些格式之间的主要区别在于对象容器文件携带实际模式并且可能包含与该模式对应的多条消息,而上述格式仅携带模式 ID 和一条与该模式对应的消息。
传递对象容器文件编码的消息对 follow/maintain 来说可能并不明显,因为一条 Kafka 消息将包含多条 Avro 消息。或者您可以确保一条 Kafka 消息仅包含一条 Avro 消息,但这会导致每条消息都携带架构。
Avro 模式可能非常大(我见过 600 KB 甚至更大的模式)并且在每条消息中携带模式会非常昂贵和浪费,所以这就是模式存储库开始的地方 - 模式仅被获取一次并在本地缓存,所有其他查找都只是快速的地图查找。
我想使用 Avro 来序列化我的 Kafka 消息的数据,并希望将它与 Avro 模式存储库一起使用,这样我就不必在每条消息中都包含该模式。
将 Avro 与 Kafka 一起使用似乎是一件很受欢迎的事情,很多博客/Stack Overflow 问题/用户组等都参考了发送带有消息的架构 ID,但我找不到它应该去哪里的实际示例。
我认为它应该放在 Kafka 消息头的某个地方,但我找不到明显的地方。如果它在 Avro 消息中,您必须根据模式对其进行解码以获取消息内容并显示您需要解码的模式,这有明显的问题。
我正在使用 C# 客户端,但任何语言的示例都很棒。消息 class 具有以下字段:
public MessageMetadata Meta { get; set; }
public byte MagicNumber { get; set; }
public byte Attribute { get; set; }
public byte[] Key { get; set; }
public byte[] Value { get; set; }
但这些似乎都不正确。 MessageMetaData 只有 Offset 和 PartitionId。
那么,Avro Schema Id 应该去哪里呢?
模式 ID 实际上是在 avro 消息本身中编码的。查看 this 了解 encoders/decoders 是如何实现的。
一般情况下,当您向 Kafka 发送 Avro 消息时会发生什么:
- 编码器从要编码的对象中获取模式。
- 编码器向模式注册表询问此模式的 ID。如果架构已经注册,您将获得一个现有 ID,如果没有 - 注册表将注册架构和 return 新 ID。
- 对象编码如下:[magic byte][schema id][actual message] 其中 magic byte 只是一个
0x0
字节,用于区分那种消息,schema id 是一个4 字节整数值,其余为实际编码的消息。
当您解码返回的消息时,会发生以下情况:
- 解码器读取第一个字节并确保它是
0x0
。 - 解码器读取接下来的 4 个字节并将它们转换为整数值。这就是模式 ID 的解码方式。
- 现在,当解码器有模式 ID 时,它可能会向模式注册表询问此 ID 的实际模式。瞧!
如果您的密钥是 Avro 编码的,那么您的密钥将采用上述格式。这同样适用于价值。这样您的键和值可能都是 Avro 值并使用不同的模式。
编辑回答评论中的问题:
实际模式存储在模式存储库中(这实际上是模式存储库的全部要点 - 存储模式:))。 Avro 对象容器文件格式与上述格式无关。 KafkaAvroEncoder/Decoder 使用略有不同的消息格式(但实际消息的编码方式肯定完全相同)。
这些格式之间的主要区别在于对象容器文件携带实际模式并且可能包含与该模式对应的多条消息,而上述格式仅携带模式 ID 和一条与该模式对应的消息。
传递对象容器文件编码的消息对 follow/maintain 来说可能并不明显,因为一条 Kafka 消息将包含多条 Avro 消息。或者您可以确保一条 Kafka 消息仅包含一条 Avro 消息,但这会导致每条消息都携带架构。
Avro 模式可能非常大(我见过 600 KB 甚至更大的模式)并且在每条消息中携带模式会非常昂贵和浪费,所以这就是模式存储库开始的地方 - 模式仅被获取一次并在本地缓存,所有其他查找都只是快速的地图查找。