在包含子文件夹的文件夹顶部分区 Table,其中包含 spark 中的 json 个文件

Partition Table on top of folders containing sub-folders which contains json files in spark

我正在 Databricks 中研究 spark。我的存储位置有一个指向我的目录的挂载点。我们将该目录称为“/mnt/abc1/abc2”——路径。 在这个“abc2”目录中,假设我有 10 个名为“xyz1”..“xyz10”的文件夹。所有这些“xyz%”文件夹都包含 json 个文件,我们称它们为“xyz1_1.json”,依此类推。 我需要构建一个 table 以便我可以将我的 json 访问到 spark table 中,方法是将其引用为路径 + "abc2.xyz1.xyz1_1.json"

var path = "/mnt/abc1/"
var data = spark.read.json(path)

当 json 文件直接位于路径下方而不是我们路径中的文件夹内时,此方法有效。 我想找出一种方法可以自动检测包含 json 的底层文件夹和子文件夹,并在其上构建 table。

对于 spark 3+,您可以将选项 recursiveFileLookup 添加为 true 以搜索子目录

var path = "/mnt/abc1/"
var data = spark.read.option("recursiveFileLookup","true").json(path)

试试下面的代码。

import org.apache.hadoop.fs.{FileSystem, Path, RemoteIterator}
import org.apache.spark.sql.SparkSession
import scala.util.{Failure, Success, Try}

case class Hdfs(fs: FileSystem) {
  implicit def convertToScalaIterator[T](remoteIterator: RemoteIterator[T]): Iterator[T] = {
    case class wrapper(remoteIterator: RemoteIterator[T]) extends Iterator[T] {
      override def hasNext: Boolean = remoteIterator.hasNext
      override def next(): T = remoteIterator.next()
    }
    wrapper(remoteIterator)
  }
  def listFiles(path: String): List[String] = {
    Try(
      fs
        .listFiles(new Path(path), true)
        .toList
        .map(_.getPath)
        .filter(!_.toString.contains("_spark_metadata"))
        .map(_.toString)
    ) match {
      case Success(files) => files
      case Failure(ex) => Nil
    }
  }
}

使用 spark 会话获取 hdfs 对象。

val hdfs = Hdfs(FileSystem.get(spark.sparkContext.hadoopConfiguration))

使用 listFiles 函数递归获取文件列表。

val files = hdfs.listFiles("/mnt/abc1/")

检查 hdfs 路径中的文件是否可用。

if(!files.isEmpty) val data = spark.read.json(files:_*)