Flink FsStateBackend 使用 S3 存储太昂贵

Flink FsStateBackend using S3 storage too expensive

目前我使用 FsStateBackend 来检查点状态。我正在使用如下代码所示的间隔 10s。但我看到使用检查点的传输存储桶的成本约为 20 美元/天,aws 传输 s3 定价:0.005 美元/1000 次请求 =>(我使用的是 ~4000000 requests/day @@)。我有 7 份工作,其中:

并且 运行 在 AWS EMR 上使用 flink。每个检查点的平均状态大小从 (8KB -> 30M)。检查点后面发生了什么?

// set up checkpoint
        env.enableCheckpointing(1000 or 10000);

        // advanced options:
        // make sure 500 ms of progress happen between checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // checkpoints have to complete within one minute, or are discarded
//            env.getCheckpointConfig().setCheckpointTimeout(60000);
        // allow only one checkpoint to be in progress at the same time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // enable externalized checkpoints which are retained after job cancellation
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // folder to checkpoint
        StateBackend backend = new FsStateBackend(checkpointPath, true);
        env.setStateBackend(backend);

您使用哪个 S3 实现来创建检查点?这有很大的不同。

虽然您必须将 S3 的 hadoop 实现与 StreamingFileSink 一起使用,但对于检查点来说,这是一个非常糟糕的选择。 Hadoop S3 FS 试图模仿 S3 之上的文件系统:

  • 在写入密钥之前,它通过检查前缀到最后一个“/”的密钥来检查“父目录”是否存在
  • 它创建空的标记文件来标记这样一个父目录的存在
  • 所有这些“存在”请求都是昂贵的 S3 HEAD 请求

因此,Hadoop S3 FS 具有非常高的“创建文件”延迟并且它很快达到请求速率限制(HEAD 请求在 S3 上具有非常低的请求速率限制)。

Presto S3 不尝试做那种魔术; 它只是简单地执行 PUT/GET 操作而没有周围的所有其他东西。 因为 Flink 的检查点假设仅此而已,因此它更加高效和一致。

此外,对于 Hadoop S3,您可能会遇到恢复操作失败的情况,因为状态文件看起来不存在(HEAD 请求导致 S3 负载平衡器中的错误缓存)。只有在一段时间后文件才会可见,然后才能恢复成功。

请注意,可以将 hadoop 版本用于接收器,将 presto 版本用于检查点。在这种情况下,您应该明确使用 s3a:// 作为接收器 (Hadoop) 的方案,并将 s3p:// 用于检查点 (Presto)。

Flink S3 docs.