如何删除由 Spark Structured Streaming 创建的旧数据?
How to delete old data that was created by Spark Structured Streaming?
如何删除由 Spark Structured Streaming (Spark 2.4.5) 创建的旧数据?
我在 HDFS 上有 parquet/avro 格式的数据(不是 Delta),由 Spark Structured Streaming 创建,并且按时间划分(年、月、日、小时)。
数据创建如下:
query = df.writeStream.format("avro").partitionBy("year", "month", "day", "hour").outputMode("append").option("checkpointLocation", "/data/avro.cp").start("/data/avro")
结果我有以下分区文件夹布局:
./year=2020/month=3/day=13/hour=12
./year=2020/month=3/day=13/hour=13
./year=2020/month=3/day=13/hour=14
./year=2020/month=3/day=13/hour=15
./year=2020/month=3/day=13/hour=16
如何删除旧数据,例如早于 year=2020,month=2,day=13,hour=14 的数据?
只是删除相关文件夹
./year=2020/month=3/day=13/hour=12
./year=2020/month=3/day=13/hour=13
从文件系统读取批数据帧时抛出异常:
df = spark.read.format("avro").load("/data/avro")
java.io.FileNotFoundException: File file:/data/avro/year=2020/month=3/day=13/hour=12/part-00000-0cc84e65-3f49-4686-85e3-1ecf48952794.c000.avro does not exist
据我所知,这与检查点使用的 _spark_metadata
文件夹有某种关系。
感谢您的帮助。
您不能删除该文件夹,除非您也删除它对应的检查点文件夹。您正试图在检查点仍然知道该文件夹的情况下删除该文件夹,这就是发生错误的原因。
但是,除非必要,否则我真的不建议弄乱检查点文件夹。如果您的情况可行,我建议您将旧数据移动到不同的数据存储类型,例如 AWS Standard -> Glacier。
好像找到了solution/workaround。
关键概念是使用 FileStreamSinkLog,然后使用 SinkFileStatus 更新它,并将操作设置为 delete
:
加载 FileStreamSinkLog
sinkLog = new FileStreamSinkLog(1, spark, full-path-to-spark-metadata-dir);
获取最新的 SinkFileStatus
Option<Tuple2<Object, SinkFileStatus[]>> latest = sinkLog.getLatest();
long batchId = (long)latest.get()._1;
SinkFileStatus[] fileStatuses = latest.get()._2;
删除旧文件
将带有 delete
操作的新条目添加到 fileStatuses
数组
用更新的 fileStatuses
写回 batchId
日志文件
然而,这需要停止结构化流作业。
所以没有解决方案可以在不停止的情况下删除Spark Structured Streaming写入的旧文件。
为了方便 copy/paste,这里是 spark 3.0.1 的工作代码 (scala) 片段。删除一个文件并写入一批新文件:
import org.apache.spark.sql.execution.streaming.FileStreamSinkLog
import scala.language.postfixOps
import scala.sys.process._
import scala.util.Try
val sinkLog = new FileStreamSinkLog (
1,
spark,
SPARK_METADATA_ROOT
)
val head = sinkLog.allFiles().head
val deleteCommand = s"hadoop fs -rm ${head.path}"
println (Try (deleteCommand ! processlogger).getOrElse(s""""$deleteCommand" failed""") )
head.copy(action = FileStreamSinkLog.DELETE_ACTION)
sinkLog
.add (
latestBatch.get._1+1,
Array(head.copy(action = FileStreamSinkLog.DELETE_ACTION))
)
Spark 3.0.0及以上版本已经实现
- 原刊:https://issues.apache.org/jira/browse/SPARK-20568
- 配置:https://github.com/apache/spark/blob/branch-3.0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala#L117
基本上它为已提交的文件添加了 3 种策略(ARCHIVE、DELETE、OFF)并且只允许对其进行配置。
老实说,我自己从未尝试过,但我在这里看到了一些针对 Spark 3+ 的答案,这绝对值得一提。
如何删除由 Spark Structured Streaming (Spark 2.4.5) 创建的旧数据?
我在 HDFS 上有 parquet/avro 格式的数据(不是 Delta),由 Spark Structured Streaming 创建,并且按时间划分(年、月、日、小时)。
数据创建如下:
query = df.writeStream.format("avro").partitionBy("year", "month", "day", "hour").outputMode("append").option("checkpointLocation", "/data/avro.cp").start("/data/avro")
结果我有以下分区文件夹布局:
./year=2020/month=3/day=13/hour=12
./year=2020/month=3/day=13/hour=13
./year=2020/month=3/day=13/hour=14
./year=2020/month=3/day=13/hour=15
./year=2020/month=3/day=13/hour=16
如何删除旧数据,例如早于 year=2020,month=2,day=13,hour=14 的数据?
只是删除相关文件夹
./year=2020/month=3/day=13/hour=12
./year=2020/month=3/day=13/hour=13
从文件系统读取批数据帧时抛出异常:
df = spark.read.format("avro").load("/data/avro")
java.io.FileNotFoundException: File file:/data/avro/year=2020/month=3/day=13/hour=12/part-00000-0cc84e65-3f49-4686-85e3-1ecf48952794.c000.avro does not exist
据我所知,这与检查点使用的 _spark_metadata
文件夹有某种关系。
感谢您的帮助。
您不能删除该文件夹,除非您也删除它对应的检查点文件夹。您正试图在检查点仍然知道该文件夹的情况下删除该文件夹,这就是发生错误的原因。
但是,除非必要,否则我真的不建议弄乱检查点文件夹。如果您的情况可行,我建议您将旧数据移动到不同的数据存储类型,例如 AWS Standard -> Glacier。
好像找到了solution/workaround。
关键概念是使用 FileStreamSinkLog,然后使用 SinkFileStatus 更新它,并将操作设置为 delete
:
加载 FileStreamSinkLog
sinkLog = new FileStreamSinkLog(1, spark, full-path-to-spark-metadata-dir);
获取最新的 SinkFileStatus
Option<Tuple2<Object, SinkFileStatus[]>> latest = sinkLog.getLatest(); long batchId = (long)latest.get()._1; SinkFileStatus[] fileStatuses = latest.get()._2;
删除旧文件
将带有
delete
操作的新条目添加到fileStatuses
数组用更新的
写回fileStatuses
batchId
日志文件
然而,这需要停止结构化流作业。 所以没有解决方案可以在不停止的情况下删除Spark Structured Streaming写入的旧文件。
为了方便 copy/paste,这里是 spark 3.0.1 的工作代码 (scala) 片段。删除一个文件并写入一批新文件:
import org.apache.spark.sql.execution.streaming.FileStreamSinkLog
import scala.language.postfixOps
import scala.sys.process._
import scala.util.Try
val sinkLog = new FileStreamSinkLog (
1,
spark,
SPARK_METADATA_ROOT
)
val head = sinkLog.allFiles().head
val deleteCommand = s"hadoop fs -rm ${head.path}"
println (Try (deleteCommand ! processlogger).getOrElse(s""""$deleteCommand" failed""") )
head.copy(action = FileStreamSinkLog.DELETE_ACTION)
sinkLog
.add (
latestBatch.get._1+1,
Array(head.copy(action = FileStreamSinkLog.DELETE_ACTION))
)
Spark 3.0.0及以上版本已经实现
- 原刊:https://issues.apache.org/jira/browse/SPARK-20568
- 配置:https://github.com/apache/spark/blob/branch-3.0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala#L117
基本上它为已提交的文件添加了 3 种策略(ARCHIVE、DELETE、OFF)并且只允许对其进行配置。
老实说,我自己从未尝试过,但我在这里看到了一些针对 Spark 3+ 的答案,这绝对值得一提。