用 Scala/Spark 列出目录中的文件(包括文件信息)
List files in directory (including file information) with Scala/Spark
我是 Scala/Spark 的新手,希望大家能帮助我。我想获取在 hdfs 目录中的某个时间戳后创建的文件,以便在 Zeppelin 中进行一些监控。因此我需要一个包含文件名、文件大小和修改日期的列。
我发现这对我来说很有效,可以获取我需要的所有信息:
val fs = FileSystem.get(new Configuration())
val dir: String = "some/hdfs/path"
val input_files = fs.listStatus(new Path(dir)).filter(_.getModificationTime> timeInEpoch)
结果我想在 spark 中创建一个 DataFrame,每个文件都有一行及其信息(或者至少是上面提到的信息)
val data = sc.parallelize(input_files)
val dfFromData2 = spark.createDataFrame(data).toDF()
如果我以这种方式尝试,我会得到以下响应:
309: error: overloaded method value createDataFrame with alternatives:
[A <: Product](data: Seq[A])(implicit evidence: reflect.runtime.universe.TypeTag[A])org.apache.spark.sql.DataFrame <and>
[A <: Product](rdd: org.apache.spark.rdd.RDD[A])(implicit evidence: reflect.runtime.universe.TypeTag[A])org.apache.spark.sql.DataFrame
cannot be applied to (org.apache.spark.rdd.RDD[org.apache.hadoop.fs.FileStatus])
val dfFromData2 = spark.createDataFrame(data).toDF()
希望你能帮帮我:)
问候
如错误消息所示,Hadoop FileStatus
类型不是 Product
的子类型,即元组。 Spark DataFrames 有自己的 SQL 风格的类型系统,不允许像 FileStatus
这样的任意复杂类型。同样,如果您尝试对您创建的 RDD 进行操作,您将收到类似的错误,因为 FileStatus
不可序列化。最好的办法是将所需的数据提取为元组或大小写 class 并从中创建一个 DataFrame:
case class FileInfo(name : String, modifiedTime : Long, size : Long)
val df = input_files.map{x =>
FileInfo(x.getPath.toString, x.getModificationTime, x.getLen)
}.toSeq.toDF()
我是 Scala/Spark 的新手,希望大家能帮助我。我想获取在 hdfs 目录中的某个时间戳后创建的文件,以便在 Zeppelin 中进行一些监控。因此我需要一个包含文件名、文件大小和修改日期的列。
我发现这对我来说很有效,可以获取我需要的所有信息:
val fs = FileSystem.get(new Configuration())
val dir: String = "some/hdfs/path"
val input_files = fs.listStatus(new Path(dir)).filter(_.getModificationTime> timeInEpoch)
结果我想在 spark 中创建一个 DataFrame,每个文件都有一行及其信息(或者至少是上面提到的信息)
val data = sc.parallelize(input_files)
val dfFromData2 = spark.createDataFrame(data).toDF()
如果我以这种方式尝试,我会得到以下响应:
309: error: overloaded method value createDataFrame with alternatives:
[A <: Product](data: Seq[A])(implicit evidence: reflect.runtime.universe.TypeTag[A])org.apache.spark.sql.DataFrame <and>
[A <: Product](rdd: org.apache.spark.rdd.RDD[A])(implicit evidence: reflect.runtime.universe.TypeTag[A])org.apache.spark.sql.DataFrame
cannot be applied to (org.apache.spark.rdd.RDD[org.apache.hadoop.fs.FileStatus])
val dfFromData2 = spark.createDataFrame(data).toDF()
希望你能帮帮我:)
问候
如错误消息所示,Hadoop FileStatus
类型不是 Product
的子类型,即元组。 Spark DataFrames 有自己的 SQL 风格的类型系统,不允许像 FileStatus
这样的任意复杂类型。同样,如果您尝试对您创建的 RDD 进行操作,您将收到类似的错误,因为 FileStatus
不可序列化。最好的办法是将所需的数据提取为元组或大小写 class 并从中创建一个 DataFrame:
case class FileInfo(name : String, modifiedTime : Long, size : Long)
val df = input_files.map{x =>
FileInfo(x.getPath.toString, x.getModificationTime, x.getLen)
}.toSeq.toDF()