Apache Flink StreamingFileSink 在写入 S3 时发出多个 HEAD 请求,这会导致速率限制

Apache Flink StreamingFileSink making several HEAD requests while writing to S3 which causes ratelimiting

我在 Kinesis Data analytics 上部署了一个 Apache Flink 应用程序。

此应用程序从 Kafka 读取并写入 S3。它写入的 S3 存储桶结构是使用 BucketAssigner here

的 BucketAssigner.A 精简版计算得出的

我遇到的问题是,假设我们必须写入此目录结构:s3://myBucket/folder1/folder2/folder3/myFile.json

在发出 PUT 请求之前,它发出以下 HEAD 请求:

然后它发出 PUT 请求。

它对每个请求都这样做,这会导致 S3 速率限制并在我的 FLink 应用程序中产生背压。

我发现有人对 BucketingSink 有类似的问题:https://lists.apache.org/thread/rbp2gdbxwdrk7zmvwhd2bw56mlwokpzz

那里提到的解决方案是切换到 StreamingFileSink,这就是我正在做的。

关于如何在 StreamingFileSink 中修复此问题的任何想法?

我的SinkConfig如下:

StreamingFileSink
  .forRowFormat(new Path(s3Bucket), new JsonEncoder<>())
  .withBucketAssigner(bucketAssigner)
  .withRollingPolicy(DefaultRollingPolicy.builder()
                .withRolloverInterval(60000)
                .build())
  .build()

JsonEncoder 获取对象并将其转换为 json 并写出像 this

这样的字节

如果有帮助的话,我已经在这个问题中描述了整个管道如何工作的更多细节:Heavy back pressure and huge checkpoint size

Hadoop S3 文件系统试图模仿 S3 之上的文件系统。这意味着:

  • 在写入密钥之前,它通过检查前缀到最后一个“/”的密钥来检查“父目录”是否存在
  • 它创建空标记文件来标记此类父目录的存在
  • 所有这些“存在”请求都是 S3 HEAD 请求,它们既昂贵又开始违反一致的 read-after-create 可见性

因此,Hadoop S3 文件系统具有非常高的“创建文件”延迟并且它很快达到请求速率限制(HEAD 请求在 S3 上具有非常低的请求速率限制)。因此,最好找到写入更少不同文件的方法。

您也可以探索使用 entropy injection。熵注入发生在文件系统级别,因此它应该与 FileSink 一起使用。除了我不确定它将如何与接收器完成的 partitioning/bucketing 交互,所以您可能会或可能不会发现它在实践中可用。如果您尝试了,请反馈!