Where is my sparkDF.persist(DISK_ONLY) data stored?

I'd like to better understand Spark's persistence strategy outside of Hadoop.

When I persist a DataFrame with the DISK_ONLY strategy, where is my data stored (path/folder...)? And where do I specify this location?

For the short answer, we can have a look at the documentation regarding spark.local.dir:

Directory to use for "scratch" space in Spark, including map output files and RDDs that get stored on disk. This should be on a fast, local disk in your system. It can also be a comma-separated list of multiple directories on different disks. NOTE: In Spark 1.0 and later this will be overridden by SPARK_LOCAL_DIRS (Standalone, Mesos) or LOCAL_DIRS (YARN) environment variables set by the cluster manager.
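
For instance, here is a minimal sketch (the paths are hypothetical) of pointing spark.local.dir at fast local disks when building a session; the same key can also be passed with --conf spark.local.dir=... to spark-submit:

import org.apache.spark.sql.SparkSession

// Minimal sketch: point Spark's scratch space at two fast local disks.
// The paths below are hypothetical; adjust them to your machines. As the
// note above says, on YARN/Mesos/Standalone this value is overridden by
// the environment variables set by the cluster manager.
val spark = SparkSession.builder()
  .appName("disk-persist-demo")
  .master("local[*]")
  .config("spark.local.dir", "/mnt/fast1/spark,/mnt/fast2/spark")
  .getOrCreate()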

For a deeper understanding we can look at the code: a DataFrame (which is a Dataset[Row]) is based on RDDs and leverages the same persistence mechanisms. RDDs delegate this to the SparkContext, which marks them for persistence. The task is then actually taken care of by several classes in the org.apache.spark.storage package: first, the BlockManager manages the chunks of data to be persisted and the policy for doing so, delegating the actual persistence to a DiskStore (when writing to disk, of course), which represents a high-level interface for writing and in turn relies on a DiskBlockManager for the lower-level operations.
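
To make the entry point concrete, here is a minimal sketch (reusing the spark session from the snippet above) of what triggers this machinery:

import org.apache.spark.storage.StorageLevel

// Mark the DataFrame for disk-only persistence; the blocks are written by
// the BlockManager via the DiskStore / DiskBlockManager described above.
val df = spark.range(0L, 1000000L).toDF("id")
df.persist(StorageLevel.DISK_ONLY)
df.count()  // an action is needed before any blocks actually hit the disk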

Hopefully by now you have an idea of where to look, so let's move on and understand where the data is actually saved and how we can configure it: the DiskBlockManager invokes the helper Utils.getConfiguredLocalDirs, which for practicality I'll copy here (taken from the linked 2.2.1 version, the latest at the time of writing):

def getConfiguredLocalDirs(conf: SparkConf): Array[String] = {
    val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
    if (isRunningInYarnContainer(conf)) {
        // If we are in yarn mode, systems can have different disk layouts so we must set it
        // to what Yarn on this system said was available. Note this assumes that Yarn has
        // created the directories already, and that they are secured so that only the
        // user has access to them.
        getYarnLocalDirs(conf).split(",")
    } else if (conf.getenv("SPARK_EXECUTOR_DIRS") != null) {
        conf.getenv("SPARK_EXECUTOR_DIRS").split(File.pathSeparator)
    } else if (conf.getenv("SPARK_LOCAL_DIRS") != null) {
        conf.getenv("SPARK_LOCAL_DIRS").split(",")
    } else if (conf.getenv("MESOS_DIRECTORY") != null && !shuffleServiceEnabled) {
        // Mesos already creates a directory per Mesos task. Spark should use that directory
        // instead so all temporary files are automatically cleaned up when the Mesos task ends.
        // Note that we don't want this if the shuffle service is enabled because we want to
        // continue to serve shuffle files after the executors that wrote them have already exited.
        Array(conf.getenv("MESOS_DIRECTORY"))
    } else {
        if (conf.getenv("MESOS_DIRECTORY") != null && shuffleServiceEnabled) {
            logInfo("MESOS_DIRECTORY available but not using provided Mesos sandbox because " +
                "spark.shuffle.service.enabled is enabled.")
        }
        // In non-Yarn mode (or for the driver in yarn-client mode), we cannot trust the user
        // configuration to point to a secure directory. So create a subdirectory with restricted
        // permissions under each listed directory.
        conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")).split(",")
    }
}

The code, I believe, is pretty self-explanatory and well commented (and matches the documentation exactly): when running on YARN there is a specific policy that relies on the storage of YARN containers; on Mesos it uses the Mesos sandbox (unless the shuffle service is enabled); and in all other cases it goes to the location set under spark.local.dir or, failing that, the java.io.tmpdir system property (which is likely /tmp/).

So, if you are just playing around, the data is most likely stored under /tmp/; otherwise it largely depends on your environment and configuration.
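
As a quick sanity check for the simple (non-YARN, non-Mesos) case, you can mirror the fallback branch of getConfiguredLocalDirs at runtime:

// Prints the scratch location Spark falls back to when no cluster manager
// overrides it (mirrors the last branch of getConfiguredLocalDirs above).
println(spark.conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))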

To wrap this up for my YARN environment:

Following @stefanobaghino's guidance, I was able to go one step further into the code, to where the YARN configuration is loaded:

val localDirs = Option(conf.getenv("LOCAL_DIRS")).getOrElse("")

The LOCAL_DIRS environment variable is based on the yarn.nodemanager.local-dirs option set in yarn-default.xml.
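
If you want to inspect that value from code, here is a small sketch (assuming the Hadoop YARN client libraries are on the classpath) that loads yarn-default.xml/yarn-site.xml the same way the NodeManager does:

import org.apache.hadoop.yarn.conf.YarnConfiguration

// YarnConfiguration picks up yarn-default.xml and yarn-site.xml automatically.
val yarnConf = new YarnConfiguration()
println(yarnConf.get(YarnConfiguration.NM_LOCAL_DIRS))  // yarn.nodemanager.local-dirs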

For context, my question arose from this error:

2018-01-23 16:57:35,229 WARN org.apache.hadoop.yarn.server.nodemanager.DirectoryCollection: Directory /data/1/yarn/local error, used space above threshold of 98.5%, removing from list of valid directories

which sometimes killed my Spark job, and I wanted to understand whether that disk is also used for my persisted data (which is in fact a huge amount) while the job runs.

It turns out that this is exactly the folder where the data lives when it is persisted with a disk-based strategy.
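
One practical consequence for disk-pressure issues like the one above: explicitly releasing persisted data frees that space again. A minimal sketch, with df being the persisted DataFrame:

// Drop the persisted blocks from the executors' local dirs once they are
// no longer needed, freeing space under yarn.nodemanager.local-dirs.
df.unpersist()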

Many thanks for all the helpful guidance on this question!