Kafka Streams count throwing Exception
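I am new to Kafka Streams and have been experimenting with KTable. I am trying to use a KTable to detect duplicate events in a Kafka stream and drop them. As a first step I tried to build a simple KTable, but it throws an error.
The code that creates the KTable is shown below.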
AvroDeserializer avroDeserializer = new AvroDeserializer();
AvroSerializer avroSerializer = new AvroSerializer();
Serde<GenericRecord> keySerde = Serdes.serdeFrom(avroSerializer, avroDeserializer);
StreamsBuilder builder = new StreamsBuilder();
KStream<String, GenericRecord> records = builder.stream(topics, Consumed.with(Serdes.String(), keySerde));
records.map((key, value) -> KeyValue.pair(String.valueOf(value.get("eventId")), value))
    .map((windowedUserId, count) -> new KeyValue<>(windowedUserId, count))
    .groupByKey()
    .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("store")
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.Long()))
    .toStream()
    .to("topic", Produced.with(Serdes.String(), Serdes.Long()));
But every time I run this, it throws the following error.
2021-06-22 20:29:33.247 ERROR 14680 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams : stream-client [test-app-36b983af-8ed0-4a6b-be17-7a9ada214774] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.
org.apache.kafka.streams.errors.StreamsException: ClassCastException while producing data to topic test-app-store-repartition. A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: org.apache.avro.generic.GenericData$Record). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).
Can anyone help me understand this problem?
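The error comes from .groupByKey(), at the point where it writes to the -repartition topic, and it is using the default Serdes from your StreamsConfig. You need to specify the Serdes for it with .groupByKey(Grouped.with(keySerde, valueSerde)), or change the defaults in your StreamsConfig as the error message suggests.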
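As a rough sketch of the first option, the topology from the question could pass its Serdes to groupByKey() like this. Because map() changes the key, the grouping step writes the records (String key, Avro value) to a repartition topic before counting, and the Serdes given to Materialized cover only the count store, not that topic. The class name EventCountTopology, the method build and its parameters are illustrative names, not part of the original code; valueSerde stands for the Avro Serde that the question calls keySerde.

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical wrapper class, used only to make the sketch self-contained.
public class EventCountTopology {

    // valueSerde is assumed to be the Avro Serde built in the question
    // (Serdes.serdeFrom(avroSerializer, avroDeserializer)).
    public static Topology build(String inputTopic,
                                 String outputTopic,
                                 Serde<GenericRecord> valueSerde) {
        StreamsBuilder builder = new StreamsBuilder();

        builder.stream(inputTopic, Consumed.with(Serdes.String(), valueSerde))
            // Re-key each record by its eventId; this marks the stream for repartitioning.
            .map((key, value) -> KeyValue.pair(String.valueOf(value.get("eventId")), value))
            // Explicit Serdes for the repartition topic, instead of the StreamsConfig defaults.
            .groupByKey(Grouped.with(Serdes.String(), valueSerde))
            // These Serdes apply to the "store" state store and the resulting KTable.
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("store")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Long()))
            .toStream()
            .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }
}
```

For the second option, the default.key.serde and default.value.serde settings in StreamsConfig take a Serde class, which Kafka Streams instantiates itself, so the Avro Serde would need to be a class with a public no-argument constructor rather than an instance created with Serdes.serdeFrom(...).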