无法通过 Apicurio 反序列化器从 Avro 格式的 Integer 中反序列化 LocalDate
Cannot deserialize LocalDate from Integer in Avro format by Apicurio deserializer
我的设置:
- 斯特里姆齐·卡夫卡
- Apicurio 架构注册表
- 使用 Avro 格式进行生产和消费
在我尝试使用 LocalDate
.
之前,所有这些都适用于简单类型(String
、long
)
这是我在 Apicurio 中的 Avro 模式:
{
"type" : "record",
"name" : "AvroMessageBean",
"namespace" : "com.company",
"fields" : [ {
"name" : "message",
"type" : "string"
}, {
"name" : "time",
"type" : "long"
}, {
"name": "weekEndingDate",
"type": {
"type": "int",
"logicalType": "date"
}
}]
}
在我的消费者中,我收到 GenericRecord
和 value
,如下所示:
{"message": "Hello (0)!", "time": 1615449454839, "weekEndingDate": 18697}
相关的消费者代码片段是这样的:
import io.apicurio.registry.utils.serde.AbstractKafkaSerDe;
import io.apicurio.registry.utils.serde.AvroKafkaDeserializer;
...
// set deserializer
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName());
props.putIfAbsent(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, REGISTRY_URL);
KafkaConsumer<Long, AvroMessageBean> consumer = new KafkaConsumer<>(props);
...
// consume
while (true) {
ConsumerRecords<Long, AvroMessageBean> records = consumer.poll(Duration.of(1L, ChronoUnit.SECONDS));
for (ConsumerRecord<Long, AvroMessageBean> record : records) {
GenericRecord gRec = record.value();
AvroMessageBean aMsg = (AvroMessageBean) SpecificData.get().deepCopy(AvroMessageBean.SCHEMA$, gRec);
System.out.println("Consumed a message: " + aMsg.getMessage() + " @ " + new Date(aMsg.getTime()));
}
consumer.commitSync();
}
运行时异常:
Exception in thread "main" java.lang.ClassCastException: java.lang.Integer cannot be cast to java.time.LocalDate
at com.ibm.AvroMessageBean.put(AvroMessageBean.java:131)
at org.apache.avro.generic.GenericData.setField(GenericData.java:818)
at org.apache.avro.generic.GenericData.setField(GenericData.java:841)
at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1286)
at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1223)
at com.ibm.AvroMessageBeanConsumer.main(AvroMessageBeanConsumer.java:75)
这是AvroMessageBean.SCHEMA$
的内容:
{"type":"record","name":"AvroMessageBean","namespace":"com.company","fields":[{"name":"message","type":"string"},{"name":"time","type":"long"},{"name":"weekEndingDate","type":{"type":"int","logicalType":"date"}}]}
想知道为什么 logicalType
被忽略了?
回答我自己的问题,感谢 Apicirio 开发人员 (Fabian Martinez),我找到了解决方案
消费者有一个特殊的 属性,它直接处理业务对象的反序列化,而不是 GenericRecord
。还有消费者认可 logicalType
并妥善处理:
import io.apicurio.registry.utils.serde.avro.AvroDatumProvider;
...
// Deserialize directly to AvroMessageBean
props.putIfAbsent(AvroDatumProvider.REGISTRY_USE_SPECIFIC_AVRO_READER_CONFIG_PARAM, true);
...
...
AvroMessageBean aMsg = record.value();
// AvroMessageBean aMsg = (AvroMessageBean) SpecificData.get().deepCopy(AvroMessageBean.SCHEMA$, gRec);
System.out.println("Consumed a message: " + aMsg.getMessage() + " @ " + new Date(aMsg.getTime()) + " @ " + aMsg.getWeekEndingDate());
我的设置:
- 斯特里姆齐·卡夫卡
- Apicurio 架构注册表
- 使用 Avro 格式进行生产和消费
在我尝试使用 LocalDate
.
String
、long
)
这是我在 Apicurio 中的 Avro 模式:
{
"type" : "record",
"name" : "AvroMessageBean",
"namespace" : "com.company",
"fields" : [ {
"name" : "message",
"type" : "string"
}, {
"name" : "time",
"type" : "long"
}, {
"name": "weekEndingDate",
"type": {
"type": "int",
"logicalType": "date"
}
}]
}
在我的消费者中,我收到 GenericRecord
和 value
,如下所示:
{"message": "Hello (0)!", "time": 1615449454839, "weekEndingDate": 18697}
相关的消费者代码片段是这样的:
import io.apicurio.registry.utils.serde.AbstractKafkaSerDe;
import io.apicurio.registry.utils.serde.AvroKafkaDeserializer;
...
// set deserializer
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName());
props.putIfAbsent(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, REGISTRY_URL);
KafkaConsumer<Long, AvroMessageBean> consumer = new KafkaConsumer<>(props);
...
// consume
while (true) {
ConsumerRecords<Long, AvroMessageBean> records = consumer.poll(Duration.of(1L, ChronoUnit.SECONDS));
for (ConsumerRecord<Long, AvroMessageBean> record : records) {
GenericRecord gRec = record.value();
AvroMessageBean aMsg = (AvroMessageBean) SpecificData.get().deepCopy(AvroMessageBean.SCHEMA$, gRec);
System.out.println("Consumed a message: " + aMsg.getMessage() + " @ " + new Date(aMsg.getTime()));
}
consumer.commitSync();
}
运行时异常:
Exception in thread "main" java.lang.ClassCastException: java.lang.Integer cannot be cast to java.time.LocalDate
at com.ibm.AvroMessageBean.put(AvroMessageBean.java:131)
at org.apache.avro.generic.GenericData.setField(GenericData.java:818)
at org.apache.avro.generic.GenericData.setField(GenericData.java:841)
at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1286)
at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1223)
at com.ibm.AvroMessageBeanConsumer.main(AvroMessageBeanConsumer.java:75)
这是AvroMessageBean.SCHEMA$
的内容:
{"type":"record","name":"AvroMessageBean","namespace":"com.company","fields":[{"name":"message","type":"string"},{"name":"time","type":"long"},{"name":"weekEndingDate","type":{"type":"int","logicalType":"date"}}]}
想知道为什么 logicalType
被忽略了?
回答我自己的问题,感谢 Apicirio 开发人员 (Fabian Martinez),我找到了解决方案
消费者有一个特殊的 属性,它直接处理业务对象的反序列化,而不是 GenericRecord
。还有消费者认可 logicalType
并妥善处理:
import io.apicurio.registry.utils.serde.avro.AvroDatumProvider;
...
// Deserialize directly to AvroMessageBean
props.putIfAbsent(AvroDatumProvider.REGISTRY_USE_SPECIFIC_AVRO_READER_CONFIG_PARAM, true);
...
...
AvroMessageBean aMsg = record.value();
// AvroMessageBean aMsg = (AvroMessageBean) SpecificData.get().deepCopy(AvroMessageBean.SCHEMA$, gRec);
System.out.println("Consumed a message: " + aMsg.getMessage() + " @ " + new Date(aMsg.getTime()) + " @ " + aMsg.getWeekEndingDate());