如何在 Spark Scala 中读取检查点数据框

How to read a checkpoint Dataframe in Spark Scala

我正在尝试测试下面的程序以获取检查点并从检查点位置读取是否存在,以防万一应用程序由于任何原因(如资源不可用)而失败。当我杀死作业并再次重新触发它时,执行从头开始。不确定实现此目标还需要什么。谢谢!!

代码如下:

import org.apache.log4j._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object withCheckpoint {

  def main(args: Array[String]): Unit = {

    Logger.getLogger("org").setLevel(Level.ERROR)

    //val conf = new SparkConf().setAppName("Without Checkpoint")
    val conf = new SparkConf().setAppName("With Checkpoint")
    val sc = new SparkContext(conf)


    val checkpointDirectory = "/tmp"

    sc.setCheckpointDir(checkpointDirectory)   // set checkpoint directory

    val spark = SparkSession.builder.appName("Without Checkpoint").getOrCreate()



    /************************************************************************************************************************************************/
    /*                                                Reading source data begins here                                                               */
    /************************************************************************************************************************************************/


    val readCtryDemoFile = spark.read.option("header", "true").csv("/tmp/Ctry_Demo.csv")



    val readCtryRefFile = spark.read.option("header","true").csv("/tmp/ref_ctry.csv")



    val readCtryCntntFile = spark.read.option("header","true").csv("/tmp/ctry_to_continent.csv")


    /************************************************************************************************************************************************/
    /*                                                Reading source data Completes                                                                 */
    /************************************************************************************************************************************************/


    /************************************************************************************************************************************************/
    /*                                                Transformation begins here                                                                    */
    /************************************************************************************************************************************************/


    /*********************************************************************************/
    /* Join above created dataframes to pull respective columns                      */
    /*********************************************************************************/


    val jnCtryDemoCtryref = readCtryDemoFile.join(readCtryRefFile,Seq("NUM_CTRY_CD"))


    val jnCtryCntnt = jnCtryDemoCtryref.join(readCtryCntntFile,Seq("Alpha_2_CTRY_CD"))





    /*********************************************************************************/
    /* Checkpointing the above created Dataframe to the checkpoint Directory         */
    /*********************************************************************************/

    val jnCtryCntntchkpt = jnCtryCntnt.checkpoint()
    jnCtryCntntchkpt.collect()

    /*********************************************************************************/
    /* Creating multiple outputs based on different aggregation keys                 */
    /*********************************************************************************/

    val aggCntnNm = jnCtryCntntchkpt.groupBy("CONTINENT_NM").agg(sum("POPULATION").as("SUM_POPULATION")).orderBy("CONTINENT_NM")
    aggCntnNm.show()


    val aggCtryNm = jnCtryCntntchkpt.groupBy("Ctry_NM").agg(sum("POPULATION").as("SUM_POPULATION")).orderBy("Ctry_NM")
    aggCtryNm.show()


    val aggCtryCd = jnCtryCntntchkpt.groupBy("NUM_CTRY_CD").agg(sum("POPULATION").as("SUM_POPULATION")).orderBy("NUM_CTRY_CD")
    aggCtryCd.show()

    /************************************************************************************************************************************************/
    /*                                                Transformation begins here                                                                    */
    /************************************************************************************************************************************************/

  }
}

我希望我能消除你的一些疑虑,解释检查点并给你一个例子 如何从检查点目录恢复数据集。

检查点主要用于迭代算法和流式处理。

在批处理中,我们习惯于容错(缓存或持久化)。 这意味着,万一节点崩溃,作业不会丢失其状态并且丢失的任务是 重新安排给其他工人。中间结果被写入持久存储(即 必须像 HDFS 或 Cloud Object Storage 一样具有容错能力)

维护 RDD 谱系(缓存或持久化)提供了弹性,但当谱系变得很长时也会导致问题 - 例如:迭代算法、流式处理 - 恢复可能非常昂贵 - 潜在的堆栈溢出

检查点将数据保存到HDFS - 提供跨节点的容错存储 - 血统未保存 - 必须在对 RDD

进行任何操作之前设置检查点

数据集检查点

是Spark的一个特性SQL截断一个逻辑查询计划 这对于高度迭代的数据算法(例如 Spark 使用 Spark SQL 的数据集 API 进行数据操作的 MLlib)。

检查点实际上是 Spark Core 的一个特性(即 Spark SQL 用于分布式计算),允许重新启动驱动程序 分布式计算的先前计算状态失败时 描述为 RDD 。那个已经在Spark中成功使用 Streaming - 用于流处理的现已过时的 Spark 模块 基于 RDD API。 检查点截断要设置检查点的 RDD 的沿袭。 已成功用于迭代机中的Spark MLlib 学习算法,如 ALS。 Spark SQL 中的数据集检查点使用检查点截断 被检查点的数据集的底层 RDD 的沿袭。

使用数据集检查点要求您指定检查点目录。 该目录存放了待检查点的 RDD 的检查点文件。采用 SparkContext.setCheckpointDir 设置检查点目录的路径。 检查点可以是本地的或可靠的,这定义了检查点的可靠性 目录是。本地检查点使用执行器存储来写入检查点文件 到并由于执行程序生命周期被认为是不可靠的。可靠的 检查点使用可靠的数据存储,如 Hadoop HDFS。

正在编写检查点目录

package tests

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._


