带有 Kafka 的 Avro - 反序列化改变模式

Avro with Kafka - Deserializing with changing schema

基于 Avro 模式,我生成了一个 class(数据)来使用适合该模式的 class 之后,我对数据进行编码并使用 kafka

发送到其他应用程序 "A"
Data data; // <- The object was initialized before . Here it is only the declaration "for example"
EncoderFactory encoderFactory = EncoderFactory.get();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = encoderFactory. directBinaryEncoder(out, null);                    
        DatumWriter<Tloog> writer;                  
        writer = new SpecificDatumWriter<Data>( Data.class);
        writer.write(data, encoder);
        byte[] avroByteMessage = out.toByteArray();

另一方面(在应用程序 "A" 中)我通过实现 Deserializer

对数据进行反序列化
class DataDeserializer implements Deserializer<Data> {
    private String encoding = "UTF8";

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to do
    }

    @Override
    public Tloog deserialize(String topic, byte[] data) {
        try {
            if (data == null)
            {
                return null;
            }
            else
            {
                        DatumReader<Tloog> reader = new SpecificDatumReader<Data>( Data.class);
                        DecoderFactory decoderFactory = DecoderFactory.get();
                        BinaryDecoder decoder = decoderFactory.binaryDecoder( data, null);
                        Data decoded = reader.read(null, decoder);
                        return decoded;
            }
        } catch (Exception e) {
            throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
        }
    }

问题是这种方法需要使用 SpecificDatumReader,I.e.the 数据 class 应该与应用程序代码集成...这可能会有问题 - 模式可能会改变,因此数据 class应该重新生成并再次集成 2 个问题:

  1. 我应该在应用程序中使用 GenericDatumReader 吗?怎么做 正确。 (我可以简单地在应用程序中保存架构)
  2. 有没有一种简单的方法可以在数据更改时使用 SpecificDatumReader?怎么集成不麻烦?

谢谢

我使用 GenericDatumReader —— 好吧,实际上我从中推导出 reader class,但你明白了。为了使用它,我将我的模式保存在一个特殊的 Kafka 主题中——Schema 令人惊讶。消费者和生产者都在启动时阅读本主题并配置各自的解析器。

如果您这样做,您甚至可以让您的消费者和生产者即时更新他们的架构,而无需重新启动它们。这对我来说是一个设计目标——我不想为了添加或更改模式而必须重新启动我的应用程序。这就是为什么 SpecificDatumReader 对我不起作用,老实说为什么我首先使用 Avro 而不是像 Thrift.

这样的东西

更新

执行 Avro 的正常方法是将架构与记录一起存储在文件中。我不那样做,主要是因为我做不到。我使用 Kafka,所以我不能将模式直接与数据一起存储——我必须将模式存储在一个单独的主题中。

我这样做的方式是,首先加载我所有的模式。您可以从文本文件中读取它们;但正如我所说,我是从 Kafka 主题中阅读它们的。从 Kafka 读取它们后,我有一个这样的数组:

val schemaArray: Array[String] = Array(
  """{"name":"MyObj","type":"record","fields":[...]}""",
  """{"name":"MyOtherObj","type":"record","fields":[...]}"""
)

Scala 顺便说一句道歉,但这就是我得到的。

无论如何,您需要创建一个解析器,然后为每个模式解析它并创建 readers 和编写器,并将它们保存到地图:

val parser = new Schema.Parser()
val schemas = Map(schemaArray.map{s => parser.parse(s)}.map(s => (s.getName, s)):_*)
val readers = schemas.map(s => (s._1, new GenericDatumReader[GenericRecord](s._2)))
val writers = schemas.map(s => (s._1, new GenericDatumWriter[GenericRecord](s._2)))
var decoder: BinaryDecoder = null

我在解析实际记录之前完成了所有这些工作——这只是为了配置解析器。然后,要解码单个记录,我会这样做:

val byteArray: Array[Byte] = ... // <-- Avro encoded record
val schemaName: String = ... // <-- name of the Avro schema

val reader = readers.get(schemaName).get

decoder = DecoderFactory.get.binaryDecoder(byteArray, decoder)
val record = reader.read(null, decoder)