How to get file metadata when retrieving data from HDFS?

I am requesting data from HDFS, and I would like to get the metadata of the files the data was read from. This will let me build reports based on the data that was available at a given moment.

I have found a solution that uses org.apache.hadoop.fs.FileSystem to get a list of all the files. I know the partitioning rules, so from the list I receive I can build the mapping row -> meta.
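A minimal sketch of that listing approach, assuming the data lives under a hypothetical /data/events directory (the directory and variable names are placeholders, not from the question):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// List every file under the (hypothetical) input directory and keep its mtime.
val fs = FileSystem.get(new Configuration())
val fileToMtime: Map[String, Long] =
  fs.listStatus(new Path("/data/events"))
    .filter(_.isFile)
    .map(s => s.getPath.toString -> s.getModificationTime)
    .toMap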

But this approach seems hard to implement and maintain. Perhaps there is a simpler way to achieve the same result?

One option is the hdfs debug command:

hdfs debug computeMeta -block <block-file> -out <output-metadata-file>

You can find the related commands at https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HDFSCommands.html#verifyMeta
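For example, the verifyMeta subcommand that the link points to verifies a block's metadata file (and optionally the block file itself); the arguments below are the placeholders used in the documentation:

hdfs debug verifyMeta -meta <metadata-file> [-block <block-file>]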

The simplest way is to use the Spark built-in function input_file_name:

import scala.collection.mutable.Map
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.types.LongType

// Tag each row with the file it came from and group rows from the same file together.
val df = spark.read.text("<path>").withColumn("input_file_name", input_file_name()).repartition($"input_file_name")

// For each partition, look up every source file's modification time once and append it to the rows.
def getMetadata(rdd: Iterator[Row]) = {
    val map = Map[String, Long]()
    val fs = FileSystem.get(new Configuration())
    rdd.map(row => {
                val path = row.getString(row.size - 1)
                if (!map.contains(path)) {
                    map.put(path, fs.listStatus(new Path(path))(0).getModificationTime())
                }
                Row.fromSeq(row.toSeq ++ Array[Any](map(path)))
            })
}

spark.createDataFrame(df.rdd.mapPartitions(getMetadata), df.schema.add("modified_ts", LongType)).show(10000, false)

Here modified_ts is the mtime of the file.
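getModificationTime returns epoch milliseconds, so if you prefer a readable timestamp you can convert the extra column afterwards. A small sketch, assuming the result of the createDataFrame call above is bound to a val named withMeta (a name of my choosing):

import org.apache.spark.sql.functions.{col, from_unixtime}

// modified_ts is in milliseconds since the epoch; from_unixtime expects seconds.
val readable = withMeta.withColumn("modified_time", from_unixtime((col("modified_ts") / 1000).cast("long")))
readable.select("input_file_name", "modified_time").show(false)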

Depending on the size of your data, you could also do this with a join. The logic would look like:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.functions._

// UDF that returns the modification time of a given file path.
val mtime = (path: String) => FileSystem.get(new Configuration()).listStatus(new Path(path)).head.getModificationTime
val mtimeUDF = udf(mtime)

val df = spark.read.text("<path>").withColumn("input_file_name", input_file_name())

// Look up the mtime only once per distinct file, then join it back onto the rows.
val metadata_df = df.select($"input_file_name").distinct().withColumn("mtime", mtimeUDF($"input_file_name"))

val rows_with_metadata = df.join(metadata_df, "input_file_name")
rows_with_metadata.show(false)
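Since metadata_df contains only one row per distinct file, it is usually tiny; if that holds for your data, Spark's broadcast hint avoids shuffling the large side of the join (an optional tweak of my own, not part of the answer above):

import org.apache.spark.sql.functions.broadcast

// Broadcast the small per-file metadata table to every executor.
val rows_with_metadata = df.join(broadcast(metadata_df), "input_file_name")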

I have created a small helper method, metadata, which you can call directly on a DataFrame object, like df.metadata; it builds a DataFrame from the metadata of the files behind that DataFrame and returns it.

Meta columns in the final DataFrame:

  • path
  • isDirectory
  • length -- shown in a human-readable format, e.g. 47 bytes
  • replication
  • blockSize -- shown in a human-readable format, e.g. 32 MB
  • modificationTime -- converted from Unix time to a normal datetime
  • accessTime
  • owner
  • group
  • permission
  • isSymlink

scala> :paste
// Entering paste mode (ctrl-D to finish)

  import org.joda.time.DateTime
  import org.apache.commons.io.FileUtils
  import org.apache.spark.sql.DataFrame
  import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}

  // Storing Metadata
  case class FileMetaData(path: String,
                          isDirectory:Boolean,
                          length:String,
                          replication:Int,
                          blockSize:String,
                          modificationTime: String,
                          accessTime:String ,
                          owner:String ,
                          group:String ,
                          permission:String,
                          isSymlink:Boolean)

  object FileMetaData {

    def apply(lfs: LocatedFileStatus):FileMetaData = {        
      new FileMetaData(
        path= lfs.getPath.toString,
        isDirectory=lfs.isDirectory,
        length=FileUtils.byteCountToDisplaySize(lfs.getLen),
        replication=lfs.getReplication,
        blockSize=FileUtils.byteCountToDisplaySize(lfs.getBlockSize),
        modificationTime=new DateTime(lfs.getModificationTime).toString,
        accessTime=new DateTime(lfs.getAccessTime).toString ,
        owner=lfs.getOwner ,
        group=lfs.getGroup ,
        permission=lfs.getPermission.toString,
        isSymlink=lfs.isSymlink
      )
    }
  }

  // Convert RemoteIterator to Scala Iterator.
  implicit def convertToScalaIterator[T](remoteIterator: RemoteIterator[T]): Iterator[T] = {
    case class wrapper(remoteIterator: RemoteIterator[T]) extends Iterator[T] {
      override def hasNext: Boolean = remoteIterator.hasNext
      override def next(): T = remoteIterator.next()
    }
    wrapper(remoteIterator)
  }

  // Using this we can call metadata method on df - like df.metadata.
  implicit class MetaData(df: DataFrame) {
    def metadata =  {
      import df.sparkSession.implicits._
       df.inputFiles.map(new Path(_))
         .flatMap{
           FileSystem
             .get(df.sparkSession.sparkContext.hadoopConfiguration)
             .listLocatedStatus(_)
             .toList
       }
         .map(FileMetaData(_)).toList.toDF
     }
  }

// Exiting paste mode, now interpreting.

warning: there was one feature warning; re-run with -feature for details
import org.joda.time.DateTime
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.DataFrame
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}
defined class FileMetaData
defined object FileMetaData
convertToScalaIterator: [T](remoteIterator: org.apache.hadoop.fs.RemoteIterator[T])Iterator[T]
defined class MetaData

scala> val df = spark.read.format("json").load("/tmp/data")
df: org.apache.spark.sql.DataFrame = [json_data: struct<value: string>]



scala> df.show(false)
+------------------+
|json_data         |
+------------------+
|[{"a":1} ,{"b":2}]|
|[{"a":1} ,{"b":2}]|
|[{"a":1} ,{"b":2}]|
+------------------+

scala>

DataFrame metadata output:

scala> df.metadata.show(false)

+-------------------------+-----------+--------+-----------+---------+-----------------------------+-----------------------------+--------+-----+----------+---------+
|path                     |isDirectory|length  |replication|blockSize|modificationTime             |accessTime                   |owner   |group|permission|isSymlink|
+-------------------------+-----------+--------+-----------+---------+-----------------------------+-----------------------------+--------+-----+----------+---------+
|file:/tmp/data/fileB.json|false      |47 bytes|1          |32 MB    |2020-04-25T13:47:00.000+05:30|1970-01-01T05:30:00.000+05:30|srinivas|wheel|rw-r--r-- |false    |
|file:/tmp/data/fileC.json|false      |47 bytes|1          |32 MB    |2020-04-25T13:47:10.000+05:30|1970-01-01T05:30:00.000+05:30|srinivas|wheel|rw-r--r-- |false    |
|file:/tmp/data/fileA.json|false      |47 bytes|1          |32 MB    |2020-04-25T11:35:12.000+05:30|1970-01-01T05:30:00.000+05:30|srinivas|wheel|rw-r--r-- |false    |
+-------------------------+-----------+--------+-----------+---------+-----------------------------+-----------------------------+--------+-----+----------+---------+
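As a quick follow-up usage example (my own, using only standard DataFrame operations), you can pull just the paths and modification times of the input files, newest first; modificationTime is an ISO-8601 string here, so lexical ordering matches chronological order:

import org.apache.spark.sql.functions.col

df.metadata
  .select("path", "modificationTime")
  .orderBy(col("modificationTime").desc)
  .show(false)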