Kafka Streams: NoSuchMethodError for org.apache.kafka.clients.consumer.ConsumerConfig.addDeserializerToConfig

I am learning Kafka Streams and have run into an error. I have tried a few things, but none of them had any effect.

Input: value_1, value_2, value_3, ...

public static void main(String[] args) throws InterruptedException {
    String host = "127.0.0.1:9092";
    String consumer_group = "firstGroup1";
    String topic = "test1";

    // create properties
    Properties properties = new Properties();
    properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, host);
    properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, consumer_group);
    properties.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
    properties.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());

    // create a topology
    StreamsBuilder builder = new StreamsBuilder();

    // input topic
    KStream<String, String> inputtopic = builder.stream(topic);

    // keep only the values of interest
    KStream<String, String> filtered_stream = inputtopic.filter((k, v) ->
            v.equalsIgnoreCase("value_5")
                    || v.equalsIgnoreCase("value_7")
                    || v.equalsIgnoreCase("value_9"));
    filtered_stream.foreach((k, v) -> System.out.println(v));

    // output topic
    filtered_stream.to("prime_value");

    // build the topology and create the streams client
    KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);

    // start the stream processing
    kafkaStreams.start();
}

Error message:

    1800 [main] INFO org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration - stream-thread [firstGroup1-d1244e8e-dbc1-4139-8876-ca75cb89c609-StreamThread-1-consumer] Cooperative rebalancing enabled now
1852 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.0.0
1852 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 8cb0a5e9d3441962
1852 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1641673582569
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.ConsumerConfig.addDeserializerToConfig(Ljava/util/Map;Lorg/apache/kafka/common/serialization/Deserializer;Lorg/apache/kafka/common/serialization/Deserializer;)Ljava/util/Map;
    at org.apache.kafka.streams.processor.internals.StreamThread$InternalConsumerConfig.<init>(StreamThread.java:537)
    at org.apache.kafka.streams.processor.internals.StreamThread$InternalConsumerConfig.<init>(StreamThread.java:535)
    at org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:527)
    at org.apache.kafka.streams.processor.internals.StreamThread.create(StreamThread.java:406)
    at org.apache.kafka.streams.KafkaStreams.createAndAddStreamThread(KafkaStreams.java:897)
    at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:887)
    at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:783)
    at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:693)
    at com.example.kafkastreams.main(kafkastreams.java:47)

Line 47 is: KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);

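Your kafka-clients and kafka-streams versions appear to be mismatched; they must be the same version. A NoSuchMethodError at runtime means the kafka-streams jar was compiled against a kafka-clients version that has this method, while the kafka-clients jar actually on the classpath does not.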

When using Spring Boot, you should not specify a version for the Kafka dependencies; Boot will bring in the correct versions of both libraries.
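To confirm the mismatch, it can help to print which jar each library is actually loaded from, since the jar file name normally includes the version. The following is a minimal sketch using only JDK reflection; the class name `KafkaJarLocator` and the helper `locate` are hypothetical names chosen for illustration, not part of Kafka.

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper (not part of Kafka): prints where the
// kafka-clients and kafka-streams classes are loaded from at runtime.
public class KafkaJarLocator {

    /** Returns the location a class was loaded from, or a short note if unavailable. */
    public static String locate(String className) {
        try {
            // initialize=false: only the Class object is needed, not its static initializers
            Class<?> cls = Class.forName(className, false, KafkaJarLocator.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                return "(no code source, e.g. a JDK class)";
            }
            return source.getLocation().toString();
        } catch (ClassNotFoundException e) {
            return "(not on the classpath)";
        }
    }

    public static void main(String[] args) {
        // Both class names are taken from the stack trace in the question.
        System.out.println("kafka-clients: " + locate("org.apache.kafka.clients.consumer.ConsumerConfig"));
        System.out.println("kafka-streams: " + locate("org.apache.kafka.streams.KafkaStreams"));
    }
}
```

Run it with the same classpath as the failing application. If the two printed jar names carry different version numbers, align them in the build file. With Maven, `mvn dependency:tree -Dincludes=org.apache.kafka` should list the resolved versions as well.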