使用 S3AFileSystem 的 Flink 不会从 S3 读取子文件夹

Flink using S3AFileSystem does not read subfolders from S3

我们正在使用具有建议的 S3AFileSystem 配置的 Flink 1.2.0。当源是 S3 存储桶中的单个文件夹时,简单的流式处理作业会按预期工作。

作业运行没有错误--但产生输出--当它的源是一个本身包含子文件夹的文件夹时。

为清楚起见,下面是 S3 存储桶的模型。 运行 指向 s3a://bucket/folder/2017/04/25/01/ 的作业会正确读取所有三个对象以及存储桶中出现的任何后续对象。将作业指向 s3a://bucket/folder/2017/(或任何其他中间文件夹)会导致作业在不产生任何内容的情况下运行。

在绝望中,我们尝试了 [in|ex] 包含尾随 /.

的排列
.
`-- folder
    `-- 2017
        `-- 04
            |-- 25
            |   |-- 00
            |   |   |-- a.txt
            |   |   `-- b.txt
            |   `-- 01
            |       |-- c.txt
            |       |-- d.txt
            |       `-- e.txt
            `-- 26

职位代码:

def main(args: Array[String]) {

  val parameters = ParameterTool.fromArgs(args)
  val bucket = parameters.get("bucket")
  val folder = parameters.get("folder")

  val path = s"s3a://$bucket/$folder"

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  val lines: DataStream[String] = env.readFile(
    inputFormat = new TextInputFormat(new Path(path)),
    filePath = path,
    watchType = FileProcessingMode.PROCESS_CONTINUOUSLY,
    interval = Time.seconds(10).toMilliseconds)

  lines.print()
  env.execute("Flink Streaming Scala API Skeleton")
}

core-site.xml 根据文档配置:

<configuration>
  <property>
    <name>fs.s3a.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>
  <property>
    <name>fs.s3a.buffer.dir</name>
    <value>/tmp</value>
  </property>
</configuration>

我们已经包含了此处列出的 S3AFileSystem 的所有 jar:https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/aws.html#flink-for-hadoop-27

我们被难住了。这似乎应该有效;互联网上有大量面包屑表明此 确实 有效。 [例如,http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Reading-files-from-an-S3-folder-td10281.html]

松鼠们,帮帮我……你们是我唯一的希望!

这个下是什么版本的Hadoop?

如果 Hadoop 2.8 已经停止,可能是回归,这可能是我的错。首先在 FLINK 下提交一个 JIRA @ issues.apache.org,然后,如果它是 2.8.0 link 中的新版本,则它被 HADOOP-13208

破坏了

此处的代码片段是一个很好的示例,可用于回归测试,是时候为 Flink 做一些了。

这个巨大的 listFiles() 更改将路径下的文件枚举从递归树状结构移动到路径下所有子条目的一系列平面列表:它对其他所有内容(distcp、测试、 hive, spark) 并且自 2016 年 12 月以来一直在运送产品;如果这是原因,我会感到有些惊讶,但不否认责备。抱歉

在上面 Steve Loughran 的帮助下回答我自己的问题。

在 Flink 中,默认使用 file-based data source to process continuously, FileInputFormat does not enumerate nested files

无论来源是 S3 还是其他任何来源,都是如此。

你必须这样设置:

def main(args: Array[String]) {

  val parameters = ParameterTool.fromArgs(args)
  val bucket = parameters.get("bucket")
  val folder = parameters.get("folder")

  val path = s"s3a://$bucket/$folder"

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  val textInputFormat = new TextInputFormat(new Path(path))

  //this is important!
  textInputFormat.setNestedFileEnumeration(true)

  val lines: DataStream[String] = env.readFile(
    inputFormat = textInputFormat,
    filePath = path,
    watchType = FileProcessingMode.PROCESS_CONTINUOUSLY,
    interval = Time.seconds(10).toMilliseconds)

  lines.print()
  env.execute("Flink Streaming Scala API Skeleton")

}

与 flink 1.7.x 版本一样,Flink 提供了两个文件系统来与 Amazon S3 通信,flink-s3-fs-prestoflink-s3-fs-hadoopflink-s3-fs-hadoopflink-s3-fs-presto 都使用 s3:// 方案为 URI 注册默认文件系统包装器,flink-s3-fs-hadoop 也为 s3a:// 注册,flink-s3-fs-presto 也为 s3p://, 所以你可以同时使用这两个。

示例代码:

//Reading Data from S3
// This will print all the contents in the bucket line wise
final Path directory = new Path("s3a://husnain28may2020/");
final FileSystem fs = directory.getFileSystem();

//using input format
org.apache.flink.api.java.io.TextInputFormat textInputFormatS3 = new org.apache.flink.api.java.io.TextInputFormat(directory);
DataSet<String> linesS3 = env.createInput(textInputFormatS3);
linesS3.print();