将通用 avro 记录序列化为 Array[Byte] 将模式保留在对象中

Serializing generic avro records as an Array[Byte] keeps the schema in the object

情况

我目前正在使用 AVRO 和架构存储库编写 consumer/producer。

据我所知,我序列化这些数据的选择是使用 Confluent 的 avro 序列化器,或者使用 Twitter 的双射。

似乎双射看起来最直接。

所以我想生成以下格式的日期 ProducerRecord[String,Array[Byte]],这归结为 [一些字符串 ID,序列化的 GenericRecord]

(注意:我要使用通用记录,因为此代码库必须处理从 Json/csv/... 解析的数千个模式)

问题:

我序列化和使用 AVRO 的全部原因是您不需要在数据本身中有一个架构(就像您使用 Json/XML/...)。
然而,当检查主题中的数据时,我看到整个方案与数据一起包含在内。我是不是在做一些根本性的错误,这是设计使然,还是我应该改用 confluent 序列化程序?

代码:

  def jsonStringToAvro(jString: String, schema: Schema): GenericRecord = {
    val converter = new JsonAvroConverter
    val genericRecord = converter.convertToGenericDataRecord(jString.replaceAll("\\/","_").getBytes(), schema)

    genericRecord
  }
def serializeAsByteArray(avroRecord: GenericRecord): Array[Byte] = {
    //val genericRecordInjection = GenericAvroCodecs.toBinary(avroRecord.getSchema)
    val r: Array[Byte] = GenericAvroCodecs.toBinary(avroRecord.getSchema).apply(avroRecord)

    r
  }

//schema comes from a rest call to the schema repository
new ProducerRecord[String, Array[Byte]](topic, myStringKeyGoesHere, serializeAsByteArray(jsonStringToAvro(jsonObjectAsStringGoesHere, schema)))


        producer.send(producerRecord, new Callback {...})

如果您查看 Confluent source code ,您会发现与架构存储库交互的操作顺序是

  1. 从 Avro 记录中获取架构,并计算其 ID。理想情况下 POST-ing 架构到存储库,或者以其他方式散列它应该给你一个 ID。
  2. 分配一个 ByteBuffer
  3. 将返回的ID写入缓冲区
  4. 将 Avro 对象值(不包括模式)作为字节写入缓冲区
  5. 将该字节缓冲区发送到 Kafka

目前,您的双射用法将在字节中包含架构,而不是用 ID 替换它