使用 Scala 列出特定深度的给定 HDFS 路径的所有子目录?
List all sub-directories for a given HDFS path up to a certain depth using Scala?
我有各种 Spark 项目,它们以几种分区格式将数据写入 HDFS。
示例:
格式 1:
/tmp/myProject1/dir1/dir2/parquet/first_id=1234/dir3=x/dir4=y/second_id=4567
格式 2:
/tmp/myProject2/dir1/dir2/parquet/first_id=3212/second_id=9129
格式 3:
/tmp/myProject3/dir1/dir2/parquet/first_id=9912/dir3=x/second_id=1129
我的问题给出了一个基本路径,它是 /tmp/<myProject>/dir1/dir2/parquet
动态构建这些路径最简单的方法是 second_id?
注意:我不想使用通配符,而是希望在给定任何基本路径的情况下,动态获取所有这些路径的列表,直到 second_id
。在给定任何基本路径作为参数的情况下,我无法找出一种足够灵活的方法来创建最多 second_id
的此类路径列表。
到目前为止我尝试过的是:
val fs = FileSystem.get(new Configuration())
val status = fs.listStatus(new Path("/tmp/myProject1/dir1/dir2/parquet/first_id=1234/"))
status.foreach(x=> println(x.getPath))
这只是打印到第 1 级:
/tmp/myProject1/dir1/dir2/parquet/first_id=1234/dir3=x
/tmp/myProject1/dir1/dir2/parquet/first_id=1234/dir3=a
/tmp/myProject1/dir1/dir2/parquet/first_id=1234/dir3=c
相反,我希望它列出最多 second_id 的所有文件,例如:
/tmp/myProject1/dir1/dir2/parquet/first_id=1234/dir3=x/dir4=y/second_id=4567
/tmp/myProject1/dir1/dir2/parquet/first_id=1234/dir3=a/dir4=z/second_id=1231
/tmp/myProject1/dir1/dir2/parquet/first_id=1234/dir3=c/dir4=k/second_id=4123
同样,对于其他两种格式,它应该列出最多 second_id 的所有文件。有没有可能的解决方案?我对 HDFS 和 Scala 很陌生。
Given a base path which is /tmp/<myProject>/dir1/dir2/parquet
what would be the easiest way to dynamically build these paths up to second_id
?
Hadoop FS 中没有这样的选项API。对于文件,您可以使用 listFiles
方法递归列出文件,但您无法控制最大深度。
对于目录,您可以使用像这样的自定义递归函数来实现:
import org.apache.hadoop.fs._
def listDirectories(baseFolder: Path, depth: Integer = 0, maxDepth: Integer = -1): Seq[Path] = {
val fs = FileSystem.get(sc.hadoopConfiguration)
val ls = fs.listStatus(baseFolder)
ls.filter(_.isDir).flatMap { s =>
maxDepth match {
case m if (m == -1 || depth < m) => listDirectories(s.getPath, depth + 1, maxDepth)
case _ => Seq(s.getPath)
}
}
}
将其用于您的示例:
val baseFolder = new Path("/tmp/myProject1/dir1/dir2/parquet/first_id=1234/")
// listing all subDirectories up to second_id
val subDirectories = listDirectories(baseFolder, maxDepth=-1)
// listing all subDirectories up to dir4
val subDirectories = listDirectories(baseFolder, maxDepth=1)
我有各种 Spark 项目,它们以几种分区格式将数据写入 HDFS。 示例:
格式 1:
/tmp/myProject1/dir1/dir2/parquet/first_id=1234/dir3=x/dir4=y/second_id=4567
格式 2:
/tmp/myProject2/dir1/dir2/parquet/first_id=3212/second_id=9129
格式 3:
/tmp/myProject3/dir1/dir2/parquet/first_id=9912/dir3=x/second_id=1129
我的问题给出了一个基本路径,它是 /tmp/<myProject>/dir1/dir2/parquet
动态构建这些路径最简单的方法是 second_id?
注意:我不想使用通配符,而是希望在给定任何基本路径的情况下,动态获取所有这些路径的列表,直到 second_id
。在给定任何基本路径作为参数的情况下,我无法找出一种足够灵活的方法来创建最多 second_id
的此类路径列表。
到目前为止我尝试过的是:
val fs = FileSystem.get(new Configuration())
val status = fs.listStatus(new Path("/tmp/myProject1/dir1/dir2/parquet/first_id=1234/"))
status.foreach(x=> println(x.getPath))
这只是打印到第 1 级:
/tmp/myProject1/dir1/dir2/parquet/first_id=1234/dir3=x
/tmp/myProject1/dir1/dir2/parquet/first_id=1234/dir3=a
/tmp/myProject1/dir1/dir2/parquet/first_id=1234/dir3=c
相反,我希望它列出最多 second_id 的所有文件,例如:
/tmp/myProject1/dir1/dir2/parquet/first_id=1234/dir3=x/dir4=y/second_id=4567
/tmp/myProject1/dir1/dir2/parquet/first_id=1234/dir3=a/dir4=z/second_id=1231
/tmp/myProject1/dir1/dir2/parquet/first_id=1234/dir3=c/dir4=k/second_id=4123
同样,对于其他两种格式,它应该列出最多 second_id 的所有文件。有没有可能的解决方案?我对 HDFS 和 Scala 很陌生。
Given a base path which is
/tmp/<myProject>/dir1/dir2/parquet
what would be the easiest way to dynamically build these paths up tosecond_id
?
Hadoop FS 中没有这样的选项API。对于文件,您可以使用 listFiles
方法递归列出文件,但您无法控制最大深度。
对于目录,您可以使用像这样的自定义递归函数来实现:
import org.apache.hadoop.fs._
def listDirectories(baseFolder: Path, depth: Integer = 0, maxDepth: Integer = -1): Seq[Path] = {
val fs = FileSystem.get(sc.hadoopConfiguration)
val ls = fs.listStatus(baseFolder)
ls.filter(_.isDir).flatMap { s =>
maxDepth match {
case m if (m == -1 || depth < m) => listDirectories(s.getPath, depth + 1, maxDepth)
case _ => Seq(s.getPath)
}
}
}
将其用于您的示例:
val baseFolder = new Path("/tmp/myProject1/dir1/dir2/parquet/first_id=1234/")
// listing all subDirectories up to second_id
val subDirectories = listDirectories(baseFolder, maxDepth=-1)
// listing all subDirectories up to dir4
val subDirectories = listDirectories(baseFolder, maxDepth=1)