spark-submit 适用于 yarn-cluster 模式,但 SparkLauncher 不适用,具有相同的参数

spark-submit works for yarn-cluster mode but SparkLauncher doesn't, with same params

我可以通过 spark-submit 提交 spark 作业,但是当我尝试使用 SparkLauncher 以编程方式执行相同操作时,它什么也没给我(我什至没有在 UI 上看到 Spark 作业)

场景如下:

我有一台托管 hdfs 集群的服务器(比如主机名:cr-hdbc101.dev.local:7123)。我将一个胖罐子推到我试图执行的服务器上。 以下 spark-submit 按预期工作,并以 yarn-cluster 模式提交 spark 作业

spark-submit \
      --verbose \
      --class com.digital.StartSparkJob \
      --master yarn \
      --deploy-mode cluster \
      --num-executors 2 \
      --driver-memory 2g \
      --executor-memory 3g \
      --executor-cores 4 \
      /usr/share/Deployments/Consolidateservice.jar "<arg_to_main>"

但是下面这段 SparkLauncher 代码不起作用

val sparkLauncher = new SparkLauncher()
    sparkLauncher
      .setSparkHome("/opt/cloudera/parcels/CDH-5.7.2-1.cdh5.7.2.p0.18/lib/spark")
      .setAppResource("/usr/share/Deployments/Consolidateservice.jar")
      .setMaster("yarn-cluster")
      .setVerbose(true)
      .setMainClass("com.digital.StartSparkJob")
      .setDeployMode("cluster")
      .setConf("spark.driver.cores", "2")
      .setConf("spark.driver.memory", "2g")
      .setConf("spark.executor.cores", "4")
      .setConf("spark.executor.memory", "3g")
      .addAppArgs(<arg_to_main>)
      .startApplication()

我认为 SparkLauncher 可能没有获得正确的环境变量来使用,所以我将以下内容发送给 SparkLauncher,但无济于事(基本上我将 spark-env.sh 中的所有内容都传递给 SparkLauncher)

val env: java.util.Map[String, String] = new java.util.HashMap[String, String]
    env.put("SPARK_CONF_DIR", "/etc/spark/conf.cloudera.spark_on_yarn")
    env.put("HADOOP_HOME", "/opt/cloudera/parcels/CDH-5.7.2-1.cdh5.7.2.p0.18/lib/hadoop")
    env.put("YARN_CONF_DIR", "/etc/spark/conf.cloudera.spark_on_yarn/yarn-conf")
    env.put("SPARK_LIBRARY_PATH", "/opt/cloudera/parcels/CDH-5.7.2-1.cdh5.7.2.p0.18/lib/spark/lib")
    env.put("SCALA_LIBRARY_PATH", "/opt/cloudera/parcels/CDH-5.7.2-1.cdh5.7.2.p0.18/lib/spark/lib")
    env.put("LD_LIBRARY_PATH", "/opt/cloudera/parcels/CDH-5.7.2-1.cdh5.7.2.p0.18/lib/hadoop/lib/native")
    env.put("SPARK_DIST_CLASSPATH", "/etc/spark/conf.cloudera.spark_on_yarn/classpath.txt")

    val sparkLauncher = new SparkLauncher(env)
    sparkLauncher
      .setSparkHome("/opt/cloudera/parcels/CDH-5.7.2-1.cdh5.7.2.p0.18/lib/spark")...

更令人沮丧的是,当我对 yarn-client 模式使用相同的 SparkLauncher 代码时,它工作得非常好。

有人可以指出我遗漏了什么吗,我只是觉得我在盯着这个问题而不认识它。

注意:主要的 class(com.digital.StartSparkJob) 和 SparkLauncher 代码都是我推送到服务器的 fat jar 的一部分。我只是用外部 API 调用 SparkLauncher 代码,这又应该在集群上打开驱动程序 JVM

Spark版本:1.6.0,scala版本:2.10.5

我什至没有在 Spark 上获取日志-UI...sparkApp 甚至没有 运行。因此,我 运行 将 sparkLauncher 作为一个进程(使用 .launch().waitFor() )以便我可以捕获错误日志。

我使用 .getInputStream 和 .getErrorStream 捕获了日志,发现传递给集群的用户是错误的。我的集群仅适用于用户 "abcd"。

在启动 SparkLauncher 之前,我确实设置了 System.setProperty("HADOOP_USER_NAME", "abcd"),并在 spark-default.conf 中添加了 "spark.yarn.appMasterEnv.HADOOP_USER_NAME=abcd"。但是看起来他们没有被移植到集群。

因此我将 HADOOP_USER_NAME 作为 childArg 传递给了 SparkLauncher

val env: java.util.Map[String, String] = new java.util.HashMap[String, String]
    env.put("SPARK_CONF_DIR", "/etc/spark/conf.cloudera.spark_on_yarn")
    env.put("YARN_CONF_DIR", "/etc/spark/conf.cloudera.spark_on_yarn/yarn-conf")
    env.put("HADOOP_USER_NAME", "abcd")

try {
val sparkLauncher = new SparkLauncher(env)...