Serializer not compatible to org.apache.avro.util.Utf8
I am trying to do an aggregation with Kafka Streams, but I get the error below.
Here is what I am doing:
KGroupedStream<String, Long> countrywiseAmount = ......;
KTable<String, CountSum> countrywiseAverageSum = countrywiseAmount
    .aggregate(new Initializer<CountSum>() {
        @Override
        public CountSum apply() {
            return new CountSum();
        }
    }, new Aggregator<String, Long, CountSum>() {
        @Override
        public CountSum apply(String country, Long amount, CountSum sumByCountry) {
            sumByCountry.setCountry(country.toString());
            sumByCountry.setCount(sumByCountry.getCount() + 1);
            sumByCountry.setSum(sumByCountry.getSum() + amount);
            return sumByCountry;
        }
    }, Materialized.with(stringSerde, countSumSerde));
The error I get is the following:
Caused by: org.apache.kafka.streams.errors.StreamsException: A
serializer (key:
org.apache.kafka.common.serialization.StringSerializer / value:
org.apache.kafka.common.serialization.LongSerializer) is not
compatible to the actual key or value type (key type:
org.apache.avro.util.Utf8 / value type: java.lang.Long). Change the
default Serdes in StreamConfig or provide correct Serdes via method
parameters.
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:363)
... 5 more
Caused by: java.lang.ClassCastException: org.apache.avro.util.Utf8 cannot be cast to java.lang.String
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:103)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
Any clues?
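For context, the ClassCastException at the bottom of the trace can be reproduced without Kafka at all: StringSerializer casts its input straight to String, and org.apache.avro.util.Utf8 implements CharSequence but is not a java.lang.String, so the cast fails. A minimal, Kafka-free sketch (StringBuilder stands in for Utf8 here, since both are CharSequences that are not Strings):

```java
import java.nio.charset.StandardCharsets;

public class CastDemo {
    // Mirrors what StringSerializer.serialize() effectively does with its input.
    static byte[] serializeLikeStringSerializer(Object data) {
        return ((String) data).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // A CharSequence that is not a String, like Avro's Utf8 at runtime.
        CharSequence avroLikeKey = new StringBuilder("IN");
        try {
            serializeLikeStringSerializer(avroLikeKey);
            System.out.println("no exception");
        } catch (ClassCastException e) {
            System.out.println("ClassCastException, as in the stack trace");
        }
        // Converting explicitly first works fine.
        byte[] ok = serializeLikeStringSerializer(avroLikeKey.toString());
        System.out.println(ok.length);
    }
}
```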
The KTable aggregation logic is fine; the problem is in KGroupedStream<String, Long> countrywiseChkAmount.
When building it from a KStream<String, AvroModel> and creating the new KeyValue,
the key needs to be converted from CharSequence to String. Thanks.
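The fix described above can be sketched as follows. This is a hypothetical sketch, not the original code: the stream name and the getCountry()/getAmount() accessors are assumptions standing in for whatever the Avro-generated class actually exposes. The key point is calling toString() on the CharSequence before using it as the key, and passing explicit Serdes to groupByKey (Grouped.with on Kafka 2.1+, Serialized.with on older versions) so the repartition topic does not fall back to the default Serdes:

```java
// Hypothetical names: sourceStream, getCountry(), getAmount() are assumptions.
// getCountry() returns a CharSequence (Utf8 at runtime), so convert it here.
KGroupedStream<String, Long> countrywiseAmount = sourceStream
        .map((key, value) -> KeyValue.pair(
                value.getCountry().toString(),   // CharSequence -> String
                value.getAmount()))
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()));
```

With the key re-mapped to a real String and the Serdes made explicit, the downstream StringSerializer no longer receives a Utf8 instance.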