解决 Apache Spark 中的依赖问题
Resolving dependency problems in Apache Spark
构建和部署 Spark 应用程序时的常见问题是:
java.lang.ClassNotFoundException
.
object x is not a member of package y
编译错误。
java.lang.NoSuchMethodError
如何解决这些问题?
构建和部署 Spark 应用程序时,所有依赖项都需要兼容的版本。
Scala 版本。所有包都必须使用相同的主要(2.10、2.11、2.12)Scala 版本。
考虑以下(不正确)build.sbt
:
name := "Simple Project"
version := "1.0"
libraryDependencies ++= Seq(
"org.apache.spark" % "spark-core_2.11" % "2.0.1",
"org.apache.spark" % "spark-streaming_2.10" % "2.0.1",
"org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
)
我们使用 spark-streaming
用于 Scala 2.10,而其余包用于 Scala 2.11。 有效 文件可能是
name := "Simple Project"
version := "1.0"
libraryDependencies ++= Seq(
"org.apache.spark" % "spark-core_2.11" % "2.0.1",
"org.apache.spark" % "spark-streaming_2.11" % "2.0.1",
"org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
)
但最好全局指定版本并使用 %%
(为您附加 scala 版本):
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.0.1",
"org.apache.spark" %% "spark-streaming" % "2.0.1",
"org.apache.bahir" %% "spark-streaming-twitter" % "2.0.1"
)
在 Maven 中类似:
<project>
<groupId>com.example</groupId>
<artifactId>simple-project</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>Simple Project</name>
<packaging>jar</packaging>
<version>1.0</version>
<properties>
<spark.version>2.0.1</spark.version>
</properties>
<dependencies>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>spark-streaming-twitter_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
</project>
Spark 版本 所有包必须使用相同的主要 Spark 版本(1.6、2.0、2.1、...)。
考虑以下(不正确)build.sbt:
name := "Simple Project"
version := "1.0"
libraryDependencies ++= Seq(
"org.apache.spark" % "spark-core_2.11" % "1.6.1",
"org.apache.spark" % "spark-streaming_2.10" % "2.0.1",
"org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
)
我们使用 spark-core
1.6,其余组件在 Spark 2.0 中。 有效 文件可能是
name := "Simple Project"
version := "1.0"
libraryDependencies ++= Seq(
"org.apache.spark" % "spark-core_2.11" % "2.0.1",
"org.apache.spark" % "spark-streaming_2.10" % "2.0.1",
"org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
)
但最好使用变量
(仍然不正确):
name := "Simple Project"
version := "1.0"
val sparkVersion = "2.0.1"
libraryDependencies ++= Seq(
"org.apache.spark" % "spark-core_2.11" % sparkVersion,
"org.apache.spark" % "spark-streaming_2.10" % sparkVersion,
"org.apache.bahir" % "spark-streaming-twitter_2.11" % sparkVersion
)
在 Maven 中类似:
<project>
<groupId>com.example</groupId>
<artifactId>simple-project</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>Simple Project</name>
<packaging>jar</packaging>
<version>1.0</version>
<properties>
<spark.version>2.0.1</spark.version>
<scala.version>2.11</scala.version>
</properties>
<dependencies>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>spark-streaming-twitter_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
</project>
Spark 依赖项中使用的 Spark 版本必须与 Spark 安装的 Spark 版本相匹配。例如,如果您在集群上使用 1.6.1,则必须使用 1.6.1 来构建 jars。不总是接受次要版本不匹配。
用于构建 jar 的 Scala 版本必须与用于构建部署的 Spark 的 Scala 版本相匹配。默认情况下(可下载的二进制文件和默认构建):
- Spark 1.x -> Scala 2.10
- Spark 2.x -> Scala 2.11
如果包含在 fat jar 中,工作节点上应该可以访问其他包。有许多选项,包括:
--jars
spark-submit
参数 - 分发本地 jar
文件。
--packages
spark-submit
的参数 - 从 Maven 存储库中获取依赖项。
在集群节点中提交时,您应该在 --jars
中包含应用程序 jar
。
除了 user7337271 已经给出的非常广泛的答案之外,如果问题是由于缺少外部依赖项造成的,您可以使用您的依赖项构建一个 jar,例如maven assembly plugin
在这种情况下,请确保在您的构建系统中将所有核心 spark 依赖项标记为 "provided",并且如前所述,确保它们与您的运行时 spark 版本相关联。
应用程序的依赖项类 应在启动命令的application-jar 选项中指定。
找到更多详细信息
摘自文档:
application-jar: Path to a bundled jar including your application and
all dependencies. The URL must be globally visible inside of your
cluster, for instance, an hdfs:// path or a file:// path that is
present on all nodes
Apache Spark 的类路径是动态构建的(以适应每个应用程序的用户代码),这使得它容易受到此类问题的影响。 @user7337271 的回答是正确的,但还有一些问题,具体取决于您使用的 集群管理器 ("master")。
首先,一个 Spark 应用程序由这些组件组成(每个组件都是一个单独的 JVM,因此其类路径中可能包含不同的 类):
- Driver:这是 您的 应用程序创建
SparkSession
(或 SparkContext
)并连接到集群执行实际工作的经理
- Cluster Manager:作为集群的"entry point",负责为每个应用分配executor。 Spark 支持几种不同的类型:standalone、YARN 和 Mesos,我们将在下面对其进行描述。
- Executors:这些是集群节点上的进程,执行实际工作(运行ning Spark tasks)
Apache Spark cluster mode overview 中的这张图表描述了它们之间的关系:
现在 - 哪个 类 应该驻留在每个组件中?
这可以通过下图来回答:
让我们慢慢解析:
Spark 代码 是 Spark 的库。它们应该存在于 ALL 三个组件中,因为它们包含让 Spark 执行它们之间通信的粘合剂。顺便说一句 - Spark 作者做出了一个设计决定,将所有组件的代码包含在所有组件中(例如,在驱动程序的执行器中也包含应该只 运行 的代码)以简化这一点 - 所以 Spark 的 "fat jar"(在 1.6 及以下的版本中)或 "archive"(在 2.0 中,详情如下)包含所有组件的必要代码,并且应该在所有组件中可用。
Driver-Only Code 这是用户代码,不包含应在执行器上使用的任何内容,即不在任何转换中使用的代码在 RDD / DataFrame / Dataset 上。这不一定必须与分布式用户代码分开,但可以这样做。
分布式代码这是用驱动程序代码编译的用户代码,但也必须在执行器上执行——实际转换使用的所有内容都必须是包含在这个罐子里。
现在我们明白了,如何我们如何让 类 在每个组件中正确加载,它们应该遵循什么规则?
Spark 代码:正如前面的答案所述,您必须使用相同的 Scala 和 所有组件中的 Spark 个版本。
1.1 在独立 模式下,有一个"pre-existing" Spark 安装,应用程序(驱动程序)可以连接到该安装。这意味着 所有驱动程序必须使用相同的 Spark 版本 运行ning 在主机和执行程序上。
1.2 在YARN / Mesos中,每个应用程序可以使用不同的Spark版本,但同一个应用程序的所有组件必须使用相同的版本。这意味着如果您使用版本 X 来编译和打包驱动程序应用程序,则在启动 SparkSession 时应提供相同的版本(例如,在使用 YARN 时通过 spark.yarn.archive
或 spark.yarn.jars
参数)。您提供的 jars / archive 应该包括所有 Spark 依赖项(包括传递依赖项),并且它会在应用程序启动时由集群管理器发送给每个执行程序。
驱动程序代码:这完全取决于 - 驱动程序代码可以作为一堆 jar 或 "fat jar" 提供,只要它包括所有 Spark 依赖项 + 所有用户代码
分布式代码:除了出现在驱动程序上之外,此代码还必须发送给执行程序(同样,连同其所有传递依赖项) .这是使用 spark.jars
参数完成的。
总而言之,这里是构建和部署 Spark 应用程序的建议方法(在本例中 - 使用 YARN):
- 使用您的分布式代码创建一个库,将其打包为 "regular" jar(带有描述其依赖项的 .pom 文件)和 "fat jar"(包含其所有传递依赖项) ).
- 创建驱动程序应用程序,编译依赖于您的分布式代码库和 Apache Spark(具有特定版本)
- 将驱动程序应用程序打包成一个 fat jar 以部署到驱动程序
- 启动
SparkSession
时,将正确版本的分布式代码作为 spark.jars
参数的值传递
- 将包含下载的 Spark 二进制文件的
lib/
文件夹下的所有 jar 的存档文件(例如 gzip)的位置作为 spark.yarn.archive
的值传递
我觉得这个问题必须解决一个汇编插件。
你需要建立一个胖罐子。
例如在 sbt 中:
- 使用代码
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.0")
添加文件 $PROJECT_ROOT/project/assembly.sbt
- 到build.sbt
added some libraries
libraryDependencies ++= Seq("com.some.company" %% "some-lib" % "1.0.0")`
- 在 sbt 控制台中输入 "assembly",然后部署程序集 jar
如果您需要更多信息,请转至 https://github.com/sbt/sbt-assembly
在项目中添加spark-2.4.0-bin-hadoop2.7\spark-2.4.0-bin-hadoop2.7\jars的所有jar文件。 spark-2.4.0-bin-hadoop2.7 可以从 https://spark.apache.org/downloads.html
下载
我有以下 build.sbt
lazy val root = (project in file(".")).
settings(
name := "spark-samples",
version := "1.0",
scalaVersion := "2.11.12",
mainClass in Compile := Some("StreamingExample")
)
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.4.0",
"org.apache.spark" %% "spark-streaming" % "2.4.0",
"org.apache.spark" %% "spark-sql" % "2.4.0",
"com.couchbase.client" %% "spark-connector" % "2.2.0"
)
// META-INF discarding
assemblyMergeStrategy in assembly := {
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}
我已经使用 sbt assembly 插件为我的应用程序创建了一个 fat jar,但是当 运行 使用 spark-submit 时它失败并显示错误:
java.lang.NoClassDefFoundError: rx/Completable$OnSubscribe
at com.couchbase.spark.connection.CouchbaseConnection.streamClient(CouchbaseConnection.scala:154)
我可以看到 class 存在于我的 fat jar 中:
jar tf target/scala-2.11/spark-samples-assembly-1.0.jar | grep 'Completable$OnSubscribe'
rx/Completable$OnSubscribe.class
不确定我在这里遗漏了什么,有什么线索吗?
构建和部署 Spark 应用程序时的常见问题是:
java.lang.ClassNotFoundException
.object x is not a member of package y
编译错误。java.lang.NoSuchMethodError
如何解决这些问题?
构建和部署 Spark 应用程序时,所有依赖项都需要兼容的版本。
Scala 版本。所有包都必须使用相同的主要(2.10、2.11、2.12)Scala 版本。
考虑以下(不正确)
build.sbt
:name := "Simple Project" version := "1.0" libraryDependencies ++= Seq( "org.apache.spark" % "spark-core_2.11" % "2.0.1", "org.apache.spark" % "spark-streaming_2.10" % "2.0.1", "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1" )
我们使用
spark-streaming
用于 Scala 2.10,而其余包用于 Scala 2.11。 有效 文件可能是name := "Simple Project" version := "1.0" libraryDependencies ++= Seq( "org.apache.spark" % "spark-core_2.11" % "2.0.1", "org.apache.spark" % "spark-streaming_2.11" % "2.0.1", "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1" )
但最好全局指定版本并使用
%%
(为您附加 scala 版本):name := "Simple Project" version := "1.0" scalaVersion := "2.11.7" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "2.0.1", "org.apache.spark" %% "spark-streaming" % "2.0.1", "org.apache.bahir" %% "spark-streaming-twitter" % "2.0.1" )
在 Maven 中类似:
<project>
<groupId>com.example</groupId>
<artifactId>simple-project</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>Simple Project</name>
<packaging>jar</packaging>
<version>1.0</version>
<properties>
<spark.version>2.0.1</spark.version>
</properties>
<dependencies>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>spark-streaming-twitter_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
</project>
Spark 版本 所有包必须使用相同的主要 Spark 版本(1.6、2.0、2.1、...)。
考虑以下(不正确)build.sbt:
name := "Simple Project" version := "1.0" libraryDependencies ++= Seq( "org.apache.spark" % "spark-core_2.11" % "1.6.1", "org.apache.spark" % "spark-streaming_2.10" % "2.0.1", "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1" )
我们使用
spark-core
1.6,其余组件在 Spark 2.0 中。 有效 文件可能是name := "Simple Project" version := "1.0" libraryDependencies ++= Seq( "org.apache.spark" % "spark-core_2.11" % "2.0.1", "org.apache.spark" % "spark-streaming_2.10" % "2.0.1", "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1" )
但最好使用变量 (仍然不正确):
name := "Simple Project" version := "1.0" val sparkVersion = "2.0.1" libraryDependencies ++= Seq( "org.apache.spark" % "spark-core_2.11" % sparkVersion, "org.apache.spark" % "spark-streaming_2.10" % sparkVersion, "org.apache.bahir" % "spark-streaming-twitter_2.11" % sparkVersion )
在 Maven 中类似:
<project>
<groupId>com.example</groupId>
<artifactId>simple-project</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>Simple Project</name>
<packaging>jar</packaging>
<version>1.0</version>
<properties>
<spark.version>2.0.1</spark.version>
<scala.version>2.11</scala.version>
</properties>
<dependencies>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>spark-streaming-twitter_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
</project>
Spark 依赖项中使用的 Spark 版本必须与 Spark 安装的 Spark 版本相匹配。例如,如果您在集群上使用 1.6.1,则必须使用 1.6.1 来构建 jars。不总是接受次要版本不匹配。
用于构建 jar 的 Scala 版本必须与用于构建部署的 Spark 的 Scala 版本相匹配。默认情况下(可下载的二进制文件和默认构建):
- Spark 1.x -> Scala 2.10
- Spark 2.x -> Scala 2.11
如果包含在 fat jar 中,工作节点上应该可以访问其他包。有许多选项,包括:
--jars
spark-submit
参数 - 分发本地jar
文件。--packages
spark-submit
的参数 - 从 Maven 存储库中获取依赖项。
在集群节点中提交时,您应该在
--jars
中包含应用程序jar
。
除了 user7337271 已经给出的非常广泛的答案之外,如果问题是由于缺少外部依赖项造成的,您可以使用您的依赖项构建一个 jar,例如maven assembly plugin
在这种情况下,请确保在您的构建系统中将所有核心 spark 依赖项标记为 "provided",并且如前所述,确保它们与您的运行时 spark 版本相关联。
应用程序的依赖项类 应在启动命令的application-jar 选项中指定。
找到更多详细信息摘自文档:
application-jar: Path to a bundled jar including your application and all dependencies. The URL must be globally visible inside of your cluster, for instance, an hdfs:// path or a file:// path that is present on all nodes
Apache Spark 的类路径是动态构建的(以适应每个应用程序的用户代码),这使得它容易受到此类问题的影响。 @user7337271 的回答是正确的,但还有一些问题,具体取决于您使用的 集群管理器 ("master")。
首先,一个 Spark 应用程序由这些组件组成(每个组件都是一个单独的 JVM,因此其类路径中可能包含不同的 类):
- Driver:这是 您的 应用程序创建
SparkSession
(或SparkContext
)并连接到集群执行实际工作的经理 - Cluster Manager:作为集群的"entry point",负责为每个应用分配executor。 Spark 支持几种不同的类型:standalone、YARN 和 Mesos,我们将在下面对其进行描述。
- Executors:这些是集群节点上的进程,执行实际工作(运行ning Spark tasks)
Apache Spark cluster mode overview 中的这张图表描述了它们之间的关系:
现在 - 哪个 类 应该驻留在每个组件中?
这可以通过下图来回答:
让我们慢慢解析:
Spark 代码 是 Spark 的库。它们应该存在于 ALL 三个组件中,因为它们包含让 Spark 执行它们之间通信的粘合剂。顺便说一句 - Spark 作者做出了一个设计决定,将所有组件的代码包含在所有组件中(例如,在驱动程序的执行器中也包含应该只 运行 的代码)以简化这一点 - 所以 Spark 的 "fat jar"(在 1.6 及以下的版本中)或 "archive"(在 2.0 中,详情如下)包含所有组件的必要代码,并且应该在所有组件中可用。
Driver-Only Code 这是用户代码,不包含应在执行器上使用的任何内容,即不在任何转换中使用的代码在 RDD / DataFrame / Dataset 上。这不一定必须与分布式用户代码分开,但可以这样做。
分布式代码这是用驱动程序代码编译的用户代码,但也必须在执行器上执行——实际转换使用的所有内容都必须是包含在这个罐子里。
现在我们明白了,如何我们如何让 类 在每个组件中正确加载,它们应该遵循什么规则?
Spark 代码:正如前面的答案所述,您必须使用相同的 Scala 和 所有组件中的 Spark 个版本。
1.1 在独立 模式下,有一个"pre-existing" Spark 安装,应用程序(驱动程序)可以连接到该安装。这意味着 所有驱动程序必须使用相同的 Spark 版本 运行ning 在主机和执行程序上。
1.2 在YARN / Mesos中,每个应用程序可以使用不同的Spark版本,但同一个应用程序的所有组件必须使用相同的版本。这意味着如果您使用版本 X 来编译和打包驱动程序应用程序,则在启动 SparkSession 时应提供相同的版本(例如,在使用 YARN 时通过
spark.yarn.archive
或spark.yarn.jars
参数)。您提供的 jars / archive 应该包括所有 Spark 依赖项(包括传递依赖项),并且它会在应用程序启动时由集群管理器发送给每个执行程序。驱动程序代码:这完全取决于 - 驱动程序代码可以作为一堆 jar 或 "fat jar" 提供,只要它包括所有 Spark 依赖项 + 所有用户代码
分布式代码:除了出现在驱动程序上之外,此代码还必须发送给执行程序(同样,连同其所有传递依赖项) .这是使用
spark.jars
参数完成的。
总而言之,这里是构建和部署 Spark 应用程序的建议方法(在本例中 - 使用 YARN):
- 使用您的分布式代码创建一个库,将其打包为 "regular" jar(带有描述其依赖项的 .pom 文件)和 "fat jar"(包含其所有传递依赖项) ).
- 创建驱动程序应用程序,编译依赖于您的分布式代码库和 Apache Spark(具有特定版本)
- 将驱动程序应用程序打包成一个 fat jar 以部署到驱动程序
- 启动
SparkSession
时,将正确版本的分布式代码作为 - 将包含下载的 Spark 二进制文件的
lib/
文件夹下的所有 jar 的存档文件(例如 gzip)的位置作为spark.yarn.archive
的值传递
spark.jars
参数的值传递
我觉得这个问题必须解决一个汇编插件。 你需要建立一个胖罐子。 例如在 sbt 中:
- 使用代码
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.0")
添加文件 - 到build.sbt
added some libraries
libraryDependencies ++= Seq("com.some.company" %% "some-lib" % "1.0.0")` - 在 sbt 控制台中输入 "assembly",然后部署程序集 jar
$PROJECT_ROOT/project/assembly.sbt
如果您需要更多信息,请转至 https://github.com/sbt/sbt-assembly
在项目中添加spark-2.4.0-bin-hadoop2.7\spark-2.4.0-bin-hadoop2.7\jars的所有jar文件。 spark-2.4.0-bin-hadoop2.7 可以从 https://spark.apache.org/downloads.html
下载我有以下 build.sbt
lazy val root = (project in file(".")).
settings(
name := "spark-samples",
version := "1.0",
scalaVersion := "2.11.12",
mainClass in Compile := Some("StreamingExample")
)
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.4.0",
"org.apache.spark" %% "spark-streaming" % "2.4.0",
"org.apache.spark" %% "spark-sql" % "2.4.0",
"com.couchbase.client" %% "spark-connector" % "2.2.0"
)
// META-INF discarding
assemblyMergeStrategy in assembly := {
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}
我已经使用 sbt assembly 插件为我的应用程序创建了一个 fat jar,但是当 运行 使用 spark-submit 时它失败并显示错误:
java.lang.NoClassDefFoundError: rx/Completable$OnSubscribe
at com.couchbase.spark.connection.CouchbaseConnection.streamClient(CouchbaseConnection.scala:154)
我可以看到 class 存在于我的 fat jar 中:
jar tf target/scala-2.11/spark-samples-assembly-1.0.jar | grep 'Completable$OnSubscribe'
rx/Completable$OnSubscribe.class
不确定我在这里遗漏了什么,有什么线索吗?