Spark Checkpointing Non-Streaming - Checkpoint files can be used in subsequent job run or driver program
This is from an interesting article: http://www.lifeisafile.com/Apache-Spark-Caching-Vs-Checkpointing/
"... Checkpointing stores the RDD physically to HDFS and destroys the lineage that created it. The checkpoint file won't be deleted even after the Spark application terminates. Checkpoint files can be used in a subsequent job run or driver program. Checkpointing an RDD causes double computation, because the operation will first call a cache before doing the actual job of computing and writing to the checkpoint directory. ..."
I seem to remember reading elsewhere that checkpointed files were only for the Job or shared Jobs in a given Spark App.
Looking for clarification and how a new App could use the checkpoint directory, as I did not think that was possible.
Spark does not clear the checkpoint directory even after the SparkContext is stopped. We can turn on automatic cleanup by setting the following property:
spark.conf.set("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
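One caveat worth adding: this is a cleaner setting, and the cleaner appears to read it when the SparkContext is created, so setting it through spark.conf.set on an already-running session may have no effect. A minimal sketch for a standalone application that passes the flag up front instead (the app name is illustrative; the flag can also be supplied via --conf on spark-submit or spark-shell):
// Pass the cleanup flag before the context starts so the cleaner picks it up
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("checkpoint-cleanup-demo")   // illustrative name
  .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
  .getOrCreate()

spark.sparkContext.setCheckpointDir("/tmp/spark/checkpoint")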
To reuse the checkpointed Dataset again, we can follow these steps:
- Start context 1 and checkpoint the Dataset:
// Setting logger on for ReliableRDDCheckpointData
scala> import org.apache.log4j.{Level, Logger}
scala> Logger.getLogger("org.apache.spark.rdd.ReliableRDDCheckpointData").setLevel(Level.INFO)
// Note application ID
scala> spark.sparkContext.applicationId
res1: String = local-1567969150914
// Set checkpoint Dir
scala> spark.sparkContext.setCheckpointDir("/tmp/spark/checkpoint")
// File system location
Users-Air:checkpoint User$ pwd
/tmp/spark/checkpoint
Users-Air:checkpoint User$ ls -lrth
total 0
drwxr-xr-x 2 User wheel 64B Sep 8 15:00 7aabcb46-e707-49dd-8893-148a162368d5
// Create Dataframe
scala> val df = spark.range(3).withColumn("random", rand())
scala> df.show
+---+------------------+
| id| random|
+---+------------------+
| 0|0.8517439782779789|
| 1| 0.288880016535247|
| 2|0.7027831376739603|
+---+------------------+
scala> df.schema
res5: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,false), StructField(random,DoubleType,false))
// Checkpoint the DataFrame
scala> df.checkpoint
19/09/08 15:02:22 INFO ReliableRDDCheckpointData: Done checkpointing RDD 7 to file:/tmp/spark/checkpoint/7aabcb46-e707-49dd-8893-148a162368d5/rdd-7, new parent is RDD 8
res6: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, random: double]
// New RDD saved in checkpoint directory /tmp/spark/checkpoint/7aabcb46-e707-49dd-8893-148a162368d5/rdd-7
Users-Air:7aabcb46-e707-49dd-8893-148a162368d5 User$ cd rdd-7/
Users-Air:rdd-7 User$ ls -lrth
total 32
-rw-r--r-- 1 User wheel 4B Sep 8 15:02 part-00000
-rw-r--r-- 1 User wheel 163B Sep 8 15:02 part-00002
-rw-r--r-- 1 User wheel 163B Sep 8 15:02 part-00001
-rw-r--r-- 1 User wheel 163B Sep 8 15:02 part-00003
// Stop context
scala> spark.stop
scala> :quit
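One detail from the transcript above: df.checkpoint is eager by default and returns a new Dataset backed by the checkpoint files (res6 here). In a standalone job you would normally capture that return value instead of discarding it, e.g.:
scala> val checkpointedDF = df.checkpoint()
// continue with checkpointedDF, whose lineage now starts at the checkpoint files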
- Start a new context 2 and read the checkpointed Dataset:
// Initialized new context
scala> spark.sparkContext.applicationId
res0: String = local-1567969525656
SparkContext.checkpointFile is a protected[spark] method, so we need to create a helper object under the org.apache.spark package:
scala> :paste -raw
// Entering paste mode (ctrl-D to finish)
package org.apache.spark
object RecoverCheckpoint {
  import scala.reflect.ClassTag
  import org.apache.spark.rdd.RDD

  def recover[T: ClassTag](sc: SparkContext, path: String): RDD[T] = {
    sc.checkpointFile[T](path)
  }
}
Now recover the checkpointed RDD as an RDD[InternalRow] using the RecoverCheckpoint object defined above:
// Path from first context
scala> val checkPointFilePath = "/tmp/spark/checkpoint/7aabcb46-e707-49dd-8893-148a162368d5/rdd-7"
scala> import org.apache.spark.RecoverCheckpoint
scala> import org.apache.spark.sql.catalyst.InternalRow
scala> import org.apache.spark.sql.types._
scala> val RecoveredRDD = RecoverCheckpoint.recover[InternalRow](spark.sparkContext, checkPointFilePath)
// RDD is recovered as RDD[InternalRow]
scala> RecoveredRDD
res2: org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = ReliableCheckpointRDD[0] at recover at <console>:34
// Count matches the original
scala> RecoveredRDD.count
res3: Long = 3
To convert the recovered RDD[InternalRow] back into a DataFrame, create a RecoverCheckpointRDDToDF helper:
// Need to convert RDD[InternalRow] to DataFrame
scala> :paste -raw
// Entering paste mode (ctrl-D to finish)
// Creating Dataframe from RDD[InternalRow]
package org.apache.spark.sql
object RecoverCheckpointRDDToDF {
  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.{DataFrame, SparkSession}
  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.types.StructType

  def createDataFrame(spark: SparkSession, catalystRows: RDD[InternalRow], schema: StructType): DataFrame = {
    spark.internalCreateDataFrame(catalystRows, schema)
  }
}
Finally, use RecoverCheckpointRDDToDF to get the Dataset back:
// The schema must be known up front
scala> val df_schema = StructType(List(StructField("id",LongType,false), StructField("random",DoubleType,false)))
df_schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,false), StructField(random,DoubleType,false))
scala> import org.apache.spark.sql.RecoverCheckpointRDDToDF
scala> val df = RecoverCheckpointRDDToDF.createDataFrame(spark, RecoveredRDD, df_schema)
scala> df.show
+---+------------------+
| id| random|
+---+------------------+
| 0|0.8517439782779789|
| 1| 0.288880016535247|
| 2|0.7027831376739603|
+---+------------------+
// Same as first context
// Stop context
scala> spark.stop
scala> :quit
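Since the schema has to be known up front in the second application (df_schema above was typed in by hand), one option is to save it alongside the checkpoint data from the first application and reload it in the second. A minimal sketch using StructType's JSON round-trip; the df_schema.json path is illustrative:
// Context 1, after checkpointing: persist the schema as JSON next to the checkpoint data
import java.nio.file.{Files, Paths}
import java.nio.charset.StandardCharsets
Files.write(Paths.get("/tmp/spark/checkpoint/df_schema.json"),
  df.schema.json.getBytes(StandardCharsets.UTF_8))

// Context 2: read the JSON back and rebuild the StructType
import org.apache.spark.sql.types.{DataType, StructType}
val schemaJson = new String(
  Files.readAllBytes(Paths.get("/tmp/spark/checkpoint/df_schema.json")),
  StandardCharsets.UTF_8)
val df_schema = DataType.fromJson(schemaJson).asInstanceOf[StructType]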