如何清理spark structured streaming中积累的checkpoint文件?

How to clean up the checkpoint files accumulated in spark structured streaming?

我为 SparkContext 添加了检查点,并为长 运行 spark 结构化流作业编写了 kafka 数据流查询。

spark.sparkContext.setCheckpointDir("/tmp/checkpoint")

...

val monitoring_stream = monitoring_df.writeStream
                              .trigger(Trigger.ProcessingTime("120 seconds"))
                              .option("checkpointLocation", "s3a://spark-checkpoint/checkpointfiles")
                             .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
                                if(!batchDF.isEmpty) 
                                {
                                }
                             .start()
                             .awaitTermination()

Spark 作业 运行 稳定。但是,我注意到检查点文件在没有自动清理的情况下累积在 HDFS 和 S3 中。我看到存储 space 不断被这些文件吃掉。有没有办法配置这些检查点文件的保留时间以使其自动删除?或者我是否需要 运行 一些 cron 作业来手动删除它们?如果我手动删除它们,是否会影响正在进行的 spark 作业?谢谢!

spark.cleaner.referenceTracking.cleanCheckpoints需要设置为true,默认为false。