解决 Apache Spark 中的依赖问题

Resolving dependency problems in Apache Spark

构建和部署 Spark 应用程序时的常见问题是:

如何解决这些问题?

构建和部署 Spark 应用程序时,所有依赖项都需要兼容的版本。

  • Scala 版本。所有包都必须使用相同的主要(2.10、2.11、2.12)Scala 版本。

    考虑以下(不正确)build.sbt

    name := "Simple Project"
    
    version := "1.0"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" % "spark-core_2.11" % "2.0.1",
       "org.apache.spark" % "spark-streaming_2.10" % "2.0.1",
       "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
    )
    

    我们使用 spark-streaming 用于 Scala 2.10,而其余包用于 Scala 2.11。 有效 文件可能是

    name := "Simple Project"
    
    version := "1.0"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" % "spark-core_2.11" % "2.0.1",
       "org.apache.spark" % "spark-streaming_2.11" % "2.0.1",
       "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
    )
    

    但最好全局指定版本并使用 %%(为您附加 scala 版本):

    name := "Simple Project"
    
    version := "1.0"
    
    scalaVersion := "2.11.7"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" %% "spark-core" % "2.0.1",
       "org.apache.spark" %% "spark-streaming" % "2.0.1",
       "org.apache.bahir" %% "spark-streaming-twitter" % "2.0.1"
    )
    

在 Maven 中类似:

    <project>
      <groupId>com.example</groupId>
      <artifactId>simple-project</artifactId>
      <modelVersion>4.0.0</modelVersion>
      <name>Simple Project</name>
      <packaging>jar</packaging>
      <version>1.0</version>
      <properties>
        <spark.version>2.0.1</spark.version>
      </properties> 
      <dependencies>
        <dependency> <!-- Spark dependency -->
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency> 
        <dependency>
          <groupId>org.apache.bahir</groupId>
          <artifactId>spark-streaming-twitter_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
      </dependencies>
    </project>
  • Spark 版本 所有包必须使用相同的主要 Spark 版本(1.6、2.0、2.1、...)。

    考虑以下(不正确)build.sbt:

    name := "Simple Project"
    
    version := "1.0"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" % "spark-core_2.11" % "1.6.1",
       "org.apache.spark" % "spark-streaming_2.10" % "2.0.1",
       "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
    )
    

    我们使用 spark-core 1.6,其余组件在 Spark 2.0 中。 有效 文件可能是

    name := "Simple Project"
    
    version := "1.0"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" % "spark-core_2.11" % "2.0.1",
       "org.apache.spark" % "spark-streaming_2.10" % "2.0.1",
       "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
    )
    

    但最好使用变量 (仍然不正确):

    name := "Simple Project"
    
    version := "1.0"
    
    val sparkVersion = "2.0.1"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" % "spark-core_2.11" % sparkVersion,
       "org.apache.spark" % "spark-streaming_2.10" % sparkVersion,
       "org.apache.bahir" % "spark-streaming-twitter_2.11" % sparkVersion
    )
    

