将哪些元数据添加到 avro 记录的前面以与 kafka 一起使用?

What metadata to add to the front of an avro record to work with kafka?

我有自己的 avro 序列化器,但是 kafka 抛出这个错误:

Error deserializing Avro message for id -1","Unknown magic byte!"]},"recordProcessingError":null,"productionError":null} (processing.8236092859265547583.KsqlTopic.source.deserializer:44)

我知道我需要在前面添加 "Magic Byte" 和架构 ID,但我找不到任何关于此 "magic byte" 或架构 ID 格式的信息。我尝试了以下内容,但现在出现此错误:

"message": "Kafka error: Error retrieving Avro schema for id 5202538"

你能帮我吗?

这是我的(糟糕的)尝试:

    var avroRecordWithoutTheMetadata = data.ToArray().ToList();
    var schemaIdBytes = BitConverter.GetBytes(Program.schemaID).ToList();
    var magicByte = new byte();

    var correctAvroRecord = new List<byte>() {magicByte};
    correctAvroRecord.AddRange(schemaIdBytes);
    correctAvroRecord.AddRange(avroRecordWithoutTheMetadata);

    var result = correctAvroRecord.ToArray();

i cant find anything about this "magic byte" or the format of the schema id

魔法字节只是 0x0,架构 ID 是接下来的 4 个字节的整数。

https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#wire-format

或者,只需使用 existing serializers provided by Confluent