NoClassDefFoundError while executing spark-submit for KafkaProducer

I have written a Kafka producer in Scala in IntelliJ, and I pass two files to it as arguments. I used the following code.

    package kafkaProducer

    import java.util.Properties

    import org.apache.kafka.clients.producer._
    import org.apache.spark._

    import scala.io.Source

    object kafkaProducerScala extends App {
      // args(0) = Spark master, args(1) = data file, args(2) = condition file
      val conf = new SparkConf()
        .setMaster(args(0))
        .setAppName("kafkaProducerScala")
      val sc = new SparkContext(conf)
      sc.setLogLevel("ERROR")

      val props = new Properties()
      props.put("bootstrap.servers", "localhost:9092")
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      val producer = new KafkaProducer[String, String](props)

      val topic = "KafkaTopics"
      // send every value from the data file that exceeds the threshold read from the condition file
      for (line2 <- Source.fromFile(args(2)).getLines) {
        val c = line2.toInt

        for (line <- Source.fromFile(args(1)).getLines) {
          val a = line.toInt
          if (a > c) {
            println(a)
            val record = new ProducerRecord[String, String](topic, a.toString)
            producer.send(record)
          }
        }
      }
      // close the producer only after all records have been sent
      producer.close()
    }

Below is the build.sbt file:

name := "KafkaProducer"

version := "0.1"

scalaVersion := "2.12.7"

libraryDependencies += "org.apache.kafka" %% "kafka" % "2.0.1"
resolvers += Resolver.mavenLocal

// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.0.1"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"

My goal is to get the output in a Kafka consumer, and that worked perfectly. I then created a .jar file to use with spark-submit.

I ran the following spark-submit command:

C:\spark-2.3.1-bin-hadoop2.7\bin>spark-submit --class kafkaProducer.kafkaProducerScala C:\Users\Shaheel\IdeaProjects\KafkaProducer\target\scala-2.12\kafkaproducer_2.12-0.1.jar local C:\Users\Shaheel\Desktop\demo.txt C:\Users\Shaheel\Desktop\condition.properties

But I get the following error:

2018-11-28 17:53:58 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/kafka/clients/producer/KafkaProducer
        at java.lang.Class.getDeclaredMethods0(Native Method)
        at java.lang.Class.privateGetDeclaredMethods(Unknown Source)
        at java.lang.Class.privateGetMethodRecursive(Unknown Source)
        at java.lang.Class.getMethod0(Unknown Source)
        at java.lang.Class.getMethod(Unknown Source)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:42)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:198)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.producer.KafkaProducer
        at java.net.URLClassLoader.findClass(Unknown Source)
        at java.lang.ClassLoader.loadClass(Unknown Source)
        at java.lang.ClassLoader.loadClass(Unknown Source)
        ... 11 more
2018-11-28 17:53:58 INFO  ShutdownHookManager:54 - Shutdown hook called
2018-11-28 17:53:58 INFO  ShutdownHookManager:54 - Deleting directory C:\Users\Shaheel\AppData\Local\Temp\spark-96060579-36cc-4c68-b85e-429acad4fd38

Please help me solve this.

The Kafka jars are not on Spark's classpath. You either have to pass them to spark-submit with --jars, or package them into your own jar (a fat jar).
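
For the first option, the submit command gains a --jars argument; a minimal sketch, assuming the kafka-clients jar sits in the local Ivy cache (the exact jar path is an assumption and will differ on your machine):

    spark-submit --class kafkaProducer.kafkaProducerScala --jars C:\Users\Shaheel\.ivy2\cache\org.apache.kafka\kafka-clients\jars\kafka-clients-2.0.1.jar C:\Users\Shaheel\IdeaProjects\KafkaProducer\target\scala-2.12\kafkaproducer_2.12-0.1.jar local C:\Users\Shaheel\Desktop\demo.txt C:\Users\Shaheel\Desktop\condition.properties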

You are using Scala version 2.12.7, while Spark is still built against Scala version 2.11:

Spark runs on both Windows and UNIX-like systems (e.g. Linux, Mac OS). It’s easy to run locally on one machine — all you need is to have java installed on your system PATH, or the JAVA_HOME environment variable pointing to a Java installation.

Spark runs on Java 8+, Python 2.7+/3.4+ and R 3.1+. For the Scala API, Spark 2.4.0 uses Scala 2.11. You will need to use a compatible Scala version (2.11.x).

Note that support for Java 7, Python 2.6 and old Hadoop versions before 2.6.5 were removed as of Spark 2.2.0. Support for Scala 2.10 was removed as of 2.3.0.

The excerpt above is taken directly from the documentation page of Apache Spark (v2.4.0). Change the Scala version to 2.11.12 and add the sbt-assembly plugin to the plugins.sbt file. Then all you need to do is run the command sbt assembly from the project root (where src and build.sbt reside), and the jar that gets created will bundle the kafka-clients dependency.
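
For example, project/plugins.sbt could look like this (the plugin version shown is an assumption; any recent sbt-assembly release should do):

    // project/plugins.sbt
    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9")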

The corrected build.sbt is as follows:

val sparkVersion="2.4.0"

name := "KafkaProducer"

version := "0.1"

scalaVersion := "2.11.12"

libraryDependencies ++= Seq("org.apache.kafka" % "kafka-clients" % "2.0.1",
"org.apache.spark" %% "spark-core" % sparkVersion % Provided)

Apache Spark's dependencies are always used in the Provided scope, because Spark supplies them to your code at runtime.
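
The submit command then points at the assembly jar instead of the plain one; a sketch, assuming sbt-assembly's default output name of &lt;name&gt;-assembly-&lt;version&gt;.jar:

    spark-submit --class kafkaProducer.kafkaProducerScala C:\Users\Shaheel\IdeaProjects\KafkaProducer\target\scala-2.11\KafkaProducer-assembly-0.1.jar local C:\Users\Shaheel\Desktop\demo.txt C:\Users\Shaheel\Desktop\condition.properties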