Get default HDFS path that Parquet file is saved in
I ran a Spark job that ultimately saves a Parquet file, and the job completed successfully. However, I only specified the file name, not an HDFS path. Is there a way to print out the default HDFS path that Spark wrote the file to? I looked through sc._conf.getAll()
, but there doesn't seem to be anything useful there.
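A minimal sketch, assuming a live SparkContext named sc: a bare file name is normally resolved against the Hadoop FileSystem's working/home directory under the configured fs.defaultFS, and both can be printed directly from the Hadoop configuration rather than from the Spark conf.
import org.apache.hadoop.fs.FileSystem
// Assumes `sc` is an existing SparkContext.
val fs = FileSystem.get(sc.hadoopConfiguration)
// The configured default filesystem, e.g. an hdfs:// URI
println(sc.hadoopConfiguration.get("fs.defaultFS"))
// The directories that relative output paths are resolved against (usually /user/<username> on HDFS)
println(fs.getWorkingDirectory)
println(fs.getHomeDirectory)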
AFAIK this is one of the ways to do it (apart from the simple command-line route, hadoop fs -ls -R | grep -i yourfile
)....
Below is a sample Scala snippet (if you want to do this in Python or Java, you can mimic the same API calls):
get the list of Parquet files and filter them as shown below....
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}
// other imports here
lazy val sparkConf = new SparkConf()
lazy val sc = SparkContext.getOrCreate(sparkConf)
lazy val fileSystem = FileSystem.get(sc.hadoopConfiguration)
// List every file under the base path, normally something like hdfs://server/user
val allStatuses = listChildStatuses(fileSystem, new Path("yourbasepathofHDFS"))
val allparquet = allStatuses.filter(_.getPath.getName.endsWith(".parquet"))
// Print the Parquet files that were found; your file will be among them, which tells you its base path
allparquet.foreach(status => println(status.getPath))
The supporting methods are as follows:
/**
 * Get [[org.apache.hadoop.fs.FileStatus]] objects for all children (files) under the given base path. If the
 * given path points to a file, return a single-element collection containing the
 * [[org.apache.hadoop.fs.FileStatus]] of that file.
 */
def listChildStatuses(fs: FileSystem, basePath: Path): Seq[FileStatus] = {
  listChildStatuses(fs, fs.getFileStatus(basePath))
}

/**
 * Get [[FileStatus]] objects for all children (files) under the given base status. If the
 * given status points to a file, return a single-element collection containing the [[FileStatus]] of
 * that file.
 */
def listChildStatuses(fs: FileSystem, baseStatus: FileStatus): Seq[FileStatus] = {
  def recurse(status: FileStatus): Seq[FileStatus] = {
    // Split the listing into sub-directories and plain files, then descend into the sub-directories
    val (directories, leaves) = fs.listStatus(status.getPath).partition(_.isDirectory)
    leaves ++ directories.flatMap(f => listChildStatuses(fs, f))
  }
  if (baseStatus.isDirectory) recurse(baseStatus) else Seq(baseStatus)
}
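As a variation, on Hadoop 2.x or later the FileSystem API can do the recursive walk itself via listFiles; a minimal sketch reusing the fileSystem value and the base-path placeholder from above:
import org.apache.hadoop.fs.{LocatedFileStatus, Path, RemoteIterator}
// The second argument enables recursive listing, so no helper method is needed
val files: RemoteIterator[LocatedFileStatus] = fileSystem.listFiles(new Path("yourbasepathofHDFS"), true)
while (files.hasNext) {
  val status = files.next()
  if (status.getPath.getName.endsWith(".parquet")) println(status.getPath)
}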