使用 Scala 列出特定深度的给定 HDFS 路径的所有子目录?

List all sub-directories for a given HDFS path up to a certain depth using Scala?

我有各种 Spark 项目,它们以几种分区格式将数据写入 HDFS。 示例:

格式 1:

/tmp/myProject1/dir1/dir2/parquet/first_id=1234/dir3=x/dir4=y/second_id=4567

格式 2:

/tmp/myProject2/dir1/dir2/parquet/first_id=3212/second_id=9129

格式 3:

/tmp/myProject3/dir1/dir2/parquet/first_id=9912/dir3=x/second_id=1129

我的问题给出了一个基本路径,它是 /tmp/<myProject>/dir1/dir2/parquet 动态构建这些路径最简单的方法是 second_id?

注意:我不想使用通配符,而是希望在给定任何基本路径的情况下,动态获取所有这些路径的列表,直到 second_id。在给定任何基本路径作为参数的情况下,我无法找出一种足够灵活的方法来创建最多 second_id 的此类路径列表。

到目前为止我尝试过的是:

val fs = FileSystem.get(new Configuration())
val status = fs.listStatus(new Path("/tmp/myProject1/dir1/dir2/parquet/first_id=1234/"))
status.foreach(x=> println(x.getPath))

这只是打印到第 1 级:

/tmp/myProject1/dir1/dir2/parquet/first_id=1234/dir3=x
/tmp/myProject1/dir1/dir2/parquet/first_id=1234/dir3=a
/tmp/myProject1/dir1/dir2/parquet/first_id=1234/dir3=c

相反,我希望它列出最多 second_id 的所有文件,例如:

/tmp/myProject1/dir1/dir2/parquet/first_id=1234/dir3=x/dir4=y/second_id=4567
/tmp/myProject1/dir1/dir2/parquet/first_id=1234/dir3=a/dir4=z/second_id=1231
/tmp/myProject1/dir1/dir2/parquet/first_id=1234/dir3=c/dir4=k/second_id=4123

同样,对于其他两种格式,它应该列出最多 second_id 的所有文件。有没有可能的解决方案?我对 HDFS 和 Scala 很陌生。

Given a base path which is /tmp/<myProject>/dir1/dir2/parquet what would be the easiest way to dynamically build these paths up to second_id?

Hadoop FS 中没有这样的选项API。对于文件,您可以使用 listFiles 方法递归列出文件,但您无法控制最大深度。

对于目录,您可以使用像这样的自定义递归函数来实现:

import org.apache.hadoop.fs._

def listDirectories(baseFolder: Path, depth: Integer = 0, maxDepth: Integer = -1): Seq[Path] = {
  val fs = FileSystem.get(sc.hadoopConfiguration)
  val ls = fs.listStatus(baseFolder)
  ls.filter(_.isDir).flatMap { s =>
    maxDepth match {
      case m if (m == -1 || depth < m) => listDirectories(s.getPath, depth + 1, maxDepth)
      case _ => Seq(s.getPath)

      }
    }
}

将其用于您的示例:

val baseFolder = new Path("/tmp/myProject1/dir1/dir2/parquet/first_id=1234/")

// listing all subDirectories up to second_id
val subDirectories = listDirectories(baseFolder, maxDepth=-1)

// listing all subDirectories up to dir4
val subDirectories = listDirectories(baseFolder, maxDepth=1)