带有 Kafka 的 Avro - 反序列化改变模式
Avro with Kafka - Deserializing with changing schema
基于 Avro 模式,我生成了一个 class(数据)来使用适合该模式的 class
之后,我对数据进行编码并使用 kafka
发送到其他应用程序 "A"
Data data; // <- The object was initialized before . Here it is only the declaration "for example"
EncoderFactory encoderFactory = EncoderFactory.get();
ByteArrayOutputStream out = new ByteArrayOutputStream();
BinaryEncoder encoder = encoderFactory. directBinaryEncoder(out, null);
DatumWriter<Tloog> writer;
writer = new SpecificDatumWriter<Data>( Data.class);
writer.write(data, encoder);
byte[] avroByteMessage = out.toByteArray();
另一方面(在应用程序 "A" 中)我通过实现 Deserializer
对数据进行反序列化
class DataDeserializer implements Deserializer<Data> {
private String encoding = "UTF8";
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// nothing to do
}
@Override
public Tloog deserialize(String topic, byte[] data) {
try {
if (data == null)
{
return null;
}
else
{
DatumReader<Tloog> reader = new SpecificDatumReader<Data>( Data.class);
DecoderFactory decoderFactory = DecoderFactory.get();
BinaryDecoder decoder = decoderFactory.binaryDecoder( data, null);
Data decoded = reader.read(null, decoder);
return decoded;
}
} catch (Exception e) {
throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
}
}
问题是这种方法需要使用 SpecificDatumReader,I.e.the 数据 class 应该与应用程序代码集成...这可能会有问题 - 模式可能会改变,因此数据 class应该重新生成并再次集成
2 个问题:
- 我应该在应用程序中使用 GenericDatumReader 吗?怎么做
正确。 (我可以简单地在应用程序中保存架构)
- 有没有一种简单的方法可以在数据更改时使用 SpecificDatumReader?怎么集成不麻烦?
谢谢
我使用 GenericDatumReader
—— 好吧,实际上我从中推导出 reader class,但你明白了。为了使用它,我将我的模式保存在一个特殊的 Kafka 主题中——Schema
令人惊讶。消费者和生产者都在启动时阅读本主题并配置各自的解析器。
如果您这样做,您甚至可以让您的消费者和生产者即时更新他们的架构,而无需重新启动它们。这对我来说是一个设计目标——我不想为了添加或更改模式而必须重新启动我的应用程序。这就是为什么 SpecificDatumReader
对我不起作用,老实说为什么我首先使用 Avro
而不是像 Thrift
.
这样的东西
更新
执行 Avro 的正常方法是将架构与记录一起存储在文件中。我不那样做,主要是因为我做不到。我使用 Kafka
,所以我不能将模式直接与数据一起存储——我必须将模式存储在一个单独的主题中。
我这样做的方式是,首先加载我所有的模式。您可以从文本文件中读取它们;但正如我所说,我是从 Kafka
主题中阅读它们的。从 Kafka 读取它们后,我有一个这样的数组:
val schemaArray: Array[String] = Array(
"""{"name":"MyObj","type":"record","fields":[...]}""",
"""{"name":"MyOtherObj","type":"record","fields":[...]}"""
)
为 Scala
顺便说一句道歉,但这就是我得到的。
无论如何,您需要创建一个解析器,然后为每个模式解析它并创建 readers 和编写器,并将它们保存到地图:
val parser = new Schema.Parser()
val schemas = Map(schemaArray.map{s => parser.parse(s)}.map(s => (s.getName, s)):_*)
val readers = schemas.map(s => (s._1, new GenericDatumReader[GenericRecord](s._2)))
val writers = schemas.map(s => (s._1, new GenericDatumWriter[GenericRecord](s._2)))
var decoder: BinaryDecoder = null
我在解析实际记录之前完成了所有这些工作——这只是为了配置解析器。然后,要解码单个记录,我会这样做:
val byteArray: Array[Byte] = ... // <-- Avro encoded record
val schemaName: String = ... // <-- name of the Avro schema
val reader = readers.get(schemaName).get
decoder = DecoderFactory.get.binaryDecoder(byteArray, decoder)
val record = reader.read(null, decoder)
基于 Avro 模式,我生成了一个 class(数据)来使用适合该模式的 class 之后,我对数据进行编码并使用 kafka
发送到其他应用程序 "A"Data data; // <- The object was initialized before . Here it is only the declaration "for example"
EncoderFactory encoderFactory = EncoderFactory.get();
ByteArrayOutputStream out = new ByteArrayOutputStream();
BinaryEncoder encoder = encoderFactory. directBinaryEncoder(out, null);
DatumWriter<Tloog> writer;
writer = new SpecificDatumWriter<Data>( Data.class);
writer.write(data, encoder);
byte[] avroByteMessage = out.toByteArray();
另一方面(在应用程序 "A" 中)我通过实现 Deserializer
对数据进行反序列化class DataDeserializer implements Deserializer<Data> {
private String encoding = "UTF8";
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// nothing to do
}
@Override
public Tloog deserialize(String topic, byte[] data) {
try {
if (data == null)
{
return null;
}
else
{
DatumReader<Tloog> reader = new SpecificDatumReader<Data>( Data.class);
DecoderFactory decoderFactory = DecoderFactory.get();
BinaryDecoder decoder = decoderFactory.binaryDecoder( data, null);
Data decoded = reader.read(null, decoder);
return decoded;
}
} catch (Exception e) {
throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
}
}
问题是这种方法需要使用 SpecificDatumReader,I.e.the 数据 class 应该与应用程序代码集成...这可能会有问题 - 模式可能会改变,因此数据 class应该重新生成并再次集成 2 个问题:
- 我应该在应用程序中使用 GenericDatumReader 吗?怎么做 正确。 (我可以简单地在应用程序中保存架构)
- 有没有一种简单的方法可以在数据更改时使用 SpecificDatumReader?怎么集成不麻烦?
谢谢
我使用 GenericDatumReader
—— 好吧,实际上我从中推导出 reader class,但你明白了。为了使用它,我将我的模式保存在一个特殊的 Kafka 主题中——Schema
令人惊讶。消费者和生产者都在启动时阅读本主题并配置各自的解析器。
如果您这样做,您甚至可以让您的消费者和生产者即时更新他们的架构,而无需重新启动它们。这对我来说是一个设计目标——我不想为了添加或更改模式而必须重新启动我的应用程序。这就是为什么 SpecificDatumReader
对我不起作用,老实说为什么我首先使用 Avro
而不是像 Thrift
.
更新
执行 Avro 的正常方法是将架构与记录一起存储在文件中。我不那样做,主要是因为我做不到。我使用 Kafka
,所以我不能将模式直接与数据一起存储——我必须将模式存储在一个单独的主题中。
我这样做的方式是,首先加载我所有的模式。您可以从文本文件中读取它们;但正如我所说,我是从 Kafka
主题中阅读它们的。从 Kafka 读取它们后,我有一个这样的数组:
val schemaArray: Array[String] = Array(
"""{"name":"MyObj","type":"record","fields":[...]}""",
"""{"name":"MyOtherObj","type":"record","fields":[...]}"""
)
为 Scala
顺便说一句道歉,但这就是我得到的。
无论如何,您需要创建一个解析器,然后为每个模式解析它并创建 readers 和编写器,并将它们保存到地图:
val parser = new Schema.Parser()
val schemas = Map(schemaArray.map{s => parser.parse(s)}.map(s => (s.getName, s)):_*)
val readers = schemas.map(s => (s._1, new GenericDatumReader[GenericRecord](s._2)))
val writers = schemas.map(s => (s._1, new GenericDatumWriter[GenericRecord](s._2)))
var decoder: BinaryDecoder = null
我在解析实际记录之前完成了所有这些工作——这只是为了配置解析器。然后,要解码单个记录,我会这样做:
val byteArray: Array[Byte] = ... // <-- Avro encoded record
val schemaName: String = ... // <-- name of the Avro schema
val reader = readers.get(schemaName).get
decoder = DecoderFactory.get.binaryDecoder(byteArray, decoder)
val record = reader.read(null, decoder)