Dataflow 2.1.0 流应用程序不清理临时文件夹

Dataflow 2.1.0 streaming application is not cleaning temp folders

我们有一个数据流应用程序,它从 Pub/Sub、windows 读取固定大小的 1 分钟持续时间 windows 并将原始消息写入具有 10 个分片的 GCS。我们的应用程序已经 运行 10 天了,它创建了一个 .temp-beam-2017**** 文件夹。它下面有大约 6200 个文件,并且这个数字每天都在增长。

我的理解是数据流会在写入完成后将临时文件移动到指定的输出文件夹。

您能否建议在这种情况下可以做什么?每个文件大约 100MB。

inputCollection.apply("Windowing",
            Window.<String>into(FixedWindows.of(ONE_MINUTE))
                  .triggering(AfterProcessingTime.pastFirstElementInPane()
                                .plusDelayOf(ONE_MINUTE))
                  .withAllowedLateness(ONE_HOUR)
                  .discardingFiredPanes()
                  )

    //Writing to GCS
                .apply(TextIO.write()
                            .withWindowedWrites()
                            .withNumShards(10)
                            .to(options.getOutputPath())
                            .withFilenamePolicy(
                                    new 
WindowedFileNames(options.getOutputPath())));

14400 和 13900 之间的差异很可能是因为您的管道没有获得任何事件时间属于特定 window 和分片的数据。在编写windowed 集合时,我们不会为"missing" windows 创建空文件,因为一般情况下,无法知道哪些windows 是[=17] =]:从理论上讲,固定或滑动 windows 非常清楚,但自定义 windowing 函数或会话等则不然。此外,分片的分配是随机的,因此有可能对于特定的 window 到达的数据很少,然后很有可能 10 个分片中的一些分片没有得到任何数据。

至于为什么会遗留临时文件:好像是pipeline在写文件到GCS的时候偶尔会出现异常。剩余的文件是 "zombies" 那些失败的数据写入尝试。我们目前在流式模式下自动清理此类文件做得不好(在批处理中,当管道完成时删除整个临时目录是安全的,但在流式中我们不能这样做,我们删除只有个别文件被重命名为它们的最终位置),但删除该目录中的旧临时文件应该是安全的。

我已提交 https://issues.apache.org/jira/browse/BEAM-3145 以改进后者。