在包含子文件夹的文件夹顶部分区 Table,其中包含 spark 中的 json 个文件
Partition Table on top of folders containing sub-folders which contains json files in spark
我正在 Databricks 中研究 spark。我的存储位置有一个指向我的目录的挂载点。我们将该目录称为“/mnt/abc1/abc2”——路径。
在这个“abc2”目录中,假设我有 10 个名为“xyz1”..“xyz10”的文件夹。所有这些“xyz%”文件夹都包含 json 个文件,我们称它们为“xyz1_1.json”,依此类推。
我需要构建一个 table 以便我可以将我的 json 访问到 spark table 中,方法是将其引用为路径 + "abc2.xyz1.xyz1_1.json"
var path = "/mnt/abc1/"
var data = spark.read.json(path)
当 json 文件直接位于路径下方而不是我们路径中的文件夹内时,此方法有效。
我想找出一种方法可以自动检测包含 json 的底层文件夹和子文件夹,并在其上构建 table。
对于 spark 3+,您可以将选项 recursiveFileLookup
添加为 true 以搜索子目录
var path = "/mnt/abc1/"
var data = spark.read.option("recursiveFileLookup","true").json(path)
试试下面的代码。
import org.apache.hadoop.fs.{FileSystem, Path, RemoteIterator}
import org.apache.spark.sql.SparkSession
import scala.util.{Failure, Success, Try}
case class Hdfs(fs: FileSystem) {
implicit def convertToScalaIterator[T](remoteIterator: RemoteIterator[T]): Iterator[T] = {
case class wrapper(remoteIterator: RemoteIterator[T]) extends Iterator[T] {
override def hasNext: Boolean = remoteIterator.hasNext
override def next(): T = remoteIterator.next()
}
wrapper(remoteIterator)
}
def listFiles(path: String): List[String] = {
Try(
fs
.listFiles(new Path(path), true)
.toList
.map(_.getPath)
.filter(!_.toString.contains("_spark_metadata"))
.map(_.toString)
) match {
case Success(files) => files
case Failure(ex) => Nil
}
}
}
使用 spark 会话获取 hdfs 对象。
val hdfs = Hdfs(FileSystem.get(spark.sparkContext.hadoopConfiguration))
使用 listFiles
函数递归获取文件列表。
val files = hdfs.listFiles("/mnt/abc1/")
检查 hdfs 路径中的文件是否可用。
if(!files.isEmpty) val data = spark.read.json(files:_*)
我正在 Databricks 中研究 spark。我的存储位置有一个指向我的目录的挂载点。我们将该目录称为“/mnt/abc1/abc2”——路径。 在这个“abc2”目录中,假设我有 10 个名为“xyz1”..“xyz10”的文件夹。所有这些“xyz%”文件夹都包含 json 个文件,我们称它们为“xyz1_1.json”,依此类推。 我需要构建一个 table 以便我可以将我的 json 访问到 spark table 中,方法是将其引用为路径 + "abc2.xyz1.xyz1_1.json"
var path = "/mnt/abc1/"
var data = spark.read.json(path)
当 json 文件直接位于路径下方而不是我们路径中的文件夹内时,此方法有效。 我想找出一种方法可以自动检测包含 json 的底层文件夹和子文件夹,并在其上构建 table。
对于 spark 3+,您可以将选项 recursiveFileLookup
添加为 true 以搜索子目录
var path = "/mnt/abc1/"
var data = spark.read.option("recursiveFileLookup","true").json(path)
试试下面的代码。
import org.apache.hadoop.fs.{FileSystem, Path, RemoteIterator}
import org.apache.spark.sql.SparkSession
import scala.util.{Failure, Success, Try}
case class Hdfs(fs: FileSystem) {
implicit def convertToScalaIterator[T](remoteIterator: RemoteIterator[T]): Iterator[T] = {
case class wrapper(remoteIterator: RemoteIterator[T]) extends Iterator[T] {
override def hasNext: Boolean = remoteIterator.hasNext
override def next(): T = remoteIterator.next()
}
wrapper(remoteIterator)
}
def listFiles(path: String): List[String] = {
Try(
fs
.listFiles(new Path(path), true)
.toList
.map(_.getPath)
.filter(!_.toString.contains("_spark_metadata"))
.map(_.toString)
) match {
case Success(files) => files
case Failure(ex) => Nil
}
}
}
使用 spark 会话获取 hdfs 对象。
val hdfs = Hdfs(FileSystem.get(spark.sparkContext.hadoopConfiguration))
使用 listFiles
函数递归获取文件列表。
val files = hdfs.listFiles("/mnt/abc1/")
检查 hdfs 路径中的文件是否可用。
if(!files.isEmpty) val data = spark.read.json(files:_*)