Flink FsStateBackend 使用 S3 存储太昂贵
Flink FsStateBackend using S3 storage too expensive
目前我使用 FsStateBackend 来检查点状态。我正在使用如下代码所示的间隔 10s。但我看到使用检查点的传输存储桶的成本约为 20 美元/天,aws 传输 s3 定价:0.005 美元/1000 次请求 =>(我使用的是 ~4000000 requests/day @@)。我有 7 份工作,其中:
- 6 个作业使用检查点间隔 = 10000(毫秒)
- 1 个作业使用检查点间隔 = 1000(毫秒)
并且 运行 在 AWS EMR 上使用 flink。每个检查点的平均状态大小从 (8KB -> 30M)。检查点后面发生了什么?
// set up checkpoint
env.enableCheckpointing(1000 or 10000);
// advanced options:
// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// checkpoints have to complete within one minute, or are discarded
// env.getCheckpointConfig().setCheckpointTimeout(60000);
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// folder to checkpoint
StateBackend backend = new FsStateBackend(checkpointPath, true);
env.setStateBackend(backend);
您使用哪个 S3 实现来创建检查点?这有很大的不同。
虽然您必须将 S3 的 hadoop 实现与 StreamingFileSink
一起使用,但对于检查点来说,这是一个非常糟糕的选择。 Hadoop S3 FS 试图模仿 S3 之上的文件系统:
- 在写入密钥之前,它通过检查前缀到最后一个“/”的密钥来检查“父目录”是否存在
- 它创建空的标记文件来标记这样一个父目录的存在
- 所有这些“存在”请求都是昂贵的 S3 HEAD 请求
因此,Hadoop S3 FS 具有非常高的“创建文件”延迟并且它很快达到请求速率限制(HEAD 请求在 S3 上具有非常低的请求速率限制)。
Presto S3 不尝试做那种魔术;
它只是简单地执行 PUT/GET 操作而没有周围的所有其他东西。
因为 Flink 的检查点假设仅此而已,因此它更加高效和一致。
此外,对于 Hadoop S3,您可能会遇到恢复操作失败的情况,因为状态文件看起来不存在(HEAD 请求导致 S3 负载平衡器中的错误缓存)。只有在一段时间后文件才会可见,然后才能恢复成功。
请注意,可以将 hadoop 版本用于接收器,将 presto 版本用于检查点。在这种情况下,您应该明确使用 s3a:// 作为接收器 (Hadoop) 的方案,并将 s3p:// 用于检查点 (Presto)。
目前我使用 FsStateBackend 来检查点状态。我正在使用如下代码所示的间隔 10s。但我看到使用检查点的传输存储桶的成本约为 20 美元/天,aws 传输 s3 定价:0.005 美元/1000 次请求 =>(我使用的是 ~4000000 requests/day @@)。我有 7 份工作,其中:
- 6 个作业使用检查点间隔 = 10000(毫秒)
- 1 个作业使用检查点间隔 = 1000(毫秒)
并且 运行 在 AWS EMR 上使用 flink。每个检查点的平均状态大小从 (8KB -> 30M)。检查点后面发生了什么?
// set up checkpoint
env.enableCheckpointing(1000 or 10000);
// advanced options:
// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// checkpoints have to complete within one minute, or are discarded
// env.getCheckpointConfig().setCheckpointTimeout(60000);
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// folder to checkpoint
StateBackend backend = new FsStateBackend(checkpointPath, true);
env.setStateBackend(backend);
您使用哪个 S3 实现来创建检查点?这有很大的不同。
虽然您必须将 S3 的 hadoop 实现与 StreamingFileSink
一起使用,但对于检查点来说,这是一个非常糟糕的选择。 Hadoop S3 FS 试图模仿 S3 之上的文件系统:
- 在写入密钥之前,它通过检查前缀到最后一个“/”的密钥来检查“父目录”是否存在
- 它创建空的标记文件来标记这样一个父目录的存在
- 所有这些“存在”请求都是昂贵的 S3 HEAD 请求
因此,Hadoop S3 FS 具有非常高的“创建文件”延迟并且它很快达到请求速率限制(HEAD 请求在 S3 上具有非常低的请求速率限制)。
Presto S3 不尝试做那种魔术; 它只是简单地执行 PUT/GET 操作而没有周围的所有其他东西。 因为 Flink 的检查点假设仅此而已,因此它更加高效和一致。
此外,对于 Hadoop S3,您可能会遇到恢复操作失败的情况,因为状态文件看起来不存在(HEAD 请求导致 S3 负载平衡器中的错误缓存)。只有在一段时间后文件才会可见,然后才能恢复成功。
请注意,可以将 hadoop 版本用于接收器,将 presto 版本用于检查点。在这种情况下,您应该明确使用 s3a:// 作为接收器 (Hadoop) 的方案,并将 s3p:// 用于检查点 (Presto)。