Spark 无效检查点目录

Spark Invalid Checkpoint Directory

我的程序中有一个很长的 运行 迭代,我想每隔几次迭代就缓存和检查点(建议使用这种技术来减少网络上的长血统),所以我不会有 WhosebugError,通过这样做

for (i <- 2 to 100) {
      //cache and checkpoint ever 30 iterations
      if (i % 30 == 0) {
        graph.cache
        graph.checkpoint
        //I use numEdges in order to start the transformation I need
        graph.numEdges
      }
      //graphs are stored to a list
      //here I use the graph of previous iteration to this iteration
      //and perform a transformation
}

而且我已经这样设置了检查点目录

val sc = new SparkContext(conf)
sc.setCheckpointDir("checkpoints/")

但是,当我最终 运行 我的程序出现异常时

Exception in thread "main" org.apache.spark.SparkException: Invalid checkpoint directory

我用了3台电脑,每台电脑都有Ubuntu 14.04,我还在每台电脑上都使用了预装的spark 1.4.1版本和hadoop 2.4或更高版本。

检查点目录必须是 HDFS 兼容目录(来自 scala 文档 "HDFS-compatible directory where the checkpoint data will be reliably stored. Note that this must be a fault-tolerant file system like HDFS")。因此,如果您在这些节点上设置了 HDFS,请将其指向 "hdfs://[yourcheckpointdirectory]".

如果您已经在节点集群上设置了 HDFS,您可以在目录 HADOOP_HOME/etc/hadoop 中的 "core-site.xml" 中找到您的 hdfs 地址。对我来说,core-site.xml设置为:

<configuration>
      <property>
           <name>fs.default.name</name>
           <value>hdfs://master:9000</value>
      </property>
</configuration>

然后你可以在hdfs上创建一个目录来保存Rdd checkpoint文件,我们把这个目录命名为RddChekPoint, by hadoop hdfs shell:

$ hadoop fs -mkdir /RddCheckPoint

如果你使用pyspark,SparkContext被sc = SparkContext(conf)初始化后,你可以通过

设置checkpoint目录

sc.setCheckpointDir("hdfs://master:9000/RddCheckPoint")

当一个Rdd被checkpoint时,在hdfs目录RddCheckPoint中,可以看到checkpoint文件保存在那里,看一下:

$ hadoop fs -ls /RddCheckPoint