如何删除由 Spark Structured Streaming 创建的旧数据?

How to delete old data that was created by Spark Structured Streaming?

如何删除由 Spark Structured Streaming (Spark 2.4.5) 创建的旧数据?

我在 HDFS 上有 parquet/avro 格式的数据(不是 Delta),由 Spark Structured Streaming 创建,并且按时间划分(年、月、日、小时)。

数据创建如下:

query = df.writeStream.format("avro").partitionBy("year", "month", "day", "hour").outputMode("append").option("checkpointLocation", "/data/avro.cp").start("/data/avro")

结果我有以下分区文件夹布局:

./year=2020/month=3/day=13/hour=12
./year=2020/month=3/day=13/hour=13
./year=2020/month=3/day=13/hour=14
./year=2020/month=3/day=13/hour=15
./year=2020/month=3/day=13/hour=16

如何删除旧数据,例如早于 year=2020,month=2,day=13,hour=14 的数据?

只是删除相关文件夹

./year=2020/month=3/day=13/hour=12
./year=2020/month=3/day=13/hour=13

从文件系统读取批数据帧时抛出异常:

df = spark.read.format("avro").load("/data/avro")
java.io.FileNotFoundException: File file:/data/avro/year=2020/month=3/day=13/hour=12/part-00000-0cc84e65-3f49-4686-85e3-1ecf48952794.c000.avro does not exist

据我所知,这与检查点使用的 _spark_metadata 文件夹有某种关系。

感谢您的帮助。

您不能删除该文件夹,除非您也删除它对应的检查点文件夹。您正试图在检查点仍然知道该文件夹的情况下删除该文件夹,这就是发生错误的原因。

但是,除非必要,否则我真的不建议弄乱检查点文件夹。如果您的情况可行,我建议您将旧数据移动到不同的数据存储类型,例如 AWS Standard -> Glacier。

好像找到了solution/workaround。 关键概念是使用 FileStreamSinkLog,然后使用 SinkFileStatus 更新它,并将操作设置为 delete:

  1. 加载 FileStreamSinkLog

     sinkLog = new FileStreamSinkLog(1, spark, full-path-to-spark-metadata-dir);
    
  2. 获取最新的 SinkFileStatus

     Option<Tuple2<Object, SinkFileStatus[]>> latest = sinkLog.getLatest();
     long batchId = (long)latest.get()._1;
     SinkFileStatus[] fileStatuses = latest.get()._2;
    
  3. 删除旧文件

  4. 将带有 delete 操作的新条目添加到 fileStatuses 数组

  5. 用更新的 fileStatuses

    写回 batchId 日志文件

然而,这需要停止结构化流作业。 所以没有解决方案可以在不停止的情况下删除Spark Structured Streaming写入的旧文件。

为了方便 copy/paste,这里是 spark 3.0.1 的工作代码 (scala) 片段。删除一个文件并写入一批新文件:

import org.apache.spark.sql.execution.streaming.FileStreamSinkLog

import scala.language.postfixOps
import scala.sys.process._
import scala.util.Try

        val sinkLog = new FileStreamSinkLog (
            1,
            spark,
            SPARK_METADATA_ROOT
        )
        val head = sinkLog.allFiles().head

        val deleteCommand = s"hadoop fs -rm ${head.path}"
        println (Try (deleteCommand ! processlogger).getOrElse(s""""$deleteCommand" failed""") )

        head.copy(action = FileStreamSinkLog.DELETE_ACTION)

        sinkLog
            .add (
                latestBatch.get._1+1,
                Array(head.copy(action = FileStreamSinkLog.DELETE_ACTION))
                )

Spark 3.0.0及以上版本已经实现

基本上它为已提交的文件添加了 3 种策略(ARCHIVE、DELETE、OFF)并且只允许对其进行配置。

老实说,我自己从未尝试过,但我在这里看到了一些针对 Spark 3+ 的答案,这绝对值得一提。