我怎样才能找到一个RDD的大小
How can I find the size of a RDD
我有 RDD[Row]
,需要将其保存到第三方存储库。
但是这个第三方存储库在一次调用中最多接受 5 MB。
所以我想根据 RDD 中存在的数据大小而不是 RDD 中存在的行数来创建分区。
如何找到 RDD
的大小并根据它创建分区?
这将取决于连载等因素,所以不是一成不变的。但是,您可以获取样本集并运行对该样本数据进行一些实验,从那里进行推断。
一个直接的方法是调用下面的方法,这取决于你是否想以序列化的形式存储你的数据,然后转到 spark UI "Storage" 页面,你应该可以计算出 RDD 的总大小(内存 + 磁盘):
rdd.persist(StorageLevel.MEMORY_AND_DISK)
or
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
在运行时计算准确的内存大小并不容易。不过,您可以尝试在运行时进行估算:根据离线采样的数据大小,例如,X 行离线使用 Y GB,Z 行在运行时可能需要 Z*Y/X GB;这与 Justin 之前建议的类似。
希望这对您有所帮助。
正如 Justin 和 Wang 提到的那样,获取 RDD 的大小并不是直接的。我们可以做一个估计。
我们可以对一个RDD进行采样,然后使用SizeEstimator来获取样本的大小。
正如 Wang 和 Justin 提到的,
基于离线采样的数据大小,比如说,X 行离线使用了 Y GB,Z 行在运行时可能需要 Z*Y/X GB
这是获取 RDD 的 size/estimate 的示例 Scala 代码。
我是 scala 和 spark 的新手。下面的例子可能写得更好
def getTotalSize(rdd: RDD[Row]): Long = {
// This can be a parameter
val NO_OF_SAMPLE_ROWS = 10l;
val totalRows = rdd.count();
var totalSize = 0l
if (totalRows > NO_OF_SAMPLE_ROWS) {
val sampleRDD = rdd.sample(true, NO_OF_SAMPLE_ROWS)
val sampleRDDSize = getRDDSize(sampleRDD)
totalSize = sampleRDDSize.*(totalRows)./(NO_OF_SAMPLE_ROWS)
} else {
// As the RDD is smaller than sample rows count, we can just calculate the total RDD size
totalSize = getRDDSize(rdd)
}
totalSize
}
def getRDDSize(rdd: RDD[Row]) : Long = {
var rddSize = 0l
val rows = rdd.collect()
for (i <- 0 until rows.length) {
rddSize += SizeEstimator.estimate(rows.apply(i).toSeq.map { value => value.asInstanceOf[AnyRef] })
}
rddSize
}
我认为 RDD.count() 会给你 RDD
中元素的数量
如果您实际在集群上处理大数据,这是要使用的版本——即它消除了收集。
def calcRDDSize(rdd: RDD[Row]): Long = {
rdd.map(_.mkString(",").getBytes("UTF-8").length.toLong)
.reduce(_+_) //add the sizes together
}
def estimateRDDSize( rdd: RDD[Row], fraction: Double ) : Long = {
val sampleRDD = rdd.sample(true,fraction)
val sampleRDDsize = calcRDDSize(sampleRDD)
println(s"sampleRDDsize is ${sampleRDDsize/(1024*1024)} MB")
val sampleAvgRowSize = sampleRDDsize / sampleRDD.count()
println(s"sampleAvgRowSize is $sampleAvgRowSize")
val totalRows = rdd.count()
println(s"totalRows is $totalRows")
val estimatedTotalSize = totalRows * sampleAvgRowSize
val formatter = java.text.NumberFormat.getIntegerInstance
val estimateInMB = formatter.format(estimatedTotalSize/(1024*1024))
println(s"estimatedTotalSize is ${estimateInMB} MB")
return estimatedTotalSize
}
// estimate using 15% of data
val size = estimateRDDSize(df.rdd,0.15)
我有 RDD[Row]
,需要将其保存到第三方存储库。
但是这个第三方存储库在一次调用中最多接受 5 MB。
所以我想根据 RDD 中存在的数据大小而不是 RDD 中存在的行数来创建分区。
如何找到 RDD
的大小并根据它创建分区?
这将取决于连载等因素,所以不是一成不变的。但是,您可以获取样本集并运行对该样本数据进行一些实验,从那里进行推断。
一个直接的方法是调用下面的方法,这取决于你是否想以序列化的形式存储你的数据,然后转到 spark UI "Storage" 页面,你应该可以计算出 RDD 的总大小(内存 + 磁盘):
rdd.persist(StorageLevel.MEMORY_AND_DISK)
or
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
在运行时计算准确的内存大小并不容易。不过,您可以尝试在运行时进行估算:根据离线采样的数据大小,例如,X 行离线使用 Y GB,Z 行在运行时可能需要 Z*Y/X GB;这与 Justin 之前建议的类似。
希望这对您有所帮助。
正如 Justin 和 Wang 提到的那样,获取 RDD 的大小并不是直接的。我们可以做一个估计。
我们可以对一个RDD进行采样,然后使用SizeEstimator来获取样本的大小。 正如 Wang 和 Justin 提到的, 基于离线采样的数据大小,比如说,X 行离线使用了 Y GB,Z 行在运行时可能需要 Z*Y/X GB
这是获取 RDD 的 size/estimate 的示例 Scala 代码。
我是 scala 和 spark 的新手。下面的例子可能写得更好
def getTotalSize(rdd: RDD[Row]): Long = {
// This can be a parameter
val NO_OF_SAMPLE_ROWS = 10l;
val totalRows = rdd.count();
var totalSize = 0l
if (totalRows > NO_OF_SAMPLE_ROWS) {
val sampleRDD = rdd.sample(true, NO_OF_SAMPLE_ROWS)
val sampleRDDSize = getRDDSize(sampleRDD)
totalSize = sampleRDDSize.*(totalRows)./(NO_OF_SAMPLE_ROWS)
} else {
// As the RDD is smaller than sample rows count, we can just calculate the total RDD size
totalSize = getRDDSize(rdd)
}
totalSize
}
def getRDDSize(rdd: RDD[Row]) : Long = {
var rddSize = 0l
val rows = rdd.collect()
for (i <- 0 until rows.length) {
rddSize += SizeEstimator.estimate(rows.apply(i).toSeq.map { value => value.asInstanceOf[AnyRef] })
}
rddSize
}
我认为 RDD.count() 会给你 RDD
中元素的数量如果您实际在集群上处理大数据,这是要使用的版本——即它消除了收集。
def calcRDDSize(rdd: RDD[Row]): Long = {
rdd.map(_.mkString(",").getBytes("UTF-8").length.toLong)
.reduce(_+_) //add the sizes together
}
def estimateRDDSize( rdd: RDD[Row], fraction: Double ) : Long = {
val sampleRDD = rdd.sample(true,fraction)
val sampleRDDsize = calcRDDSize(sampleRDD)
println(s"sampleRDDsize is ${sampleRDDsize/(1024*1024)} MB")
val sampleAvgRowSize = sampleRDDsize / sampleRDD.count()
println(s"sampleAvgRowSize is $sampleAvgRowSize")
val totalRows = rdd.count()
println(s"totalRows is $totalRows")
val estimatedTotalSize = totalRows * sampleAvgRowSize
val formatter = java.text.NumberFormat.getIntegerInstance
val estimateInMB = formatter.format(estimatedTotalSize/(1024*1024))
println(s"estimatedTotalSize is ${estimateInMB} MB")
return estimatedTotalSize
}
// estimate using 15% of data
val size = estimateRDDSize(df.rdd,0.15)