从 apache spark 中的文本文件查找存储在 rdd 中的数据大小

Find size of data stored in rdd from a text file in apache spark

我是 Apache Spark(版本 1.4.1)的新手。我写了一个小代码来读取文本文件并将其数据存储在 Rdd 中。

有什么方法可以获取 rdd 中数据的大小。

这是我的代码:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.util.SizeEstimator
import org.apache.spark.sql.Row

object RddSize {

  def main(args: Array[String]) {

    val sc = new SparkContext("local", "data size")
    val FILE_LOCATION = "src/main/resources/employees.csv"
    val peopleRdd = sc.textFile(FILE_LOCATION)

    val newRdd = peopleRdd.filter(str => str.contains(",M,"))
    //Here I want to find whats the size remaining data
  }
} 

我想在过滤器转换 (peopleRdd) 之前和之后 (newRdd) 获取数据大小。

我不确定您是否需要这样做。您可以缓存 rdd 并检查 Spark UI 中的大小。但是假设您确实想以编程方式执行此操作,这里有一个解决方案。

    def calcRDDSize(rdd: RDD[String]): Long = {
        //map to the size of each string, UTF-8 is the default
        rdd.map(_.getBytes("UTF-8").length.toLong) 
           .reduce(_+_) //add the sizes together
    }

然后您可以为您的两个 RDD 调用此函数:

println(s"peopleRdd is [${calcRDDSize(peopleRdd)}] bytes in size")
println(s"newRdd is [${calcRDDSize(newRdd)}] bytes in size")

即使文件大小大于群集中的可用内存,此解决方案也应该有效。

Spark API 文档说:

  1. 您可以从 Spark 上下文中获取有关您的 RDD 的信息:sc.getRDDStorageInfo
  2. RDD 信息包括内存和磁盘大小:RDDInfo doc

有多种获取RDD大小的方法

1.Add spark 上下文中的 spark 侦听器

SparkDriver.getContext.addSparkListener(new SparkListener() {
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
  val map = stageCompleted.stageInfo.rddInfos
  map.foreach(row => {
      println("rdd memSize " + row.memSize)
      println("rdd diskSize " + row.diskSize)
   })
}})

2。将你的 rdd 保存为文本文件。

myRDD.saveAsTextFile("person.txt")

并调用 Apache Spark REST API.

/applications/[app-id]/stages

3。你也可以试试 SizeEstimater

val rddSize = SizeEstimator.estimate(myRDD)