AbstractMethodError upon creation of new StreamingContext

I'm having trouble instantiating a new Spark Streaming StreamingContext.

I'm trying to create a new StreamingContext, but an AbstractMethodError is thrown. Stepping through the stack trace, I found that the application dies the moment StreamingListenerBus initializes its underlying Spark ListenerBus.

Below is the code I'm trying to run:

package platform.etl

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ClickGeneratorStreaming {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("ClickGeneratorStreaming").setMaster("local[*]")
    // Fails here with java.lang.AbstractMethodError (line 10 in the stack trace)
    val ssc = new StreamingContext(conf, Seconds(10))

  }
}

Here is the stack trace:

Exception in thread "main" java.lang.AbstractMethodError
    at org.apache.spark.util.ListenerBus$class.$init$(ListenerBus.scala:35)
    at org.apache.spark.streaming.scheduler.StreamingListenerBus.<init>(StreamingListenerBus.scala:30)
    at org.apache.spark.streaming.scheduler.JobScheduler.<init>(JobScheduler.scala:56)
    at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:183)
    at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:84)
    at platform.etl.ClickGeneratorStreaming$.main(ClickGeneratorStreaming.scala:10)
    at platform.etl.ClickGeneratorStreaming.main(ClickGeneratorStreaming.scala)

My build.sbt:

name := "spark"

version := "0.1"

scalaVersion := "2.11.0"

val sparkVersion = "2.3.0.2.6.5.0-292"
val sparkKafkaVersion = "2.3.0"
val argonautVersion = "6.2"

resolvers += "jitpack" at "https://jitpack.io"
resolvers += "horton" at "http://repo.hortonworks.com/content/repositories/releases"
resolvers += "horton2" at "http://repo.hortonworks.com/content/groups/public"


libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "2.7.3.2.6.5.0-292" excludeAll ExclusionRule(organization = "javax.servlet")
libraryDependencies += "com.amazonaws" % "aws-java-sdk" % "1.7.4"
libraryDependencies += "com.softwaremill.sttp" %% "core" % "1.2.0-RC2"
libraryDependencies += "com.softwaremill.retry" %% "retry" % "0.3.0"
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.5" % Test
libraryDependencies += "com.github.scopt" %% "scopt" % "3.7.0"
libraryDependencies += "io.argonaut" %% "argonaut" % argonautVersion
libraryDependencies += "io.argonaut" %% "argonaut-monocle" % argonautVersion
libraryDependencies += "com.github.scopt" %% "scopt" % "3.7.0"
libraryDependencies += "com.github.mrpowers" % "spark-fast-tests" % "v2.3.0_0.11.0" % "test"
libraryDependencies += "org.scalactic" %% "scalactic" % "3.0.5"
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.5" % "test"
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "2.3.0"
libraryDependencies += "org.elasticsearch" % "elasticsearch-spark-20_2.11" % "5.2.2"
libraryDependencies += "com.redislabs" % "spark-redis" % "2.3.1-M2"
libraryDependencies +=  "org.scalaj" %% "scalaj-http" % "2.4.1"
libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion 
libraryDependencies += "org.apache.spark" %% "spark-hive" % sparkVersion
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % sparkVersion


assemblyMergeStrategy in assembly := {
  case PathList("javax", "servlet", xs @ _*)         => MergeStrategy.first
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case "application.conf"            => MergeStrategy.concat
  case "reference.conf"              => MergeStrategy.concat
  case _ => MergeStrategy.first
}

My plugins.sbt:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

Found the problem. It turns out I had forgotten to declare the spark-streaming dependency in my build.sbt, so sbt resolved it transitively through one of my other dependencies, and the version it picked up is binary-incompatible with my Spark build. That is exactly what a runtime AbstractMethodError indicates: a class compiled against one version of a trait or interface was run against another.
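Before settling on a fix, it's worth confirming which artifact dragged the incompatible version in; sbt's built-in `evicted` task (sbt 0.13.6+) lists every module version that lost conflict resolution. As a sketch of how the offender could then be silenced in build.sbt (picking spark-redis as the culprit here is only an assumption for illustration):

// Hypothetical: if a connector such as spark-redis transitively pulls in an
// incompatible spark-streaming, exclude it so only the explicitly declared
// version reaches the classpath (this replaces the plain spark-redis line above).
libraryDependencies += "com.redislabs" % "spark-redis" % "2.3.1-M2" excludeAll ExclusionRule(organization = "org.apache.spark", name = "spark-streaming_2.11")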

To fix it, I simply added one line to my build.sbt:
libraryDependencies += "org.apache.spark" %% "spark-streaming" % sparkVersion

It now works perfectly.
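Since the fix relies on the explicitly declared version winning conflict resolution, a stricter variant is to pin the version outright. A minimal build.sbt sketch (an optional safeguard, not part of the fix above):

// Hypothetical safeguard: force every resolution of spark-streaming, direct
// or transitive, to the same Hortonworks build as the other Spark artifacts.
dependencyOverrides += "org.apache.spark" %% "spark-streaming" % sparkVersion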