Spark 结构化流 - 检查点元数据无限增长

Spark structured streaming- checkpoint metadata growing indefinitely

我使用 spark structure streaming 3.1.2。我需要使用 s3 来存储检查点元数据(我知道,它不是检查点元数据的最佳存储)。压缩间隔为 10(默认),我设置为 spark.sql.streaming.minBatchesToRetain=5。当作业 运行 几周后,检查点时间显着增加(导致处理延迟几分钟)。我查看了检查点元数据结构。那里有一条沉重的道路:checkpoint/source/0。单个 .compact 文件重量为 25GB。我查看了它的内容,它包含自批次 0 以来的所有条目(当前批次约为 25000)。

我尝试了一些参数来从压缩文件中删除已经处理过的数据,即: spark.cleaner.referenceTracking.cleanCheckpoints=true - 不起作用。正如我在代码中看到的那样,它与以前版本的流媒体有关,不是吗? spark.sql.streaming.fileSource.log.deletion=truespark.sql.streaming.fileSink.log.deletion=true 不起作用。

即使处理了所有数据(最近的检查点除外),压缩文件也会存储完整的历史记录,所以我预计大部分条目都会被删除。是否有任何参数可以从压缩文件中删除条目或不时优雅地删除压缩文件?

现在我正在测试场景,当我停止作业时,删除大部分 checkpoint/source/0/* 文件,只保留几个最近的检查点(未压缩),然后重新运行作业。作业从最近的检查点正确恢复。当涉及到检查点压缩时,它会因缺少最近的压缩文件而失败。我可能需要编辑最近的压缩文件(而不是删除它)并只保留一些最近的记录。它看起来像是我的问题的可能解决方法,但这种手动删除检查点文件的场景看起来很难看,所以我更喜欢由 Spark 管理的东西。

为了后代:问题是 FileStreamSourceLog class。我需要覆盖方法 shouldRetain,默认情况下 returns true 及其文档说:

Default implementation retains all log entries. Implementations should override the method to change the behavior.