How to get file metadata when retrieving data from HDFS?
I request data from HDFS, and I would like to get the metadata of the files the data was read from. This would let me build reports that reflect the data available at a given moment.
I found a solution that uses org.apache.hadoop.fs.FileSystem to get a listing of all the files.
I know the partitioning rules, so I could build a row -> meta mapping from the received listing.
But this approach seems hard to implement and maintain. Maybe there is a simpler way to achieve the same result?
You could try the debug command:
hdfs debug computeMeta -block <block-file> -out <output-metadata-file>
You can find this and related commands at https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HDFSCommands.html#verifyMeta
The easiest way is to use the Spark built-in function input_file_name.
import scala.collection.mutable.Map
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.types.LongType

val df = spark.read.text("<path>").withColumn("input_file_name", input_file_name()).repartition($"input_file_name")

def getMetadata(rdd: Iterator[Row]) = {
  val map = Map[String, Long]()
  val fs = FileSystem.get(new Configuration())
  rdd.map(row => {
    val path = row.getString(row.size - 1)
    if (!map.contains(path)) {
      // look up each file's modification time at most once per partition
      map.put(path, fs.listStatus(new Path(path))(0).getModificationTime())
    }
    Row.fromSeq(row.toSeq ++ Array[Any](map(path)))
  })
}

spark.createDataFrame(df.rdd.mapPartitions(getMetadata), df.schema.add("modified_ts", LongType)).show(10000, false)
Here modified_ts is the mtime of the file.
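The per-partition memoization used in getMetadata above can be illustrated without Spark or HDFS. In this sketch each row carries its file path in the last field, and mtimes/stubMtime are hypothetical stand-ins for fs.listStatus(...).getModificationTime:

```scala
import scala.collection.mutable

// Append an mtime column to each row, looking up each distinct path only once.
def enrich(rows: Iterator[Seq[Any]], getMtime: String => Long): Iterator[Seq[Any]] = {
  val cache = mutable.Map[String, Long]()
  rows.map { row =>
    val path = row.last.toString
    val mtime = cache.getOrElseUpdate(path, getMtime(path)) // memoized per iterator
    row :+ mtime
  }
}

// Stubbed lookup standing in for the HDFS call; counts how often it runs.
val mtimes = Map("/data/f1" -> 111L, "/data/f2" -> 222L)
var lookups = 0
val stubMtime = (p: String) => { lookups += 1; mtimes(p) }

val rows = Iterator(Seq("a", "/data/f1"), Seq("b", "/data/f1"), Seq("c", "/data/f2"))
val out = enrich(rows, stubMtime).toList
println(out)
println(lookups) // 2: one lookup per distinct path, not per row
```

This is why the answer repartitions by input_file_name first: with each file's rows in one partition, the cache hits on every row after the first.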
Depending on the size of your data, you can also do it with a join. The logic would look like:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.functions._
val mtime = (path: String) => FileSystem.get(new Configuration()).listStatus(new Path(path)).head.getModificationTime
val mtimeUDF = udf(mtime)
val df = spark.read.text("<path>").withColumn("input_file_name", input_file_name())
val metadata_df = df.select($"input_file_name").distinct().withColumn("mtime", mtimeUDF($"input_file_name"))
val rows_with_metadata = df.join(metadata_df , "input_file_name")
rows_with_metadata.show(false)
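The join variant can be sketched in plain Scala collections, with stubMtime as a hypothetical stand-in for the HDFS call inside mtimeUDF; it shows why the metadata lookup runs once per distinct file rather than once per row:

```scala
// Deterministic stub standing in for listStatus(...).getModificationTime.
val mtimes = Map("/data/f1" -> 1700000000L, "/data/f2" -> 1700000100L)
var calls = 0
val stubMtime = (p: String) => { calls += 1; mtimes(p) }

// df analogue: (row id, input_file_name)
val rows = List(("r1", "/data/f1"), ("r2", "/data/f2"), ("r3", "/data/f1"))

// metadata_df analogue: one mtime per distinct input file
val metadata = rows.map(_._2).distinct.map(p => p -> stubMtime(p)).toMap

// df.join(metadata_df, "input_file_name") analogue
val joined = rows.map { case (id, path) => (id, path, metadata(path)) }
println(joined)
println(calls) // 2 lookups for 3 rows
```

The distinct() on input_file_name is the key step: without it the UDF would hit the NameNode once per row instead of once per file.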
I have created a small helper method metadata that you can call directly on a DataFrame object, like df.metadata; it builds a DataFrame over the available file metadata and returns it.
Meta columns in the final DataFrame:
- path
- isDirectory
- length -- shown in human-readable form, e.g. 47 bytes
- replication
- blockSize -- shown in human-readable form, e.g. 32 MB
- modificationTime -- converted from Unix time to a normal datetime
- accessTime
- owner
- group
- permission
- isSymlink
scala> :paste
// Entering paste mode (ctrl-D to finish)
import org.joda.time.DateTime
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.DataFrame
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}
// Storing Metadata
case class FileMetaData(path: String,
                        isDirectory: Boolean,
                        length: String,
                        replication: Int,
                        blockSize: String,
                        modificationTime: String,
                        accessTime: String,
                        owner: String,
                        group: String,
                        permission: String,
                        isSymlink: Boolean)

object FileMetaData {
  def apply(lfs: LocatedFileStatus): FileMetaData = {
    new FileMetaData(
      path = lfs.getPath.toString,
      isDirectory = lfs.isDirectory,
      length = FileUtils.byteCountToDisplaySize(lfs.getLen),
      replication = lfs.getReplication,
      blockSize = FileUtils.byteCountToDisplaySize(lfs.getBlockSize),
      modificationTime = new DateTime(lfs.getModificationTime).toString,
      accessTime = new DateTime(lfs.getAccessTime).toString,
      owner = lfs.getOwner,
      group = lfs.getGroup,
      permission = lfs.getPermission.toString,
      isSymlink = lfs.isSymlink
    )
  }
}

// Convert RemoteIterator to Scala Iterator.
implicit def convertToScalaIterator[T](remoteIterator: RemoteIterator[T]): Iterator[T] = {
  case class wrapper(remoteIterator: RemoteIterator[T]) extends Iterator[T] {
    override def hasNext: Boolean = remoteIterator.hasNext
    override def next(): T = remoteIterator.next()
  }
  wrapper(remoteIterator)
}

// Using this we can call the metadata method on df - like df.metadata.
implicit class MetaData(df: DataFrame) {
  def metadata = {
    import df.sparkSession.implicits._
    df.inputFiles.map(new Path(_))
      .flatMap {
        FileSystem
          .get(df.sparkSession.sparkContext.hadoopConfiguration)
          .listLocatedStatus(_)
          .toList
      }
      .map(FileMetaData(_)).toList.toDF
  }
}
// Exiting paste mode, now interpreting.
warning: there was one feature warning; re-run with -feature for details
import org.joda.time.DateTime
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.DataFrame
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}
defined class FileMetaData
defined object FileMetaData
convertToScalaIterator: [T](remoteIterator: org.apache.hadoop.fs.RemoteIterator[T])Iterator[T]
defined class MetaData
scala> val df = spark.read.format("json").load("/tmp/data")
df: org.apache.spark.sql.DataFrame = [json_data: struct<value: string>]
scala> df.show(false)
+------------------+
|json_data |
+------------------+
|[{"a":1} ,{"b":2}]|
|[{"a":1} ,{"b":2}]|
|[{"a":1} ,{"b":2}]|
+------------------+
scala>
DataFrame metadata output:
scala> df.metadata.show(false)
+-------------------------+-----------+--------+-----------+---------+-----------------------------+-----------------------------+--------+-----+----------+---------+
|path |isDirectory|length |replication|blockSize|modificationTime |accessTime |owner |group|permission|isSymlink|
+-------------------------+-----------+--------+-----------+---------+-----------------------------+-----------------------------+--------+-----+----------+---------+
|file:/tmp/data/fileB.json|false |47 bytes|1 |32 MB |2020-04-25T13:47:00.000+05:30|1970-01-01T05:30:00.000+05:30|srinivas|wheel|rw-r--r-- |false |
|file:/tmp/data/fileC.json|false |47 bytes|1 |32 MB |2020-04-25T13:47:10.000+05:30|1970-01-01T05:30:00.000+05:30|srinivas|wheel|rw-r--r-- |false |
|file:/tmp/data/fileA.json|false |47 bytes|1 |32 MB |2020-04-25T11:35:12.000+05:30|1970-01-01T05:30:00.000+05:30|srinivas|wheel|rw-r--r-- |false |
+-------------------------+-----------+--------+-----------+---------+-----------------------------+-----------------------------+--------+-----+----------+---------+
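As a side note, the display formatting is the only reason this helper pulls in joda-time and commons-io; the JDK alone can do both conversions. A rough sketch, where humanBytes is a simplified stand-in for FileUtils.byteCountToDisplaySize (it rounds rather than truncates, but agrees on the values shown above):

```scala
import java.time.{Instant, ZoneOffset}

// Epoch millis (as returned by getModificationTime) to an ISO-8601 string.
def toIso(epochMillis: Long): String =
  Instant.ofEpochMilli(epochMillis).atOffset(ZoneOffset.UTC).toString

// Byte count to a human-readable size, e.g. 47 bytes or 32 MB.
def humanBytes(bytes: Long): String = {
  val units = Seq("bytes", "KB", "MB", "GB", "TB")
  var v = bytes.toDouble
  var i = 0
  while (v >= 1024 && i < units.size - 1) { v /= 1024; i += 1 }
  f"$v%.0f ${units(i)}"
}

val iso = toIso(0L)               // "1970-01-01T00:00Z"
val small = humanBytes(47L)       // "47 bytes"
val block = humanBytes(33554432L) // "32 MB"
println(s"$iso $small $block")
```

This keeps the case class dependency-free, at the cost of the slightly different timezone handling (joda's DateTime uses the JVM default zone, while this sketch pins UTC).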