为什么 AWS EMR 上的 Spark 不从应用程序 fat jar 加载 class?

Why Spark on AWS EMR doesn't load class from application fat jar?

我的 spark 应用程序无法在 AWS EMR 集群上运行。我注意到这是因为一些 类 是从 EMR 设置的路径加载的,而不是从我的应用程序 jar 加载的。例如

java.lang.NoSuchMethodError: org.apache.avro.Schema$Field.<init>(Ljava/lang/String;Lorg/apache/avro/Schema;Ljava/lang/String;Ljava/lang/Object;)V
        at com.sksamuel.avro4s.SchemaFor$.fieldBuilder(SchemaFor.scala:424)
        at com.sksamuel.avro4s.SchemaFor$.fieldBuilder(SchemaFor.scala:406)

这里org.apache.avro.Schema是从"jar:file:/usr/lib/spark/jars/avro-1.7.7.jar!/org/apache/avro/Schema.class"[加载的=25=]

com.sksamuel.avro4s 取决于 avro 1.8.1。我的应用程序构建为 fat jar 并具有 avro 1.8.1。为什么没有加载?而不是从 EMR 集类路径中选择 1.7.7。

这只是一个例子。我在我的应用程序中包含的其他库也看到了同样的情况。可能是 Spark 依赖于 1.7.7 并且在包含其他依赖项时我不得不遮蔽。但是为什么我的应用程序 jar 中包含的 类 没有首先加载?

经过一番阅读后,我意识到这就是 class 加载在 Spark 中的工作方式。有一个钩子可以改变这种行为spark.executor.userClassPathFirst。当我尝试并将其标记为实验性时,它并没有完全起作用。我想最好的方法是隐藏依赖关系。考虑到 Spark 及其组件拉动的库数量,这可能与复杂的 Spark 应用程序有很大差异。

我和你有同样的例外。基于 ,我能够按照您的建议通过隐藏 avro 依赖项来解决此异常:

assemblyShadeRules in assembly := Seq( ShadeRule.rename("org.apache.avro.**" -> "latest_avro.@1").inAll )

如果有帮助,这是我的完整 build.sbt(除了项目信息):

val sparkVersion = "2.1.0"
val confluentVersion = "3.2.1"

resolvers += "Confluent" at "http://packages.confluent.io/maven"

libraryDependencies ++= Seq(
  "org.scala-lang" % "scala-library" % scalaVersion.value % "provided",
  "org.scala-lang" % "scala-reflect" % scalaVersion.value % "provided",
  "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
  "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided" excludeAll ExclusionRule(organization = "org.scala-lang"),
  "org.apache.avro" % "avro" % "1.8.1" % "provided",
  "com.databricks" %% "spark-avro" % "3.2.0",
  "com.sksamuel.avro4s" %% "avro4s-core" % "1.6.4",
  "io.confluent" % "kafka-avro-serializer" % confluentVersion
)

logBuffered in Test := false

assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("shapeless.**" -> "new_shapeless.@1").inAll,
  ShadeRule.rename("org.apache.avro.**" -> "latest_avro.@1").inAll
)

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}