updateStateByKey,noClassDefFoundError

updateStateByKey, noClassDefFoundError

我在使用 updateStateByKey() 函数时遇到问题。我有以下简单代码(根据书本编写:"Learning Spark - Lighting Fast Data Analysis"):

object hello {
  def updateStateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    Some(runningCount.getOrElse(0) + newValues.size)
  }


  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[5]").setAppName("AndrzejApp")
    val ssc = new StreamingContext(conf, Seconds(4))
    ssc.checkpoint("/")

    val lines7 = ssc.socketTextStream("localhost", 9997)
    val keyValueLine7 = lines7.map(line => (line.split(" ")(0), line.split(" ")(1).toInt))


    val statefullStream = keyValueLine7.updateStateByKey(updateStateFunction _)
    ssc.start()
    ssc.awaitTermination()
  }

}

我的build.sbt是:

name := "stream-correlator-spark"

version := "1.0"

scalaVersion := "2.11.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.3.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.3.1" % "provided"
)

当我使用 sbt assembly 命令构建它时,一切正常。当我在本地模式下在 spark 集群上 运行 时出现错误:

线程中的异常 "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/dstream/DStream$ 在你好$.main(helo.scala:25) ...

第 25 行是:

val statefullStream = keyValueLine7.updateStateByKey(updateStateFunction _)

我觉得这可能是一些兼容版本问题,但我不知道可能是什么原因以及如何解决这个问题。

非常感谢您的帮助!

当你在 SBT 中写 "provided" 时,这意味着你的库是由环境提供的,不需要包含在包中。 尝试从 "spark-streaming" 库中删除 "provided" 标记。

当您需要将您的应用程序提交到 spark 集群时,您可以添加 "provided" 到 运行。使用 "provided" 的好处是结果 fat jar 将不包含来自提供的依赖项的 类,与没有 "provided" 相比,这将产生一个更小的 fat jar。在我的例子中,结果 jar 将在没有 "provided" 的情况下约为 90M,然后在 "provided".

的情况下缩小到 30+M