如何将位于 HDFS 上的类型安全配置文件添加到 spark-submit(集群模式)?

How to add a typesafe config file which is located on HDFS to spark-submit (cluster-mode)?

我有一个 Spark (Spark 1.5.2) 应用程序可以将数据从 Kafka 流式传输到 HDFS。我的应用程序包含两个 Typesafe 配置文件来配置某些东西,比如 Kafka 主题等。

现在我想 运行 我的应用程序在集群中使用 spark-submit(集群模式)。 我项目的所有依赖项的 jar 文件存储在 HDFS 上。 只要我的配置文件包含在 jar 文件中,一切正常。但这对于测试目的是不切实际的,因为我总是必须重建 jar。

因此我排除了项目的配置文件,并通过 "driver-class-path" 添加了它们。这适用于客户端模式,但如果我现在将配置文件移动到 HDFS 并且 运行 我的应用程序处于集群模式,它找不到设置。您可以在下面找到我的 spark-submit 命令:

/usr/local/spark/bin/spark-submit \
    --total-executor-cores 10 \
    --executor-memory 15g \
    --verbose \
    --deploy-mode cluster\
    --class com.hdp.speedlayer.SpeedLayerApp \
    --driver-class-path hdfs://iot-master:8020/user/spark/config \
    --master spark://spark-master:6066 \
    hdfs://iot-master:8020/user/spark/speed-layer-CONFIG.jar

我已经用 --file 参数试过了,但还是没用。有人知道我该如何解决这个问题吗?

更新:

我做了一些进一步的研究,发现它可能与 HDFS 路径有关。我将 HDFS 路径更改为“hdfs:///iot-master:8020//user//spark//config 但不幸的是,这也不起作用。但也许这可以帮助你。

下面你还可以看到我在集群模式下运行驱动程序时得到的错误:

Exception in thread "main" java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
    at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.lang.ExceptionInInitializerError
    at com.speedlayer.SpeedLayerApp.main(SpeedLayerApp.scala)
    ... 6 more
Caused by: com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'application'
    at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:124)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:145)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:159)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:164)
...

一个选项是使用 --files 标志和 HDFS 位置,并确保使用 spark.executor.extraClassPath 标志和 -Dconfig.file:[=17 将其添加到执行程序类路径中=]

Spark uses the following URL scheme to allow different strategies for disseminating jars:

  • file: - Absolute paths and file:/ URIs are served by the driver’s HTTP file server, and every executor pulls the file from the driver HTTP server.
  • hdfs:, http:, https:, ftp: - these pull down files and JARs from the URI as expected
  • local: - a URI starting with local:/ is expected to exist as a local file on each worker node. This means that no network IO will be incurred, and works well for large files/JARs that are pushed to each worker, or shared via NFS, GlusterFS, etc.

另外,看spark-submit的帮助文档可以看到:

--files FILES           Comma-separated list of files to be placed in the working
                        directory of each executor.

运行 火花提交:

/usr/local/spark/bin/spark-submit \
--total-executor-cores 10 \
--executor-memory 15g \
--conf "spark.executor.extraClassPath=-Dconfig.file=application.conf"
--verbose \
--deploy-mode cluster\
--class com.hdp.speedlayer.SpeedLayerApp \
--driver-class-path hdfs://iot-master:8020/user/spark/config \
--files hdfs:/path/to/conf \
--master spark://spark-master:6066 \
hdfs://iot-master:8020/user/spark/speed-layer-CONFIG.jar

为了达到相同的结果,我发现了以下内容:

  1. --files: 仅关联到计算机 运行 上的本地文件并执行 spark-submit 命令并转换为 conf.addFile()。因此 hdfs 文件将无法工作,除非您能够 运行 hdfs dfs -get <....> 之前检索文件。在我的例子中,我想从 oozie 运行 它所以我不知道它会在哪台机器上 运行 而且我不想在我的工作流程中添加复制文件操作。
  2. 引述@Yuval_Itzchakov引用的是--jars,它只处理罐子,因为它转换为conf.addJar()

据我所知,没有从 hdfs 加载配置文件的简单方法。

我的方法是将路径传递到我的应用程序并读取配置文件并将其合并到参考文件中:

private val HDFS_IMPL_KEY = "fs.hdfs.impl"
def loadConf(pathToConf: String): Config = {
   val path = new Path(pathToConf)
   val confFile = File.createTempFile(path.getName, "tmp")
   confFile.deleteOnExit()
   getFileSystemByUri(path.toUri).copyToLocalFile(path, new Path(confFile.getAbsolutePath))

   ConfigFactory.load(ConfigFactory.parseFile(confFile))
}

def getFileSystemByUri(uri: URI) : FileSystem  = {
   val hdfsConf = new Configuration()
   hdfsConf.set(HDFS_IMPL_KEY, classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
FileSystem.get(uri, hdfsConf)
}

P.S这个错误只是说明ConfigFactory没有找到任何配置文件,所以他找不到你要找的属性。