NoClassDefFoundError while executing spark-submit for KafkaProducer
I have written a Kafka producer in Scala using IntelliJ, and I pass two files as arguments. I used the following code:
package kafkaProducer

import java.util.Properties

import org.apache.kafka.clients.producer._
import org.apache.spark._

import scala.io.Source

object kafkaProducerScala extends App {
  val conf = new SparkConf().
    setMaster(args(0)).
    setAppName("kafkaProducerScala")
  val sc = new SparkContext(conf)
  sc.setLogLevel("ERROR")

  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)
  val topic = "KafkaTopics"

  for (line2 <- Source.fromFile(args(2)).getLines) {
    val c = line2.toInt
    for (line <- Source.fromFile(args(1)).getLines) {
      val a = line.toInt
      val b = if (a > c) {
        var d = a
        println(d)
        val record = new ProducerRecord[String, String](topic, d.toString)
        producer.send(record)
      }
    }
    producer.close()
  }
}
Here is the build.sbt file:
name := "KafkaProducer"
version := "0.1"
scalaVersion := "2.12.7"
libraryDependencies += "org.apache.kafka" %% "kafka" % "2.0.1"
resolvers += Resolver.mavenLocal
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.0.1"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
My goal is to get the output in a Kafka consumer, and that part works fine.
I then created a .jar file for spark-submit and ran the following spark-submit command:
C:\spark-2.3.1-bin-hadoop2.7\bin>spark-submit --class kafkaProducer.kafkaProducerScala C:\Users\Shaheel\IdeaProjects\KafkaProducer\target\scala-2.12\kafkaproducer_2.12-0.1.jar local C:\Users\Shaheel\Desktop\demo.txt C:\Users\Shaheel\Desktop\condition.properties
But I get the following error:
2018-11-28 17:53:58 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/kafka/clients/producer/KafkaProducer
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Unknown Source)
at java.lang.Class.privateGetMethodRecursive(Unknown Source)
at java.lang.Class.getMethod0(Unknown Source)
at java.lang.Class.getMethod(Unknown Source)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:42)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:198)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.producer.KafkaProducer
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
... 11 more
2018-11-28 17:53:58 INFO ShutdownHookManager:54 - Shutdown hook called
2018-11-28 17:53:58 INFO ShutdownHookManager:54 - Deleting directory C:\Users\Shaheel\AppData\Local\Temp\spark-96060579-36cc-4c68-b85e-429acad4fd38
How can I fix this?
The Kafka jar is not on Spark's classpath. You either have to pass it to spark-submit with --jars, or bundle it into your own jar (a fat/uber jar).
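For example, assuming the kafka-clients jar sits in the default local Ivy cache that sbt uses (the path below is only an illustration; point --jars at wherever the jar actually lives on your machine), the first option would look roughly like this:

spark-submit --jars C:\Users\Shaheel\.ivy2\cache\org.apache.kafka\kafka-clients\jars\kafka-clients-2.0.1.jar --class kafkaProducer.kafkaProducerScala C:\Users\Shaheel\IdeaProjects\KafkaProducer\target\scala-2.12\kafkaproducer_2.12-0.1.jar local C:\Users\Shaheel\Desktop\demo.txt C:\Users\Shaheel\Desktop\condition.properties

--jars takes a comma-separated list, so any other non-Spark dependency the job needs can be appended to the same flag.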
You are using Scala version 2.12.7, while Spark is still built with Scala version 2.11:
Spark runs on both Windows and UNIX-like systems (e.g. Linux, Mac OS). It’s easy to run locally on one machine — all you need is to have java installed on your system PATH, or the JAVA_HOME environment variable pointing to a Java installation.
Spark runs on Java 8+, Python 2.7+/3.4+ and R 3.1+. For the Scala API, Spark 2.4.0 uses Scala 2.11. You will need to use a compatible Scala version (2.11.x).
Note that support for Java 7, Python 2.6 and old Hadoop versions before 2.6.5 were removed as of Spark 2.2.0. Support for Scala 2.10 was removed as of 2.3.0.
The excerpt above is taken directly from the Apache Spark (v2.4.0) documentation page.
Change the Scala version to 2.11.12 and add the sbt-assembly plugin to your plugins.sbt file. Then all you have to do is run the command sbt assembly from the project root (where src and build.sbt live), and the resulting jar will contain the kafka-clients dependency; a sketch of the plugin file follows below.
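A minimal project/plugins.sbt for that, assuming one of the 0.14.x releases of sbt-assembly that worked with sbt 1.x at the time (pick the exact version from the plugin's release notes):

// project/plugins.sbt: pulls in the sbt-assembly plugin so that `sbt assembly` builds a fat jar
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")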
The corrected build.sbt is as follows:
val sparkVersion = "2.4.0"

name := "KafkaProducer"
version := "0.1"
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka-clients" % "2.0.1",
  "org.apache.spark" %% "spark-core" % sparkVersion % Provided
)
Apache Spark dependencies are always used in the Provided scope, because Spark provides them to your code at runtime.
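With that in place, sbt assembly should, by sbt-assembly's default naming convention (which you can override), produce a jar along the lines of target\scala-2.11\KafkaProducer-assembly-0.1.jar, and the submit command becomes roughly:

C:\spark-2.3.1-bin-hadoop2.7\bin>spark-submit --class kafkaProducer.kafkaProducerScala C:\Users\Shaheel\IdeaProjects\KafkaProducer\target\scala-2.11\KafkaProducer-assembly-0.1.jar local C:\Users\Shaheel\Desktop\demo.txt C:\Users\Shaheel\Desktop\condition.properties

No --jars flag is needed here, because the kafka-clients classes are already packaged inside the assembled jar.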