在应用映射函数时获取 Kafka Streams Class 在同一个 Class 上抛出异常

Getting Kafka Streams Class Cast Exception on the same Class while applying map function

UserRecord.java(由 Maven Avro 插件自动生成)

UserRecord extends SpecificRecordBase implement SpecificRecord

UserRecordSerde.java

UserRecordSerde extends SpecificAvroSerde

application.yml

spring.cloud.stream.bindings.input.destination: userTopic
spring.cloud.stream.bindings.input.consumer.useNativeDecoding: true
spring.cloud.stream.kafka.streams.bindings.input.consumer.valueSerde:UserRecordSerde
spring.cloud.stream.kafka.streams.bindings.input.consumer.keySerde: LongSerdespring.cloud.stream.kafka.streams.binder.configuration.default.key.serde: LongSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde: SpecificAvroSerde

Class - StreamListener - 原始流在 avro

中带有空键和 UserRecord 对象
@StreamListener
        public KStream<Long, ArrayList<UserRecord>> handleUserRecords (@Input KStream<?, UserRecord> userRecordStream) { <br/>
        Map<String, Object> serdeConfig = new HashMap();
        serdeConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        serdeConfig.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); <br/>
        Serde<ArrayList<UserRecord>> userRecordListSerde = new SpecificAvroSerde();
        userRecordListSerde.configure(serdeConfig, false); <br/>
        return userRecordStream
            .map((key, value) -> new KeyValue(value.getUserID, value)
            .groupByKey(Serialized.with(Serdes.Long(), userRecordSerde))
            .aggregate(ArrayList::new, Long key, UserRecord value, ArrayList agg ->
            {
               agg.add(value);
               return agg;
            }, userRecordListSerde)
        .toStream();
    }

异常

java.lang.ClassCastException: com.example.UserRecord cannot be cast to com.example.UserRecord
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)

你为什么不从配置中完全删除它? spring.cloud.stream.kafka.streams.bindings.input.consumer.valueSerde:UserRecordSerde 并通过默认配置直接使用 SpecificAvroSerdespring.cloud.stream.kafka.streams.binder.configuration.default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde