Kafka:ClassCastException:class org.apache.avro.generic.GenericData$Record 无法转换为 class
Kafka : ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class
我在尝试将 record.value() 转换为 java 对象时在消费者中遇到此异常:
ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class [...].PublicActivityRecord (org.apache.avro.generic.GenericData$Record and [...].PublicActivityRecord are in unnamed module of loader 'app')
生产者 发送 java 对象,这是一个名为 PublicActivityRecord
的用户定义类型,如下所示:
KafkaProducer<String, PublicActivityRecord> producer = new KafkaProducer<>(createKafkaProperties());
[...]
this.producer.send(new ProducerRecord<String, PublicActivityRecord>(myTopic, activityRecord));
this.producer.flush();
此时我可以在调试模式下看到 ProducerRecord
的值确实是 PublicActivityRecord
.
类型
在注册表服务器上,我可以在日志中看到生产者发送模式的 POST 请求:
Registering new schema: subject DEV-INF_9325_activityRecord_01-value, version null, id null, type null, schema size 7294 (io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource:262)
[2022-01-28 07:01:35,575] INFO 192.168.36.30 - - [28/janv./2022:06:01:34 +0000] "POST /subjects/DEV-INF_9325_activityRecord_01-value/versions HTTP/1.1" 200 8 "-" "Java/11.0.2" POSTsT (io.confluent.rest-utils.requests:62)
消费者方面:
protected KafkaConsumer<String, PublicActivityRecord> consumer;
[...]
consumer = new KafkaConsumer<>(consumerProperties);
consumer.subscribe(Stream.of(kafkaConfig.getTopicActivityRecord()).collect(Collectors.toList()));
final ConsumerRecords<String, PublicActivityRecord> records = consumer.poll(duration);
records.forEach(record -> {
[...]
PublicActivityRecord activityRecord = record.value();
此处发生 ClassCastException。
在调试模式下,我可以看到 record.value
确实是 GenericData$Record
类型。并且不能转换为 PublicActivityRecord。
serializer/deserilizer键值相同:
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
并且在 schema-registry 日志中,我可以看到消费者的 GET 请求:
"GET /schemas/ids/3?fetchMaxId=false HTTP/1.1" 200 8447 "-" "Java/11.0.7" GETsT (io.confluent.rest-utils.requests:62)
所以我检查过:
- 生产者发送一条我自己类型的消息
PublicActivityRecord
- kafka broker 收到消息
- 生产者将模式发布到模式注册表
- 消息被消费者拿走
- 模式是由消费者从模式注册表中获取的
- 消息的值是意外的
GenericData$Record
这导致我的结果是我的消费者出了问题。
所以问题是:为什么消费者得到 GenericData
记录而不是预期的 PublicActivityRecord
?
任何线索将不胜感激!
默认情况下,只返回一般记录。您需要设置
value.deserializer.specific.avro.reader=true
我在尝试将 record.value() 转换为 java 对象时在消费者中遇到此异常:
ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class [...].PublicActivityRecord (org.apache.avro.generic.GenericData$Record and [...].PublicActivityRecord are in unnamed module of loader 'app')
生产者 发送 java 对象,这是一个名为 PublicActivityRecord
的用户定义类型,如下所示:
KafkaProducer<String, PublicActivityRecord> producer = new KafkaProducer<>(createKafkaProperties());
[...]
this.producer.send(new ProducerRecord<String, PublicActivityRecord>(myTopic, activityRecord));
this.producer.flush();
此时我可以在调试模式下看到 ProducerRecord
的值确实是 PublicActivityRecord
.
在注册表服务器上,我可以在日志中看到生产者发送模式的 POST 请求:
Registering new schema: subject DEV-INF_9325_activityRecord_01-value, version null, id null, type null, schema size 7294 (io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource:262)
[2022-01-28 07:01:35,575] INFO 192.168.36.30 - - [28/janv./2022:06:01:34 +0000] "POST /subjects/DEV-INF_9325_activityRecord_01-value/versions HTTP/1.1" 200 8 "-" "Java/11.0.2" POSTsT (io.confluent.rest-utils.requests:62)
消费者方面:
protected KafkaConsumer<String, PublicActivityRecord> consumer;
[...]
consumer = new KafkaConsumer<>(consumerProperties);
consumer.subscribe(Stream.of(kafkaConfig.getTopicActivityRecord()).collect(Collectors.toList()));
final ConsumerRecords<String, PublicActivityRecord> records = consumer.poll(duration);
records.forEach(record -> {
[...]
PublicActivityRecord activityRecord = record.value();
此处发生 ClassCastException。
在调试模式下,我可以看到 record.value
确实是 GenericData$Record
类型。并且不能转换为 PublicActivityRecord。
serializer/deserilizer键值相同:
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
并且在 schema-registry 日志中,我可以看到消费者的 GET 请求:
"GET /schemas/ids/3?fetchMaxId=false HTTP/1.1" 200 8447 "-" "Java/11.0.7" GETsT (io.confluent.rest-utils.requests:62)
所以我检查过:
- 生产者发送一条我自己类型的消息
PublicActivityRecord
- kafka broker 收到消息
- 生产者将模式发布到模式注册表
- 消息被消费者拿走
- 模式是由消费者从模式注册表中获取的
- 消息的值是意外的
GenericData$Record
这导致我的结果是我的消费者出了问题。
所以问题是:为什么消费者得到 GenericData
记录而不是预期的 PublicActivityRecord
?
任何线索将不胜感激!
默认情况下,只返回一般记录。您需要设置
value.deserializer.specific.avro.reader=true