Kafka Streams count throwing Exception

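I am new to Kafka Streams and have been experimenting with KTable. I am trying to use a KTable to detect duplicate events in a stream and drop them. As a first step I tried to build a simple KTable, but it throws an error.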

The code that creates the KTable is shown below.

        AvroDeserializer avroDeserializer = new AvroDeserializer();
        AvroSerializer avroSerializer = new AvroSerializer();

        // Despite its name, keySerde is used as the value Serde when consuming below
        Serde<GenericRecord> keySerde = Serdes.serdeFrom(avroSerializer, avroDeserializer);

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, GenericRecord> records = builder.stream(topics, Consumed.with(Serdes.String(), keySerde));

        // Re-key each record by its eventId field, then count the records per key
        records.map((key, value) -> KeyValue.pair(String.valueOf(value.get("eventId")), value))
                // This second map re-emits the same key and value unchanged
                .map((windowedUserId, count) -> new KeyValue<>(windowedUserId, count))
                .groupByKey()
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("store")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.Long()))
                .toStream()
                .to("topic", Produced.with(Serdes.String(), Serdes.Long()));

But every time I run it, it throws the following error.

2021-06-22 20:29:33.247 ERROR 14680 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams    : stream-client [test-app-36b983af-8ed0-4a6b-be17-7a9ada214774] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now. 

org.apache.kafka.streams.errors.StreamsException: ClassCastException while producing data to topic test-app-store-repartition. A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: org.apache.avro.generic.GenericData$Record). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).

Can anyone help me understand this problem?

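The error is raised at .groupByKey(), while it writes to the -repartition topic. Because the preceding map changes the key, Kafka Streams has to repartition the data through an internal topic, and for that topic it falls back to the default Serdes from your StreamsConfig (byte-array Serdes, according to the error message), which cannot serialize a String key or a GenericRecord value.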
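One way to see the internal topic is to print the topology description. The following is a minimal sketch rather than the asker's code: it uses plain String values in place of Avro, and the class name RepartitionDemo and the input topic name events are made up for illustration.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class RepartitionDemo {

    // Builds a small topology (hypothetical topic names) and returns its description.
    public static String describe() {
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> records =
                builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()));

        // map() may change the key, so the stream is flagged for repartitioning
        KStream<String, String> rekeyed =
                records.map((key, value) -> KeyValue.pair(value, value));

        // Grouping after a key-changing operation adds an internal repartition topic
        rekeyed.groupByKey()
               .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("store"));

        return builder.build().describe().toString();
    }

    public static void main(String[] args) {
        // No broker is needed: the topology is only built and described, never started
        System.out.println(describe());
    }
}
```

The printed description should list a sink and a source for a topic whose name ends in -repartition. At runtime the application id is prepended, which is how the name test-app-store-repartition in the error message arises.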

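You need to specify the Serdes for it with .groupByKey(Grouped.with(keySerde, valueSerde)), or change the defaults in StreamsConfig as the error message suggests. For the code in the question that would be Grouped.with(Serdes.String(), keySerde), since the variable named keySerde there is actually the Avro value Serde.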
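Applied to the topology from the question, the first option might look like the sketch below. It assumes the Avro Serde is passed in from outside (the question builds it from its own AvroSerializer and AvroDeserializer classes). The class name DedupTopology and the method name build are illustrative only, and the no-op second map from the question is left out.

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

public class DedupTopology {

    // valueSerde is the Avro Serde for GenericRecord values (supplied by the caller)
    public static Topology build(String inputTopic, Serde<GenericRecord> valueSerde) {
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, GenericRecord> records =
                builder.stream(inputTopic, Consumed.with(Serdes.String(), valueSerde));

        // Re-key each record by its eventId field
        KStream<String, GenericRecord> byEventId =
                records.map((key, value) -> KeyValue.pair(String.valueOf(value.get("eventId")), value));

        byEventId
                // Explicit Serdes for the repartition topic, so the defaults are not used
                .groupByKey(Grouped.with(Serdes.String(), valueSerde))
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("store")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.Long()))
                .toStream()
                .to("topic", Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }
}
```

Changing the defaults instead would mean setting default.key.serde and default.value.serde in the streams configuration. Those settings take a Serde class, so the Avro Serde would likely need to be wrapped in a class with a public no-argument constructor; passing Grouped.with(...) avoids that extra step.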