从 apache spark 中的文本文件查找存储在 rdd 中的数据大小
Find size of data stored in rdd from a text file in apache spark
我是 Apache Spark(版本 1.4.1)的新手。我写了一个小代码来读取文本文件并将其数据存储在 Rdd 中。
有什么方法可以获取 rdd 中数据的大小。
这是我的代码:
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.util.SizeEstimator
import org.apache.spark.sql.Row
object RddSize {
def main(args: Array[String]) {
val sc = new SparkContext("local", "data size")
val FILE_LOCATION = "src/main/resources/employees.csv"
val peopleRdd = sc.textFile(FILE_LOCATION)
val newRdd = peopleRdd.filter(str => str.contains(",M,"))
//Here I want to find whats the size remaining data
}
}
我想在过滤器转换 (peopleRdd) 之前和之后 (newRdd) 获取数据大小。
我不确定您是否需要这样做。您可以缓存 rdd 并检查 Spark UI 中的大小。但是假设您确实想以编程方式执行此操作,这里有一个解决方案。
def calcRDDSize(rdd: RDD[String]): Long = {
//map to the size of each string, UTF-8 is the default
rdd.map(_.getBytes("UTF-8").length.toLong)
.reduce(_+_) //add the sizes together
}
然后您可以为您的两个 RDD 调用此函数:
println(s"peopleRdd is [${calcRDDSize(peopleRdd)}] bytes in size")
println(s"newRdd is [${calcRDDSize(newRdd)}] bytes in size")
即使文件大小大于群集中的可用内存,此解决方案也应该有效。
Spark API 文档说:
- 您可以从 Spark 上下文中获取有关您的 RDD 的信息:
sc.getRDDStorageInfo
- RDD 信息包括内存和磁盘大小:RDDInfo doc
有多种获取RDD大小的方法
1.Add spark 上下文中的 spark 侦听器
SparkDriver.getContext.addSparkListener(new SparkListener() {
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
val map = stageCompleted.stageInfo.rddInfos
map.foreach(row => {
println("rdd memSize " + row.memSize)
println("rdd diskSize " + row.diskSize)
})
}})
2。将你的 rdd 保存为文本文件。
myRDD.saveAsTextFile("person.txt")
/applications/[app-id]/stages
3。你也可以试试 SizeEstimater
val rddSize = SizeEstimator.estimate(myRDD)
我是 Apache Spark(版本 1.4.1)的新手。我写了一个小代码来读取文本文件并将其数据存储在 Rdd 中。
有什么方法可以获取 rdd 中数据的大小。
这是我的代码:
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.util.SizeEstimator
import org.apache.spark.sql.Row
object RddSize {
def main(args: Array[String]) {
val sc = new SparkContext("local", "data size")
val FILE_LOCATION = "src/main/resources/employees.csv"
val peopleRdd = sc.textFile(FILE_LOCATION)
val newRdd = peopleRdd.filter(str => str.contains(",M,"))
//Here I want to find whats the size remaining data
}
}
我想在过滤器转换 (peopleRdd) 之前和之后 (newRdd) 获取数据大小。
我不确定您是否需要这样做。您可以缓存 rdd 并检查 Spark UI 中的大小。但是假设您确实想以编程方式执行此操作,这里有一个解决方案。
def calcRDDSize(rdd: RDD[String]): Long = {
//map to the size of each string, UTF-8 is the default
rdd.map(_.getBytes("UTF-8").length.toLong)
.reduce(_+_) //add the sizes together
}
然后您可以为您的两个 RDD 调用此函数:
println(s"peopleRdd is [${calcRDDSize(peopleRdd)}] bytes in size")
println(s"newRdd is [${calcRDDSize(newRdd)}] bytes in size")
即使文件大小大于群集中的可用内存,此解决方案也应该有效。
Spark API 文档说:
- 您可以从 Spark 上下文中获取有关您的 RDD 的信息:
sc.getRDDStorageInfo
- RDD 信息包括内存和磁盘大小:RDDInfo doc
有多种获取RDD大小的方法
1.Add spark 上下文中的 spark 侦听器
SparkDriver.getContext.addSparkListener(new SparkListener() {
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
val map = stageCompleted.stageInfo.rddInfos
map.foreach(row => {
println("rdd memSize " + row.memSize)
println("rdd diskSize " + row.diskSize)
})
}})
2。将你的 rdd 保存为文本文件。
myRDD.saveAsTextFile("person.txt")
/applications/[app-id]/stages
3。你也可以试试 SizeEstimater
val rddSize = SizeEstimator.estimate(myRDD)