java.lang.ClassNotFoundException: org.apache.kafka.common.metrics.MetricsContext
I'm currently learning Kafka. Following a video course, I'm trying to build a Twitter tweet filter with Kafka Streams:
import com.google.gson.JsonParser;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class StreamsFilterTweet {

    public static void main(String[] args) {
        // create properties
        Properties properties = new Properties();
        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "demo-kafka-streams");
        properties.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
        properties.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());

        // create a topology
        StreamsBuilder streamsBuilder = new StreamsBuilder();

        // input topic
        KStream<String, String> inputTopic = streamsBuilder.stream("twitter_tweets");
        // keep only tweets whose user has more than 10000 followers
        KStream<String, String> filteredStream = inputTopic.filter(
                (k, jsonValue) -> extractUserFollowersInTweets(jsonValue) > 10000
        );
        filteredStream.to("important_tweets");

        // build topology
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);

        // start our streams application
        kafkaStreams.start();
    }

    private static final JsonParser jsonParser = new JsonParser();

    // returns the follower count of the tweet's user, or 0 if a field is missing
    private static int extractUserFollowersInTweets(String tweetJson) {
        try {
            return jsonParser.parse(tweetJson)
                    .getAsJsonObject()
                    .get("user")
                    .getAsJsonObject()
                    .get("followers_count")
                    .getAsInt();
        } catch (NullPointerException e) {
            return 0;
        }
    }
}
When I try to run this code, I get this error:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/kafka/common/metrics/MetricsContext
    at tutorial4.StreamsFilterTweet.main(StreamsFilterTweet.java:35)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.metrics.MetricsContext
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
    ... 1 more
According to the stack trace, this line causes the problem:
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
I'm using Maven with the following dependencies:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.7.25</version>
</dependency>
<dependency>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.6</version>
</dependency>
I can't figure out why this happens. How can I fix it?
I found a solution to this problem: I switched to an older version of the Kafka Streams library:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.0.0</version>
</dependency>
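The problem is not the version of kafka-streams by itself.
You have a conflict between the kafka-streams and kafka-clients dependencies, so you need to either upgrade kafka-clients to 2.6.0 or downgrade kafka-streams to 2.0.0.
Kafka Streams depends on kafka-clients, so declaring the two with different versions produces errors like this one. Here kafka-streams 2.6.0 needs org.apache.kafka.common.metrics.MetricsContext, but the kafka-clients 2.0.0 jar declared directly in the POM takes precedence over the 2.6.0 jar that kafka-streams would pull in transitively, and the older jar does not contain that class.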
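Why the older kafka-clients jar ends up on the classpath can be illustrated with a toy model of Maven's "nearest definition wins" rule. This is only a simplified sketch, not Maven's actual resolver: the class NearestWins, its Decl type, and the depth numbers are hypothetical names and values chosen for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of Maven's "nearest definition wins" mediation; not Maven's real resolver.
final class NearestWins {

    /** One declaration of an artifact: its version and its depth in the dependency tree. */
    static final class Decl {
        final String artifact;
        final String version;
        final int depth;

        Decl(String artifact, String version, int depth) {
            this.artifact = artifact;
            this.version = version;
            this.depth = depth;
        }
    }

    /** Picks, per artifact, the version declared closest to the project root (first one wins ties). */
    static Map<String, String> resolve(List<Decl> decls) {
        Map<String, Decl> chosen = new LinkedHashMap<>();
        for (Decl d : decls) {
            Decl current = chosen.get(d.artifact);
            if (current == null || d.depth < current.depth) {
                chosen.put(d.artifact, d);
            }
        }
        Map<String, String> versions = new LinkedHashMap<>();
        for (Decl d : chosen.values()) {
            versions.put(d.artifact, d.version);
        }
        return versions;
    }

    public static void main(String[] args) {
        List<Decl> decls = new ArrayList<>();
        decls.add(new Decl("kafka-streams", "2.6.0", 1)); // declared directly in the POM
        decls.add(new Decl("kafka-clients", "2.6.0", 2)); // pulled in by kafka-streams 2.6.0
        decls.add(new Decl("kafka-clients", "2.0.0", 1)); // declared directly in the POM
        System.out.println(resolve(decls)); // prints {kafka-streams=2.6.0, kafka-clients=2.0.0}
    }
}
```

The direct declaration at depth 1 beats the transitive one at depth 2, which matches the mismatch in the question: kafka-streams 2.6.0 running against kafka-clients 2.0.0.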
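Finally, as far as I can see from your code, you can remove the kafka-clients dependency from the POM; kafka-streams then brings in the matching kafka-clients version transitively.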
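To confirm which jar actually provides a class after changing the POM, a small probe like the following can help. It is a minimal sketch that uses only the JDK: ClasspathProbe and its locate method are hypothetical helpers, not part of Kafka, and the class names in main are just the ones relevant to this question.

```java
import java.security.CodeSource;
import java.util.Optional;

// Hypothetical diagnostic helper (not part of Kafka): reports where a class is loaded from.
final class ClasspathProbe {

    /** Returns the location a class was loaded from, or empty if it cannot be loaded. */
    static Optional<String> locate(String className) {
        try {
            // initialize=false: only look the class up, do not run its static initializers
            Class<?> cls = Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            // classes from the JDK itself may have no code source
            return Optional.of(source == null ? "<JDK runtime>" : source.getLocation().toString());
        } catch (ClassNotFoundException | LinkageError e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        String[] names = {
            "org.apache.kafka.common.metrics.MetricsContext", // the class missing in the stack trace
            "org.apache.kafka.clients.producer.KafkaProducer", // another kafka-clients class
            "org.apache.kafka.streams.KafkaStreams"            // a kafka-streams class
        };
        for (String name : names) {
            System.out.println(name + " -> " + locate(name).orElse("NOT FOUND"));
        }
    }
}
```

Run on the project's classpath with the POM from the question, the first line would be expected to report NOT FOUND while the other two show jar paths with different version numbers. Once the versions are aligned, all three should resolve to jars of the same version. Running mvn dependency:tree gives the same information from the build side.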