/**
  * Checkpointing
  *     - Maintaining RDD lineage provides resilience but can also cause problems when the lineage gets very long
  *         - For example: iterative algorithms, streaming
  *     - Recovery can be very expensive
  *     - Potencial stack overflow
  *     - Checkpointing saves the data to HDFS
  *         - Provides fault-tolerant storage across nodes
  *         - Lineage is not saved
  *         - Must be checkpointed before any actions on the RDD
  */
object WriteCheckPoint {
  val spark = SparkSession
    .builder()
    .appName("WriteCheckPoint")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions","4") //Change to a more reasonable default number of partitions for our data
    .config("spark.app.id","WriteCheckPoint") // To silence Metrics warning
    .getOrCreate()

  val sqlContext = spark.sqlContext

  val sc = spark.sparkContext

  // Remember to set the checkpoint directory
  spark.sparkContext.setCheckpointDir("hdfs://localhost/user/cloudera/checkpoint")

  def main(args: Array[String]): Unit = {

    Logger.getRootLogger.setLevel(Level.ERROR)
    // Set org.apache.spark.rdd.ReliableRDDCheckpointData logger to INFO
    // to see what happens while an RDD is checkpointed
    // Let's use log4j API so, you should add import org.apache.log4j.{Level, Logger}
    Logger.getLogger("org.apache.spark.rdd.ReliableRDDCheckpointData").setLevel(Level.INFO)

    try {
      val nums = spark.range(5).withColumn("random", rand()).filter("random > 0.5")
      // Must be checkpointed before any actions on the RDD
      nums.checkpoint
      // Save the schema as it is going to use to reconstruct nums dataset from a RDD
      val schema = nums.schema
      schema.printTreeString()

      nums.show()

      // To have the opportunity to view the web console of Spark: http://localhost:4040/
      println("Type whatever to the console to exit......")
      scala.io.StdIn.readLine()
    } finally {
      sc.stop()
      println("SparkContext stopped")
      spark.stop()
      println("SparkSession stopped")
    }
  }
}

输出

20/06/15 16:42:50 INFO ReliableRDDCheckpointData: Done checkpointing RDD 4 to hdfs://localhost/user/cloudera/checkpoint/607daeca-6ec2-471c-9033-9c4c236880a9/rdd-4, new parent is RDD 5
root
 |-- id: long (nullable = false)
 |-- random: double (nullable = false)

+---+------------------+
| id|            random|
+---+------------------+
|  2|0.9550560942227814|
+---+------------------+

您将必须定义几个受保护的辅助对象 在包 org.apache.spark 和 org.apache.spark.sql

package org.apache.spark

/**
  * SparkContext.checkpointFile is a `protected[spark]` method
  * define a helper object to "escape" the package lock-in
  */
object my {
  import scala.reflect.ClassTag
  import org.apache.spark.rdd.RDD
  def recover[T: ClassTag](sc: SparkContext, path: String): RDD[T] = {
    sc.checkpointFile[T](path)
  }
}
package org.apache.spark.sql

object my2 {
  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.{DataFrame, SparkSession}
  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.types.StructType
  def createDataFrame(spark: SparkSession, catalystRows: RDD[InternalRow], schema: StructType): DataFrame = {
    spark.internalCreateDataFrame(catalystRows, schema)
  }
}

正在读取检查点目录

package tests

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructType}


/**
  * Recovering RDD From Checkpoint Files
  * — SparkContext.checkpointFile Method
  *   SparkContext.checkpointFile(directory: String)
  *   checkpointFile reads (recovers) a RDD from a checkpoint directory.
  * Note SparkContext.checkpointFile is a protected[spark] method
  * so the code to access it has to be in org.apache.spark package.
  * Internally, checkpointFile creates a ReliableCheckpointRDD in a scope.
  */
object ReadingCheckPoint {
  val spark = SparkSession
    .builder()
    .appName("ReadingCheckPoint")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions","4") //Change to a more reasonable default number of partitions for our data
    .config("spark.app.id","ReadingCheckPoint") // To silence Metrics warning
    .getOrCreate()

  val sqlContext = spark.sqlContext

  val sc = spark.sparkContext

  // Make sure to use the same checkpoint directory
  val pathCheckpoint = "hdfs://localhost/user/cloudera/checkpoint/607daeca-6ec2-471c-9033-9c4c236880a9/rdd-4"

  def main(args: Array[String]): Unit = {

    try {

      Logger.getRootLogger.setLevel(Level.ERROR)

      val schema = new StructType()
        .add("field1",IntegerType)
        .add("field2",DoubleType)

      import org.apache.spark.my
      import org.apache.spark.sql.catalyst.InternalRow
      val numsRddRecovered = my.recover[InternalRow](spark.sparkContext, pathCheckpoint) //org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow]
      numsRddRecovered.foreach(x => println(x.toString))

      // We have to convert RDD[InternalRow] to DataFrame
      import org.apache.spark.sql.my2
      val numsRecovered = my2.createDataFrame(spark, numsRddRecovered, schema)
      numsRecovered.show()


      // To have the opportunity to view the web console of Spark: http://localhost:4040/
      println("Type whatever to the console to exit......")
      scala.io.StdIn.readLine()
    } finally {
      sc.stop()
      println("SparkContext stopped")
      spark.stop()
      println("SparkSession stopped")
    }
  }
}

输出

[0,2,3fee8fd1cc5108ef]
+------+------------------+
|field1|            field2|
+------+------------------+
|     2|0.9550560942227814|
+------+------------------+

您可以按照此 link 访问 Spark 文档:Checkpointing