为什么 AWS EMR 上的 Spark 不从应用程序 fat jar 加载 class?
Why Spark on AWS EMR doesn't load class from application fat jar?
我的 spark 应用程序无法在 AWS EMR 集群上运行。我注意到这是因为一些 类 是从 EMR 设置的路径加载的,而不是从我的应用程序 jar 加载的。例如
java.lang.NoSuchMethodError: org.apache.avro.Schema$Field.<init>(Ljava/lang/String;Lorg/apache/avro/Schema;Ljava/lang/String;Ljava/lang/Object;)V
at com.sksamuel.avro4s.SchemaFor$.fieldBuilder(SchemaFor.scala:424)
at com.sksamuel.avro4s.SchemaFor$.fieldBuilder(SchemaFor.scala:406)
这里org.apache.avro.Schema是从"jar:file:/usr/lib/spark/jars/avro-1.7.7.jar!/org/apache/avro/Schema.class"[加载的=25=]
而 com.sksamuel.avro4s
取决于 avro 1.8.1。我的应用程序构建为 fat jar 并具有 avro 1.8.1。为什么没有加载?而不是从 EMR 集类路径中选择 1.7.7。
这只是一个例子。我在我的应用程序中包含的其他库也看到了同样的情况。可能是 Spark 依赖于 1.7.7 并且在包含其他依赖项时我不得不遮蔽。但是为什么我的应用程序 jar 中包含的 类 没有首先加载?
经过一番阅读后,我意识到这就是 class 加载在 Spark 中的工作方式。有一个钩子可以改变这种行为spark.executor.userClassPathFirst。当我尝试并将其标记为实验性时,它并没有完全起作用。我想最好的方法是隐藏依赖关系。考虑到 Spark 及其组件拉动的库数量,这可能与复杂的 Spark 应用程序有很大差异。
我和你有同样的例外。基于 ,我能够按照您的建议通过隐藏 avro 依赖项来解决此异常:
assemblyShadeRules in assembly := Seq(
ShadeRule.rename("org.apache.avro.**" -> "latest_avro.@1").inAll
)
如果有帮助,这是我的完整 build.sbt(除了项目信息):
val sparkVersion = "2.1.0"
val confluentVersion = "3.2.1"
resolvers += "Confluent" at "http://packages.confluent.io/maven"
libraryDependencies ++= Seq(
"org.scala-lang" % "scala-library" % scalaVersion.value % "provided",
"org.scala-lang" % "scala-reflect" % scalaVersion.value % "provided",
"org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
"org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided" excludeAll ExclusionRule(organization = "org.scala-lang"),
"org.apache.avro" % "avro" % "1.8.1" % "provided",
"com.databricks" %% "spark-avro" % "3.2.0",
"com.sksamuel.avro4s" %% "avro4s-core" % "1.6.4",
"io.confluent" % "kafka-avro-serializer" % confluentVersion
)
logBuffered in Test := false
assemblyShadeRules in assembly := Seq(
ShadeRule.rename("shapeless.**" -> "new_shapeless.@1").inAll,
ShadeRule.rename("org.apache.avro.**" -> "latest_avro.@1").inAll
)
assemblyMergeStrategy in assembly := {
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
oldStrategy(x)
}
我的 spark 应用程序无法在 AWS EMR 集群上运行。我注意到这是因为一些 类 是从 EMR 设置的路径加载的,而不是从我的应用程序 jar 加载的。例如
java.lang.NoSuchMethodError: org.apache.avro.Schema$Field.<init>(Ljava/lang/String;Lorg/apache/avro/Schema;Ljava/lang/String;Ljava/lang/Object;)V
at com.sksamuel.avro4s.SchemaFor$.fieldBuilder(SchemaFor.scala:424)
at com.sksamuel.avro4s.SchemaFor$.fieldBuilder(SchemaFor.scala:406)
这里org.apache.avro.Schema是从"jar:file:/usr/lib/spark/jars/avro-1.7.7.jar!/org/apache/avro/Schema.class"[加载的=25=]
而 com.sksamuel.avro4s
取决于 avro 1.8.1。我的应用程序构建为 fat jar 并具有 avro 1.8.1。为什么没有加载?而不是从 EMR 集类路径中选择 1.7.7。
这只是一个例子。我在我的应用程序中包含的其他库也看到了同样的情况。可能是 Spark 依赖于 1.7.7 并且在包含其他依赖项时我不得不遮蔽。但是为什么我的应用程序 jar 中包含的 类 没有首先加载?
经过一番阅读后,我意识到这就是 class 加载在 Spark 中的工作方式。有一个钩子可以改变这种行为spark.executor.userClassPathFirst。当我尝试并将其标记为实验性时,它并没有完全起作用。我想最好的方法是隐藏依赖关系。考虑到 Spark 及其组件拉动的库数量,这可能与复杂的 Spark 应用程序有很大差异。
我和你有同样的例外。基于
assemblyShadeRules in assembly := Seq(
ShadeRule.rename("org.apache.avro.**" -> "latest_avro.@1").inAll
)
如果有帮助,这是我的完整 build.sbt(除了项目信息):
val sparkVersion = "2.1.0"
val confluentVersion = "3.2.1"
resolvers += "Confluent" at "http://packages.confluent.io/maven"
libraryDependencies ++= Seq(
"org.scala-lang" % "scala-library" % scalaVersion.value % "provided",
"org.scala-lang" % "scala-reflect" % scalaVersion.value % "provided",
"org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
"org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided" excludeAll ExclusionRule(organization = "org.scala-lang"),
"org.apache.avro" % "avro" % "1.8.1" % "provided",
"com.databricks" %% "spark-avro" % "3.2.0",
"com.sksamuel.avro4s" %% "avro4s-core" % "1.6.4",
"io.confluent" % "kafka-avro-serializer" % confluentVersion
)
logBuffered in Test := false
assemblyShadeRules in assembly := Seq(
ShadeRule.rename("shapeless.**" -> "new_shapeless.@1").inAll,
ShadeRule.rename("org.apache.avro.**" -> "latest_avro.@1").inAll
)
assemblyMergeStrategy in assembly := {
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
oldStrategy(x)
}