Running from a local IDE against a remote Spark cluster

We have a Kerberized cluster with Spark running on YARN. At the moment, we write our Spark code in Scala locally, build a fat JAR, copy it to the cluster, and then run spark-submit there. Instead, I would like to write Spark code on my local PC and run it directly against the cluster. Is there a straightforward way to do this? The Spark docs don't seem to describe any such pattern.

FYI, my local machine is running Windows and the cluster is running CDH.

Assuming you have the right packages on your classpath (most easily set up via SBT, Maven, etc.), you should be able to run spark-submit from anywhere. The --master flag is the main piece that really determines how the job is distributed. One thing to consider is whether your local machine is blocked from the YARN cluster by a firewall or other network protection (after all, you don't want people randomly running applications on your cluster).
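For illustration only (the class name and JAR path are placeholders, not taken from the question), a submission from the local machine might look like this:

spark-submit --master yarn --deploy-mode client --class com.example.MyApp /local/path/my-app-assembly.jar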

On your local machine, you will need the Hadoop configuration files from the cluster, and a $SPARK_HOME/conf directory set up to hold a few Hadoop-related settings.

From the Spark on YARN page:

Ensure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory which contains the (client side) configuration files for the Hadoop cluster. These configs are used to write to HDFS and connect to the YARN ResourceManager. The configuration contained in this directory will be distributed to the YARN cluster so that all containers used by the application use the same configuration

These values are set in $SPARK_HOME/conf/spark-env.sh.
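As a rough sketch (the directory below is a placeholder for wherever you copied the cluster's client-side configs), $SPARK_HOME/conf/spark-env.sh on the local machine might contain:

export HADOOP_CONF_DIR=/path/to/cluster-conf
export YARN_CONF_DIR=/path/to/cluster-conf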

Since you are using Kerberos, see Long-Running Spark Applications:

For long-running applications, such as Spark Streaming jobs, to write to HDFS, you must configure Kerberos authentication for Spark, and pass the Spark principal and keytab to the spark-submit script using the --principal and --keytab parameters.
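As a hedged illustration (the principal and keytab values are placeholders), those parameters are simply appended to the spark-submit invocation:

spark-submit ... --principal user@EXAMPLE.COM --keytab /path/to/user.keytab ...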

While cricket007's answer works for spark-submit, here is what I did to run against a remote cluster from IntelliJ:

First, make sure the JARs on the client and the server are identical. Since we are using CDH 5.7.1, I made sure all my JARs came from that specific distribution.

Set HADOOP_CONF_DIR and YARN_CONF_DIR as described in cricket007's answer. Set "spark.yarn.principal" and "spark.yarn.keytab" appropriately in the Spark conf.
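In code, a minimal sketch of this (the principal and keytab values are placeholders) looks like:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.yarn.principal", "user@EXAMPLE.COM")   // placeholder Kerberos principal
  .set("spark.yarn.keytab", "/path/to/user.keytab")  // placeholder path to a local keytab file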

If you are connecting to HDFS, make sure the following exclusion rule is set in build.sbt:

libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.6.0-cdh5.7.1" excludeAll ExclusionRule(organization = "javax.servlet")

Make sure the spark-launcher and spark-yarn JARs are listed in build.sbt:

libraryDependencies += "org.apache.spark" %% "spark-launcher" % "1.6.0-cdh5.7.1"

libraryDependencies += "org.apache.spark" %% "spark-yarn" % "1.6.0-cdh5.7.1"

Find the CDH JARs on the server and copy them to a known location on HDFS. Add the following lines to your code:

final val CDH_JAR_PATH = "/opt/cloudera/parcels/CDH/jars"

final val hadoopJars: Seq[String] = Seq[String](
"hadoop-annotations-2.6.0-cdh5.7.1.jar"
, "hadoop-ant-2.6.0-cdh5.7.1.jar"
, "hadoop-ant-2.6.0-mr1-cdh5.7.1.jar"
, "hadoop-archive-logs-2.6.0-cdh5.7.1.jar"
, "hadoop-archives-2.6.0-cdh5.7.1.jar"
, "hadoop-auth-2.6.0-cdh5.7.1.jar"
, "hadoop-aws-2.6.0-cdh5.7.1.jar"
, "hadoop-azure-2.6.0-cdh5.7.1.jar"
, "hadoop-capacity-scheduler-2.6.0-mr1-cdh5.7.1.jar"
, "hadoop-common-2.6.0-cdh5.7.1.jar"
, "hadoop-core-2.6.0-mr1-cdh5.7.1.jar"
, "hadoop-datajoin-2.6.0-cdh5.7.1.jar"
, "hadoop-distcp-2.6.0-cdh5.7.1.jar"
, "hadoop-examples-2.6.0-mr1-cdh5.7.1.jar"
, "hadoop-examples.jar"
, "hadoop-extras-2.6.0-cdh5.7.1.jar"
, "hadoop-fairscheduler-2.6.0-mr1-cdh5.7.1.jar"
, "hadoop-gridmix-2.6.0-cdh5.7.1.jar"
, "hadoop-gridmix-2.6.0-mr1-cdh5.7.1.jar"
, "hadoop-hdfs-2.6.0-cdh5.7.1.jar"
, "hadoop-hdfs-nfs-2.6.0-cdh5.7.1.jar"
, "hadoop-kms-2.6.0-cdh5.7.1.jar"
, "hadoop-mapreduce-client-app-2.6.0-cdh5.7.1.jar"
, "hadoop-mapreduce-client-common-2.6.0-cdh5.7.1.jar"
, "hadoop-mapreduce-client-core-2.6.0-cdh5.7.1.jar"
, "hadoop-mapreduce-client-hs-2.6.0-cdh5.7.1.jar"
, "hadoop-mapreduce-client-hs-plugins-2.6.0-cdh5.7.1.jar"
, "hadoop-mapreduce-client-jobclient-2.6.0-cdh5.7.1.jar"
, "hadoop-mapreduce-client-nativetask-2.6.0-cdh5.7.1.jar"
, "hadoop-mapreduce-client-shuffle-2.6.0-cdh5.7.1.jar"
, "hadoop-nfs-2.6.0-cdh5.7.1.jar"
, "hadoop-openstack-2.6.0-cdh5.7.1.jar"
, "hadoop-rumen-2.6.0-cdh5.7.1.jar"
, "hadoop-sls-2.6.0-cdh5.7.1.jar"
, "hadoop-streaming-2.6.0-cdh5.7.1.jar"
, "hadoop-streaming-2.6.0-mr1-cdh5.7.1.jar"
, "hadoop-tools-2.6.0-mr1-cdh5.7.1.jar"
, "hadoop-yarn-api-2.6.0-cdh5.7.1.jar"
, "hadoop-yarn-applications-distributedshell-2.6.0-cdh5.7.1.jar"
, "hadoop-yarn-applications-unmanaged-am-launcher-2.6.0-cdh5.7.1.jar"
, "hadoop-yarn-client-2.6.0-cdh5.7.1.jar"
, "hadoop-yarn-common-2.6.0-cdh5.7.1.jar"
, "hadoop-yarn-registry-2.6.0-cdh5.7.1.jar"
, "hadoop-yarn-server-applicationhistoryservice-2.6.0-cdh5.7.1.jar"
, "hadoop-yarn-server-common-2.6.0-cdh5.7.1.jar"
, "hadoop-yarn-server-nodemanager-2.6.0-cdh5.7.1.jar"
, "hadoop-yarn-server-resourcemanager-2.6.0-cdh5.7.1.jar"
, "hadoop-yarn-server-web-proxy-2.6.0-cdh5.7.1.jar"
, "hbase-hadoop2-compat-1.2.0-cdh5.7.1.jar"
, "hbase-hadoop-compat-1.2.0-cdh5.7.1.jar")

final val sparkJars: Seq[String] = Seq[String](
"spark-1.6.0-cdh5.7.1-yarn-shuffle.jar",
"spark-assembly-1.6.0-cdh5.7.1-hadoop2.6.0-cdh5.7.1.jar",
"spark-avro_2.10-1.1.0-cdh5.7.1.jar",
"spark-bagel_2.10-1.6.0-cdh5.7.1.jar",
"spark-catalyst_2.10-1.6.0-cdh5.7.1.jar",
"spark-core_2.10-1.6.0-cdh5.7.1.jar",
"spark-examples-1.6.0-cdh5.7.1-hadoop2.6.0-cdh5.7.1.jar",
"spark-graphx_2.10-1.6.0-cdh5.7.1.jar",
"spark-hive_2.10-1.6.0-cdh5.7.1.jar",
"spark-launcher_2.10-1.6.0-cdh5.7.1.jar",
"spark-mllib_2.10-1.6.0-cdh5.7.1.jar",
"spark-network-common_2.10-1.6.0-cdh5.7.1.jar",
"spark-network-shuffle_2.10-1.6.0-cdh5.7.1.jar",
"spark-repl_2.10-1.6.0-cdh5.7.1.jar",
"spark-sql_2.10-1.6.0-cdh5.7.1.jar",
"spark-streaming-flume-sink_2.10-1.6.0-cdh5.7.1.jar",
"spark-streaming-flume_2.10-1.6.0-cdh5.7.1.jar",
"spark-streaming-kafka_2.10-1.6.0-cdh5.7.1.jar",
"spark-streaming_2.10-1.6.0-cdh5.7.1.jar",
"spark-unsafe_2.10-1.6.0-cdh5.7.1.jar",
"spark-yarn_2.10-1.6.0-cdh5.7.1.jar")

// Builds a colon-separated classpath string from the given JAR names and path prefix
def getClassPath(jarNames: Seq[String], pathPrefix: String): String = {
  jarNames.foldLeft("")((cp, name) => s"$cp:$pathPrefix/$name").drop(1)
}

When creating your SparkConf, add these lines:

.set("spark.driver.extraClassPath", getClassPath(sparkJars ++ hadoopJars, CDH_JAR_PATH))
.set("spark.executor.extraClassPath", getClassPath(sparkJars ++ hadoopJars, CDH_JAR_PATH))
.set("spark.yarn.jars", "hdfs://$YOUR_MACHINE/PATH_TO_JARS/*")
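Putting it together, a minimal sketch of the full SparkConf built from the IDE might look like the following; the app name, the yarn-client master, and the HDFS path are assumptions for illustration, not part of the original answer:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("remote-yarn-example")                 // hypothetical app name
  .setMaster("yarn-client")                          // assumed: driver in the local IDE, executors on YARN
  .set("spark.yarn.principal", "user@EXAMPLE.COM")   // placeholder
  .set("spark.yarn.keytab", "/path/to/user.keytab")  // placeholder
  .set("spark.driver.extraClassPath", getClassPath(sparkJars ++ hadoopJars, CDH_JAR_PATH))
  .set("spark.executor.extraClassPath", getClassPath(sparkJars ++ hadoopJars, CDH_JAR_PATH))
  .set("spark.yarn.jars", "hdfs://$YOUR_MACHINE/PATH_TO_JARS/*")  // placeholder HDFS location, as above

val sc = new SparkContext(conf)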

Your program should work now.