获取 HDFS 中镶木地板文件的大小,以便在 Scala 中使用 Spark 进行重新分区
get size of parquet file in HDFS for repartition with Spark in Scala
我在 HDFS 上有许多 parquet 文件目录,每个目录包含几千个小的(大多数 < 100kb)parquet 文件。它们减慢了我的 Spark 作业速度,所以我想将它们结合起来。
使用以下代码,我可以将本地 parquet 文件重新分区为更少的部分:
val pqFile = sqlContext.read.parquet("file:/home/hadoop/data/file.parquet")
pqFile.coalesce(4).write.save("file:/home/hadoop/data/fileSmaller.parquet")
但我不知道如何通过 Scala 代码以编程方式获取 HDFS 上目录的大小,因此我无法真正计算出要传递给 coalesce
函数的分区数数据集.
我该怎么做?或者在 Spark 中是否有一种方便的方法,以便我可以配置编写器来写入固定大小的镶木地板分区?
你可以试试
pqFile.inputFiles.size
根据文档returns "a best-effort snapshot of the files that compose this DataFrame"。
作为替代方案,直接在 HDFS 级别:
val hdfs: org.apache.hadoop.fs.FileSystem =
org.apache.hadoop.fs.FileSystem.get(
new org.apache.hadoop.conf.Configuration())
val hadoopPath= new org.apache.hadoop.fs.Path("hdfs://localhost:9000/tmp")
val recursive = false
val ri = hdfs.listFiles(hadoopPath, recursive)
val it = new Iterator[org.apache.hadoop.fs.LocatedFileStatus]() {
override def hasNext = ri.hasNext
override def next() = ri.next()
}
// Materialize iterator
val files = it.toList
println(files.size)
println(files.map(_.getLen).sum)
这样你也可以获得文件大小。
我在 HDFS 上有许多 parquet 文件目录,每个目录包含几千个小的(大多数 < 100kb)parquet 文件。它们减慢了我的 Spark 作业速度,所以我想将它们结合起来。
使用以下代码,我可以将本地 parquet 文件重新分区为更少的部分:
val pqFile = sqlContext.read.parquet("file:/home/hadoop/data/file.parquet")
pqFile.coalesce(4).write.save("file:/home/hadoop/data/fileSmaller.parquet")
但我不知道如何通过 Scala 代码以编程方式获取 HDFS 上目录的大小,因此我无法真正计算出要传递给 coalesce
函数的分区数数据集.
我该怎么做?或者在 Spark 中是否有一种方便的方法,以便我可以配置编写器来写入固定大小的镶木地板分区?
你可以试试
pqFile.inputFiles.size
根据文档returns "a best-effort snapshot of the files that compose this DataFrame"。
作为替代方案,直接在 HDFS 级别:
val hdfs: org.apache.hadoop.fs.FileSystem =
org.apache.hadoop.fs.FileSystem.get(
new org.apache.hadoop.conf.Configuration())
val hadoopPath= new org.apache.hadoop.fs.Path("hdfs://localhost:9000/tmp")
val recursive = false
val ri = hdfs.listFiles(hadoopPath, recursive)
val it = new Iterator[org.apache.hadoop.fs.LocatedFileStatus]() {
override def hasNext = ri.hasNext
override def next() = ri.next()
}
// Materialize iterator
val files = it.toList
println(files.size)
println(files.map(_.getLen).sum)
这样你也可以获得文件大小。