Kafka Spark Streaming LocationStrategies java class def not found exception
I am trying to integrate the Kafka message broker with Spark and I am running into an exception that says
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka010/LocationStrategies
Below is the Java Spark code:
package com.test.spell;

import java.util.Arrays;
/**
 * Hello world!
 *
 */
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

import org.apache.spark.api.java.function.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

public class App
{
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main( String[] args )
    {
        String brokers = "localhost:9092";
        String topics = "spark-topic";

        // Create context with a 2 seconds batch interval
        SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

        Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", brokers);

        // Create direct kafka stream with brokers and topics
        JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicsSet, kafkaParams));

        System.out.println("In programn");

        // Get the lines, split them into words, count the words and print
        JavaDStream<String> lines = messages.map(new Function<ConsumerRecord<String, String>, String>() {
            @Override
            public String call(ConsumerRecord<String, String> kafkaRecord) throws Exception {
                return kafkaRecord.value();
            }
        });

        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                System.out.println(line);
                return Arrays.asList(line.split(" ")).iterator();
            }
        });

        /* JavaPairDStream<String, Integer> wordCounts = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<>(word, 1);
            }
        });*/
        // wordCounts.print();

        // Start the computation
        jssc.start();
        jssc.awaitTermination();
    }
}
Below is my pom.xml. I have tried many different jar versions but couldn't find a combination that works.
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.spark-project.spark</groupId>
        <artifactId>unused</artifactId>
        <version>1.0.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.10.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.8.2-beta</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>0.9.0-incubating</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.10</artifactId>
        <version>2.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>1.3.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.3.1</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.0.0</version>
    </dependency>
</dependencies>
</project>
I am running my Spark job as follows:
./bin/spark-submit --class com.test.spell.spark.App \
--master local \
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 1 \
--queue default \
/home/cwadmin/spark_programs/spell/target/spell-0.0.1-SNAPSHOT.jar
I feel the problem above is caused by using the wrong jar files. Can someone help me resolve this? I would like to know which jar files I should be using here. I would also really appreciate it if someone could share some good resources on integrating Spark and Kafka.
I have been trying to solve this for two days now but haven't been able to fix it.
Thanks in advance.
- First, use the same version for all of your Spark dependencies: you currently have 2.1.0 for spark-core, 2.3.1 for spark-streaming, 2.0.0 for spark-streaming-kafka, and so on.
- Second, all of these dependencies must use the same Scala version, and it should match the Scala version that your Spark distribution was built with.
- Third, you don't need to declare a dependency on the Kafka libraries explicitly; the Spark-Kafka integration pulls them in transitively.
- Finally, you need to build a fat jar for your application that bundles the required libraries (except spark-core, which should be marked as provided). The easiest way is to use the Maven Assembly plugin; a sketch is shown below.
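For reference, here is a minimal sketch of what an aligned POM could look like, assuming your cluster runs Spark 2.3.1 built against Scala 2.11 (the version numbers, property names and plugin version below are illustrative; adjust them to match your actual installation):

<properties>
    <spark.version>2.3.1</spark.version>
    <scala.binary.version>2.11</scala.binary.version>
</properties>
<dependencies>
    <!-- spark-core and spark-streaming are supplied by the cluster at runtime -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- The kafka-0-10 integration (which contains LocationStrategies) is NOT part of the
         Spark distribution, so it must not be provided: it has to end up in your fat jar.
         It also brings in a compatible kafka-clients transitively. -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <!-- Produces an extra target/...-jar-with-dependencies.jar that bundles the
             non-provided dependencies, including the Kafka integration classes -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.1.1</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Then point spark-submit at the jar-with-dependencies jar instead of the plain one. Alternatively, if you would rather not build a fat jar, you can keep your current jar and let Spark fetch the integration at submit time with something like --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.1 (again, the version has to match your Spark installation).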