无法通过 Apicurio 反序列化器从 Avro 格式的 Integer 中反序列化 LocalDate

Cannot deserialize LocalDate from Integer in Avro format by Apicurio deserializer

我的设置:

  1. 斯特里姆齐·卡夫卡
  2. Apicurio 架构注册表
  3. 使用 Avro 格式进行生产和消费

在我尝试使用 LocalDate.

之前,所有这些都适用于简单类型(Stringlong

这是我在 Apicurio 中的 Avro 模式:

{
  "type" : "record",
  "name" : "AvroMessageBean",
  "namespace" : "com.company",
  "fields" : [ {
    "name" : "message",
    "type" : "string"
  }, {
    "name" : "time",
    "type" : "long"
  }, {        
    "name": "weekEndingDate",
    "type": {
        "type": "int",
        "logicalType": "date"
    }
  }]
}

在我的消费者中,我收到 GenericRecordvalue,如下所示:

{"message": "Hello (0)!", "time": 1615449454839, "weekEndingDate": 18697}

相关的消费者代码片段是这样的:

import io.apicurio.registry.utils.serde.AbstractKafkaSerDe;
import io.apicurio.registry.utils.serde.AvroKafkaDeserializer;
...
// set deserializer
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName());
props.putIfAbsent(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, REGISTRY_URL);

KafkaConsumer<Long, AvroMessageBean> consumer = new KafkaConsumer<>(props);

...
// consume
while (true) {
    ConsumerRecords<Long, AvroMessageBean> records = consumer.poll(Duration.of(1L, ChronoUnit.SECONDS));
    for (ConsumerRecord<Long, AvroMessageBean> record : records) {
        GenericRecord gRec = record.value();
        AvroMessageBean aMsg = (AvroMessageBean) SpecificData.get().deepCopy(AvroMessageBean.SCHEMA$, gRec);
        System.out.println("Consumed a message: " + aMsg.getMessage() + " @ " + new Date(aMsg.getTime()));
    }
    consumer.commitSync();
}

运行时异常:

Exception in thread "main" java.lang.ClassCastException: java.lang.Integer cannot be cast to java.time.LocalDate
    at com.ibm.AvroMessageBean.put(AvroMessageBean.java:131)
    at org.apache.avro.generic.GenericData.setField(GenericData.java:818)
    at org.apache.avro.generic.GenericData.setField(GenericData.java:841)
    at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1286)
    at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1223)
    at com.ibm.AvroMessageBeanConsumer.main(AvroMessageBeanConsumer.java:75)

这是AvroMessageBean.SCHEMA$的内容:

{"type":"record","name":"AvroMessageBean","namespace":"com.company","fields":[{"name":"message","type":"string"},{"name":"time","type":"long"},{"name":"weekEndingDate","type":{"type":"int","logicalType":"date"}}]}

想知道为什么 logicalType 被忽略了?

回答我自己的问题,感谢 Apicirio 开发人员 (Fabian Martinez),我找到了解决方案

消费者有一个特殊的 属性,它直接处理业务对象的反序列化,而不是 GenericRecord。还有消费者认可 logicalType 并妥善处理:

import io.apicurio.registry.utils.serde.avro.AvroDatumProvider;
...
// Deserialize directly to AvroMessageBean
props.putIfAbsent(AvroDatumProvider.REGISTRY_USE_SPECIFIC_AVRO_READER_CONFIG_PARAM, true);
...
...
AvroMessageBean aMsg = record.value();
// AvroMessageBean aMsg = (AvroMessageBean) SpecificData.get().deepCopy(AvroMessageBean.SCHEMA$, gRec);
System.out.println("Consumed a message: " + aMsg.getMessage() + " @ " + new Date(aMsg.getTime()) + " @ " + aMsg.getWeekEndingDate());