Avro 向后兼容性无法按预期工作
Avro backward compatibility doesn't work as expected
我有两个 Avro 模式 V1 和 V2,它们在 spark 中读取如下:
import org.apache.spark.sql.avro.functions._
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/V1.avsc")))
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
val output = df
.select(from_avro($"value", jsonFormatSchema) as $"avroFields")
V1 有两个字段“一”和“二”
{
"name": "test",
"namespace": "foo.bar",
"type": "record",
"fields": [
{
"name": "one",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "two",
"type": [
"null",
"string"
],
"default": null
}
]
}
具有新字段的 V2:“三”
{
"name": "test",
"namespace": "foo.bar",
"type": "record",
"fields": [
{
"name": "one",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "two",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "three",
"type": [
"null",
"string"
],
"default": null
}
]
}
场景:作者使用V1写入,Reader使用V2解码avro记录。我的期望是看到字段 3 填充了默认值 null。但是我 运行 从我的 spark 工作中进入以下异常。
我是不是漏掉了什么?我的理解是avro支持向后兼容。
Exception in thread "main" java.io.EOFException
at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:473)
at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:128)
at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:423)
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
您总是必须使用写入的 exact 模式解码 Avro。这是因为 Avro 使用未标记的数据更加紧凑,并且需要 writers 模式出现在解码时间。
因此,当您使用 V2 模式进行读取时,它会查找字段 three
(或者可能是该字段的空标记)并抛出错误。
你可以做的是将解码数据(使用 writers 模式解码)映射到 reader 模式,Java 有一个 API 对应:SpecificDatumReader(Schema writer, Schema reader)
。
Protocol Buffers 或 Thrift 做你想做的,它们是标记格式。 Avro 期望模式与数据一起传输,例如在 Avro 文件中。
我有两个 Avro 模式 V1 和 V2,它们在 spark 中读取如下:
import org.apache.spark.sql.avro.functions._
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/V1.avsc")))
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
val output = df
.select(from_avro($"value", jsonFormatSchema) as $"avroFields")
V1 有两个字段“一”和“二”
{
"name": "test",
"namespace": "foo.bar",
"type": "record",
"fields": [
{
"name": "one",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "two",
"type": [
"null",
"string"
],
"default": null
}
]
}
具有新字段的 V2:“三”
{
"name": "test",
"namespace": "foo.bar",
"type": "record",
"fields": [
{
"name": "one",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "two",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "three",
"type": [
"null",
"string"
],
"default": null
}
]
}
场景:作者使用V1写入,Reader使用V2解码avro记录。我的期望是看到字段 3 填充了默认值 null。但是我 运行 从我的 spark 工作中进入以下异常。
我是不是漏掉了什么?我的理解是avro支持向后兼容。
Exception in thread "main" java.io.EOFException
at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:473)
at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:128)
at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:423)
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
您总是必须使用写入的 exact 模式解码 Avro。这是因为 Avro 使用未标记的数据更加紧凑,并且需要 writers 模式出现在解码时间。
因此,当您使用 V2 模式进行读取时,它会查找字段 three
(或者可能是该字段的空标记)并抛出错误。
你可以做的是将解码数据(使用 writers 模式解码)映射到 reader 模式,Java 有一个 API 对应:SpecificDatumReader(Schema writer, Schema reader)
。
Protocol Buffers 或 Thrift 做你想做的,它们是标记格式。 Avro 期望模式与数据一起传输,例如在 Avro 文件中。