Getting a Kafka Streams ClassCastException on the same class while applying a map function
UserRecord.java (auto-generated by the Maven Avro plugin)
UserRecord extends SpecificRecordBase implements SpecificRecord
UserRecordSerde.java
UserRecordSerde extends SpecificAvroSerde
application.yml
spring.cloud.stream.bindings.input.destination: userTopic
spring.cloud.stream.bindings.input.consumer.useNativeDecoding: true
spring.cloud.stream.kafka.streams.bindings.input.consumer.valueSerde: UserRecordSerde
spring.cloud.stream.kafka.streams.bindings.input.consumer.keySerde: LongSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde: LongSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde: SpecificAvroSerde
StreamListener class. The original stream carries null keys and UserRecord objects in Avro.
@StreamListener
public KStream<Long, ArrayList<UserRecord>> handleUserRecords(@Input KStream<?, UserRecord> userRecordStream) {
    // Serde settings for the aggregated list value
    Map<String, Object> serdeConfig = new HashMap<>();
    serdeConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
    serdeConfig.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

    Serde<ArrayList<UserRecord>> userRecordListSerde = new SpecificAvroSerde();
    userRecordListSerde.configure(serdeConfig, false);

    // Note: userRecordSerde is not declared in the snippet as posted
    return userRecordStream
        // Re-key each record by its user ID (the incoming keys are null)
        .map((key, value) -> new KeyValue<>(value.getUserID(), value))
        .groupByKey(Serialized.with(Serdes.Long(), userRecordSerde))
        // Collect all records that share a key into one list
        .aggregate(ArrayList::new, (Long key, UserRecord value, ArrayList<UserRecord> agg) -> {
            agg.add(value);
            return agg;
        }, userRecordListSerde)
        .toStream();
}
Exception
java.lang.ClassCastException: com.example.UserRecord cannot be cast to com.example.UserRecord
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
Why don't you remove this from the configuration entirely?
spring.cloud.stream.kafka.streams.bindings.input.consumer.valueSerde: UserRecordSerde
Then use SpecificAvroSerde directly through the default configuration:
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
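Put together, the suggestion above might look roughly like the application.yml sketch below. The property keys come from the question. The fully qualified Long serde class name and the schema.registry.url entry (its value copied from the URL in the question's Java code) are assumptions, not something the question or the answer states.

```yaml
# Sketch only: the per-binding valueSerde entry has been removed.
spring.cloud.stream.bindings.input.destination: userTopic
spring.cloud.stream.bindings.input.consumer.useNativeDecoding: true
# Assumption: fully qualified name of Kafka's built-in Long serde
spring.cloud.stream.kafka.streams.bindings.input.consumer.keySerde: org.apache.kafka.common.serialization.Serdes$LongSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde: org.apache.kafka.common.serialization.Serdes$LongSerde
# From the answer: rely on the default value serde, given by its full class name
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
# Assumption: registry URL copied from the Java snippet in the question
spring.cloud.stream.kafka.streams.binder.configuration.schema.registry.url: http://localhost:8081
```

Whether this makes the exception go away depends on the rest of the setup; the fragment only illustrates the change the answer proposes.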