Avro 向后兼容性无法按预期工作

Avro backward compatibility doesn't work as expected

我有两个 Avro 模式 V1 和 V2,它们在 spark 中读取如下:

import org.apache.spark.sql.avro.functions._

val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/V1.avsc")))

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

val output = df
  .select(from_avro($"value", jsonFormatSchema) as $"avroFields")

V1 有两个字段“一”和“二”

{
  "name": "test",
  "namespace": "foo.bar",
  "type": "record",
  "fields": [
    {
      "name": "one",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "two",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ]
}

具有新字段的 V2:“三”

{
  "name": "test",
  "namespace": "foo.bar",
  "type": "record",
  "fields": [
    {
      "name": "one",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "two",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "three",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ]
}

场景:作者使用V1写入,Reader使用V2解码avro记录。我的期望是看到字段 3 填充了默认值 null。但是我 运行 从我的 spark 工作中进入以下异常。

我是不是漏掉了什么?我的理解是avro支持向后兼容。

Exception in thread "main" java.io.EOFException
  at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:473)
  at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:128)
  at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:423)
  at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
  at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
  at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
  at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
  at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
  at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
  at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
  at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
  at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)

您总是必须使用写入的 exact 模式解码 Avro。这是因为 Avro 使用未标记的数据更加紧凑,并且需要 writers 模式出现在解码时间。

因此,当您使用 V2 模式进行读取时,它会查找字段 three(或者可能是该字段的空标记)并抛出错误。

你可以做的是将解码数据(使用 writers 模式解码)映射到 reader 模式,Java 有一个 API 对应:SpecificDatumReader(Schema writer, Schema reader)

Protocol Buffers 或 Thrift 做你想做的,它们是标记格式。 Avro 期望模式与数据一起传输,例如在 Avro 文件中。