如何将位于 HDFS 上的类型安全配置文件添加到 spark-submit(集群模式)?

How to add a typesafe config file which is located on HDFS to spark-submit (cluster-mode)?

我有一个 Spark (Spark 1.5.2) 应用程序可以将数据从 Kafka 流式传输到 HDFS。我的应用程序包含两个 Typesafe 配置文件来配置某些东西,比如 Kafka 主题等。

现在我想 运行 我的应用程序在集群中使用 spark-submit(集群模式)。 我项目的所有依赖项的 jar 文件存储在 HDFS 上。 只要我的配置文件包含在 jar 文件中,一切正常。但这对于测试目的是不切实际的,因为我总是必须重建 jar。

因此我排除了项目的配置文件,并通过 "driver-class-path" 添加了它们。这适用于客户端模式,但如果我现在将配置文件移动到 HDFS 并且 运行 我的应用程序处于集群模式,它找不到设置。您可以在下面找到我的 spark-submit 命令:

/usr/local/spark/bin/spark-submit \
    --total-executor-cores 10 \
    --executor-memory 15g \
    --verbose \
    --deploy-mode cluster\
    --class com.hdp.speedlayer.SpeedLayerApp \
    --driver-class-path hdfs://iot-master:8020/user/spark/config \
    --master spark://spark-master:6066 \

我已经用 --file 参数试过了,但还是没用。有人知道我该如何解决这个问题吗?


我做了一些进一步的研究,发现它可能与 HDFS 路径有关。我将 HDFS 路径更改为“hdfs:///iot-master:8020//user//spark//config 但不幸的是,这也不起作用。但也许这可以帮助你。


Exception in thread "main" java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
    at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.lang.ExceptionInInitializerError
    at com.speedlayer.SpeedLayerApp.main(SpeedLayerApp.scala)
    ... 6 more
Caused by: com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'application'
    at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:124)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:145)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:159)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:164)

一个选项是使用 --files 标志和 HDFS 位置,并确保使用 spark.executor.extraClassPath 标志和 -Dconfig.file:[=17 将其添加到执行程序类路径中=]

Spark uses the following URL scheme to allow different strategies for disseminating jars:

  • file: - Absolute paths and file:/ URIs are served by the driver’s HTTP file server, and every executor pulls the file from the driver HTTP server.
  • hdfs:, http:, https:, ftp: - these pull down files and JARs from the URI as expected
  • local: - a URI starting with local:/ is expected to exist as a local file on each worker node. This means that no network IO will be incurred, and works well for large files/JARs that are pushed to each worker, or shared via NFS, GlusterFS, etc.


--files FILES           Comma-separated list of files to be placed in the working
                        directory of each executor.

运行 火花提交:

/usr/local/spark/bin/spark-submit \
--total-executor-cores 10 \
--executor-memory 15g \
--conf "spark.executor.extraClassPath=-Dconfig.file=application.conf"
--verbose \
--deploy-mode cluster\
--class com.hdp.speedlayer.SpeedLayerApp \
--driver-class-path hdfs://iot-master:8020/user/spark/config \
--files hdfs:/path/to/conf \
--master spark://spark-master:6066 \


  1. --files: 仅关联到计算机 运行 上的本地文件并执行 spark-submit 命令并转换为 conf.addFile()。因此 hdfs 文件将无法工作,除非您能够 运行 hdfs dfs -get <....> 之前检索文件。在我的例子中,我想从 oozie 运行 它所以我不知道它会在哪台机器上 运行 而且我不想在我的工作流程中添加复制文件操作。
  2. 引述@Yuval_Itzchakov引用的是--jars,它只处理罐子,因为它转换为conf.addJar()

据我所知,没有从 hdfs 加载配置文件的简单方法。


private val HDFS_IMPL_KEY = "fs.hdfs.impl"
def loadConf(pathToConf: String): Config = {
   val path = new Path(pathToConf)
   val confFile = File.createTempFile(path.getName, "tmp")
   getFileSystemByUri(path.toUri).copyToLocalFile(path, new Path(confFile.getAbsolutePath))


def getFileSystemByUri(uri: URI) : FileSystem  = {
   val hdfsConf = new Configuration()
   hdfsConf.set(HDFS_IMPL_KEY, classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
FileSystem.get(uri, hdfsConf)