在 Maven 中类似:

    <project>
      <groupId>com.example</groupId>
      <artifactId>simple-project</artifactId>
      <modelVersion>4.0.0</modelVersion>
      <name>Simple Project</name>
      <packaging>jar</packaging>
      <version>1.0</version>
      <properties>
        <spark.version>2.0.1</spark.version>
        <scala.version>2.11</scala.version>
      </properties> 
      <dependencies>
        <dependency> <!-- Spark dependency -->
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_${scala.version}</artifactId>
          <version>${spark.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_${scala.version}</artifactId>
          <version>${spark.version}</version>
        </dependency> 
        <dependency>
          <groupId>org.apache.bahir</groupId>
          <artifactId>spark-streaming-twitter_${scala.version}</artifactId>
          <version>${spark.version}</version>
        </dependency>
      </dependencies>
    </project>
  • Spark 依赖项中使用的 Spark 版本必须与 Spark 安装的 Spark 版本相匹配。例如,如果您在集群上使用 1.6.1,则必须使用 1.6.1 来构建 jars。不总是接受次要版本不匹配。

  • 用于构建 jar 的 Scala 版本必须与用于构建部署的 Spark 的 Scala 版本相匹配。默认情况下(可下载的二进制文件和默认构建):

    • Spark 1.x -> Scala 2.10
    • Spark 2.x -> Scala 2.11
  • 如果包含在 fat jar 中,工作节点上应该可以访问其他包。有许多选项,包括:

    • --jars spark-submit 参数 - 分发本地 jar 文件。
    • --packages spark-submit 的参数 - 从 Maven 存储库中获取依赖项。

    在集群节点中提交时,您应该在 --jars 中包含应用程序 jar

除了 user7337271 已经给出的非常广泛的答案之外,如果问题是由于缺少外部依赖项造成的,您可以使用您的依赖项构建一个 jar,例如maven assembly plugin

在这种情况下,请确保在您的构建系统中将所有核心 spark 依赖项标记为 "provided",并且如前所述,确保它们与您的运行时 spark 版本相关联。

应用程序的依赖项类 应在启动命令的application-jar 选项中指定。

可以在 Spark documentation

找到更多详细信息

摘自文档:

application-jar: Path to a bundled jar including your application and all dependencies. The URL must be globally visible inside of your cluster, for instance, an hdfs:// path or a file:// path that is present on all nodes

Apache Spark 的类路径是动态构建的(以适应每个应用程序的用户代码),这使得它容易受到此类问题的影响。 @user7337271 的回答是正确的,但还有一些问题,具体取决于您使用的 集群管理器 ("master")。

首先,一个 Spark 应用程序由这些组件组成(每个组件都是一个单独的 JVM,因此其类路径中可能包含不同的 类):

  1. Driver:这是 您的 应用程序创建 SparkSession(或 SparkContext)并连接到集群执行实际工作的经理
  2. Cluster Manager:作为集群的"entry point",负责为每个应用分配executor。 Spark 支持几种不同的类型:standalone、YARN 和 Mesos,我们将在下面对其进行描述。
  3. Executors:这些是集群节点上的进程,执行实际工作(运行ning Spark tasks

Apache Spark cluster mode overview 中的这张图表描述了它们之间的关系:

现在 - 哪个 类 应该驻留在每个组件中?

这可以通过下图来回答:

让我们慢慢解析:

  1. Spark 代码 是 Spark 的库。它们应该存在于 ALL 三个组件中,因为它们包含让 Spark 执行它们之间通信的粘合剂。顺便说一句 - Spark 作者做出了一个设计决定,将所有组件的代码包含在所有组件中(例如,在驱动程序的执行器中也包含应该只 运行 的代码)以简化这一点 - 所以 Spark 的 "fat jar"(在 1.6 及以下的版本中)或 "archive"(在 2.0 中,详情如下)包含所有组件的必要代码,并且应该在所有组件中可用。

  2. Driver-Only Code 这是用户代码,不包含应在执行器上使用的任何内容,即不在任何转换中使用的代码在 RDD / DataFrame / Dataset 上。这不一定必须与分布式用户代码分开,但可以这样做。

  3. 分布式代码这是用驱动程序代码编译的用户代码,但也必须在执行器上执行——实际转换使用的所有内容都必须是包含在这个罐子里。

现在我们明白了,如何我们如何让 类 在每个组件中正确加载,它们应该遵循什么规则?

  1. Spark 代码:正如前面的答案所述,您必须使用相同的 Scala所有组件中的 Spark 个版本。

    1.1 在独立 模式下,有一个"pre-existing" Spark 安装,应用程序(驱动程序)可以连接到该安装。这意味着 所有驱动程序必须使用相同的 Spark 版本 运行ning 在主机和执行程序上。

    1.2 在YARN / Mesos中,每个应用程序可以使用不同的Spark版本,但同一个应用程序的所有组件必须使用相同的版本。这意味着如果您使用版本 X 来编译和打包驱动程序应用程序,则在启动 SparkSession 时应提供相同的版本(例如,在使用 YARN 时通过 spark.yarn.archivespark.yarn.jars 参数)。您提供的 jars / archive 应该包括所有 Spark 依赖项(包括传递依赖项),并且它会在应用程序启动时由集群管理器发送给每个执行程序。

  2. 驱动程序代码:这完全取决于 - 驱动程序代码可以作为一堆 jar 或 "fat jar" 提供,只要它包括所有 Spark 依赖项 + 所有用户代码

  3. 分布式代码:除了出现在驱动程序上之外,此代码还必须发送给执行程序(同样,连同其所有传递依赖项) .这是使用 spark.jars 参数完成的。

总而言之,这里是构建和部署 Spark 应用程序的建议方法(在本例中 - 使用 YARN):

  • 使用您的分布式代码创建一个库,将其打包为 "regular" jar(带有描述其依赖项的 .pom 文件)和 "fat jar"(包含其所有传递依赖项) ).
  • 创建驱动程序应用程序,编译依赖于您的分布式代码库和 Apache Spark(具有特定版本)
  • 将驱动程序应用程序打包成一个 fat jar 以部署到驱动程序
  • 启动 SparkSession
  • 时,将正确版本的分布式代码作为 spark.jars 参数的值传递
  • 将包含下载的 Spark 二进制文件的 lib/ 文件夹下的所有 jar 的存档文件(例如 gzip)的位置作为 spark.yarn.archive
  • 的值传递

我觉得这个问题必须解决一个汇编插件。 你需要建立一个胖罐子。 例如在 sbt 中:

  • 使用代码 addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.0")
  • 添加文件 $PROJECT_ROOT/project/assembly.sbt
  • 到build.sbtadded some librarieslibraryDependencies ++= Seq("com.some.company" %% "some-lib" % "1.0.0")`
  • 在 sbt 控制台中输入 "assembly",然后部署程序集 jar

如果您需要更多信息,请转至 https://github.com/sbt/sbt-assembly

在项目中添加spark-2.4.0-bin-hadoop2.7\spark-2.4.0-bin-hadoop2.7\jars的所有jar文件。 spark-2.4.0-bin-hadoop2.7 可以从 https://spark.apache.org/downloads.html

下载

我有以下 build.sbt

lazy val root = (project in file(".")).
  settings(
    name := "spark-samples",
    version := "1.0",
    scalaVersion := "2.11.12",
    mainClass in Compile := Some("StreamingExample")        
  )

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.4.0",
  "org.apache.spark" %% "spark-streaming" % "2.4.0",
  "org.apache.spark" %% "spark-sql" % "2.4.0",
  "com.couchbase.client" %% "spark-connector" % "2.2.0" 
)

// META-INF discarding
assemblyMergeStrategy in assembly := {
       case PathList("META-INF", xs @ _*) => MergeStrategy.discard
       case x => MergeStrategy.first
   }

我已经使用 sbt assembly 插件为我的应用程序创建了一个 fat jar,但是当 运行 使用 spark-submit 时它失败并显示错误:

java.lang.NoClassDefFoundError: rx/Completable$OnSubscribe
    at com.couchbase.spark.connection.CouchbaseConnection.streamClient(CouchbaseConnection.scala:154)

我可以看到 class 存在于我的 fat jar 中:

jar tf target/scala-2.11/spark-samples-assembly-1.0.jar | grep 'Completable$OnSubscribe'
rx/Completable$OnSubscribe.class

不确定我在这里遗漏了什么,有什么线索吗?