Kafka Streams:org.apache.kafka.clients.consumer.ConsumerConfig.addDeserializerToConfig
Kafka Stream: org.apache.kafka.clients.consumer.ConsumerConfig.addDeserializerToConfig
我正在学习 Kafka Streams,遇到了一个错误。我已经尝试了一些方法,但都没有效果。
输入:value_1、value_2、value_3 ..................
/**
 * Builds and starts a Kafka Streams topology that reads records from topic {@code test1},
 * keeps only those whose value equals (ignoring case) {@code value_5}, {@code value_7} or
 * {@code value_9}, prints each kept value to standard output and writes it to topic
 * {@code prime_value}.
 *
 * @param args command-line arguments; not used
 * @throws InterruptedException kept for signature compatibility; no statement in the body throws it
 */
public static void main(String[] args) throws InterruptedException {
    String bootstrapServers = "127.0.0.1:9092";
    String applicationId = "firstGroup1";
    String inputTopic = "test1";

    // create properties
    Properties properties = new Properties();
    properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
    properties.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
    properties.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());

    // create a topology
    StreamsBuilder builder = new StreamsBuilder();

    // input topic
    KStream<String, String> inputStream = builder.stream(inputTopic);

    // filter the value; equalsIgnoreCase is invoked on the literal so that a record
    // with a null value is dropped instead of throwing NullPointerException
    KStream<String, String> filteredStream = inputStream.filter((k, v) ->
            "value_5".equalsIgnoreCase(v)
                    || "value_7".equalsIgnoreCase(v)
                    || "value_9".equalsIgnoreCase(v));
    filteredStream.foreach((k, v) -> System.out.println(v));

    // output topic set
    filteredStream.to("prime_value");

    // build a topology
    KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);

    // close the streams client when the JVM shuts down so its threads and resources are released
    Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));

    // start our stream system
    kafkaStreams.start();
}
错误信息
1800 [main] INFO org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration - stream-thread [firstGroup1-d1244e8e-dbc1-4139-8876-ca75cb89c609-StreamThread-1-consumer] Cooperative rebalancing enabled now
1852 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.0.0
1852 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 8cb0a5e9d3441962
1852 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1641673582569
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.ConsumerConfig.addDeserializerToConfig(Ljava/util/Map;Lorg/apache/kafka/common/serialization/Deserializer;Lorg/apache/kafka/common/serialization/Deserializer;)Ljava/util/Map;
at org.apache.kafka.streams.processor.internals.StreamThread$InternalConsumerConfig.<init>(StreamThread.java:537)
at org.apache.kafka.streams.processor.internals.StreamThread$InternalConsumerConfig.<init>(StreamThread.java:535)
at org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:527)
at org.apache.kafka.streams.processor.internals.StreamThread.create(StreamThread.java:406)
at org.apache.kafka.streams.KafkaStreams.createAndAddStreamThread(KafkaStreams.java:897)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:887)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:783)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:693)
at com.example.kafkastreams.main(kafkastreams.java:47)
第 47 行:{ KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);}
您的 kafka-clients 和 kafka-streams 版本似乎不匹配——它们必须是同一版本。
使用 Spring Boot 时,您不应该为 kafka 依赖项指定版本;Spring Boot 会自动引入这两个库的正确版本。
我正在学习 Kafka Streams,遇到了一个错误。我已经尝试了一些方法,但都没有效果。
输入:value_1、value_2、value_3 ..................
/**
 * Builds and starts a Kafka Streams topology that reads records from topic {@code test1},
 * keeps only those whose value equals (ignoring case) {@code value_5}, {@code value_7} or
 * {@code value_9}, prints each kept value to standard output and writes it to topic
 * {@code prime_value}.
 *
 * @param args command-line arguments; not used
 * @throws InterruptedException kept for signature compatibility; no statement in the body throws it
 */
public static void main(String[] args) throws InterruptedException {
    String bootstrapServers = "127.0.0.1:9092";
    String applicationId = "firstGroup1";
    String inputTopic = "test1";

    // create properties
    Properties properties = new Properties();
    properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
    properties.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
    properties.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());

    // create a topology
    StreamsBuilder builder = new StreamsBuilder();

    // input topic
    KStream<String, String> inputStream = builder.stream(inputTopic);

    // filter the value; equalsIgnoreCase is invoked on the literal so that a record
    // with a null value is dropped instead of throwing NullPointerException
    KStream<String, String> filteredStream = inputStream.filter((k, v) ->
            "value_5".equalsIgnoreCase(v)
                    || "value_7".equalsIgnoreCase(v)
                    || "value_9".equalsIgnoreCase(v));
    filteredStream.foreach((k, v) -> System.out.println(v));

    // output topic set
    filteredStream.to("prime_value");

    // build a topology
    KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);

    // close the streams client when the JVM shuts down so its threads and resources are released
    Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));

    // start our stream system
    kafkaStreams.start();
}
错误信息
1800 [main] INFO org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration - stream-thread [firstGroup1-d1244e8e-dbc1-4139-8876-ca75cb89c609-StreamThread-1-consumer] Cooperative rebalancing enabled now
1852 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.0.0
1852 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 8cb0a5e9d3441962
1852 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1641673582569
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.ConsumerConfig.addDeserializerToConfig(Ljava/util/Map;Lorg/apache/kafka/common/serialization/Deserializer;Lorg/apache/kafka/common/serialization/Deserializer;)Ljava/util/Map;
at org.apache.kafka.streams.processor.internals.StreamThread$InternalConsumerConfig.<init>(StreamThread.java:537)
at org.apache.kafka.streams.processor.internals.StreamThread$InternalConsumerConfig.<init>(StreamThread.java:535)
at org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:527)
at org.apache.kafka.streams.processor.internals.StreamThread.create(StreamThread.java:406)
at org.apache.kafka.streams.KafkaStreams.createAndAddStreamThread(KafkaStreams.java:897)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:887)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:783)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:693)
at com.example.kafkastreams.main(kafkastreams.java:47)
第 47 行:{ KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);}
您的 kafka-clients 和 kafka-streams 版本似乎不匹配——它们必须是同一版本。
使用 Spring Boot 时,您不应该为 kafka 依赖项指定版本;Spring Boot 会自动引入这两个库的正确版本。