我怎样才能找到一个RDD的大小

How can I find the size of a RDD

我有 RDD[Row],需要将其保存到第三方存储库。 但是这个第三方存储库在一次调用中最多接受 5 MB。

所以我想根据 RDD 中存在的数据大小而不是 RDD 中存在的行数来创建分区。

如何找到 RDD 的大小并根据它创建分区?

这将取决于连载等因素,所以不是一成不变的。但是,您可以获取样本集并运行对该样本数据进行一些实验,从那里进行推断。

一个直接的方法是调用下面的方法,这取决于你是否想以序列化的形式存储你的数据,然后转到 spark UI "Storage" 页面,你应该可以计算出 RDD 的总大小(内存 + 磁盘):

rdd.persist(StorageLevel.MEMORY_AND_DISK)

or

rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

在运行时计算准确的内存大小并不容易。不过,您可以尝试在运行时进行估算:根据离线采样的数据大小,例如,X 行离线使用 Y GB,Z 行在运行时可能需要 Z*Y/X GB;这与 Justin 之前建议的类似。

希望这对您有所帮助。

正如 Justin 和 Wang 提到的那样,获取 RDD 的大小并不是直接的。我们可以做一个估计。

我们可以对一个RDD进行采样,然后使用SizeEstimator来获取样本的大小。 正如 Wang 和 Justin 提到的, 基于离线采样的数据大小,比如说,X 行离线使用了 Y GB,Z 行在运行时可能需要 Z*Y/X GB

这是获取 RDD 的 size/estimate 的示例 Scala 代码。

我是 scala 和 spark 的新手。下面的例子可能写得更好

def getTotalSize(rdd: RDD[Row]): Long = {
  // This can be a parameter
  val NO_OF_SAMPLE_ROWS = 10l;
  val totalRows = rdd.count();
  var totalSize = 0l
  if (totalRows > NO_OF_SAMPLE_ROWS) {
    val sampleRDD = rdd.sample(true, NO_OF_SAMPLE_ROWS)
    val sampleRDDSize = getRDDSize(sampleRDD)
    totalSize = sampleRDDSize.*(totalRows)./(NO_OF_SAMPLE_ROWS)
  } else {
    // As the RDD is smaller than sample rows count, we can just calculate the total RDD size
    totalSize = getRDDSize(rdd)
  }

  totalSize
}

def getRDDSize(rdd: RDD[Row]) : Long = {
    var rddSize = 0l
    val rows = rdd.collect()
    for (i <- 0 until rows.length) {
       rddSize += SizeEstimator.estimate(rows.apply(i).toSeq.map { value => value.asInstanceOf[AnyRef] })
    }

    rddSize
}

我认为 RDD.count() 会给你 RDD

中元素的数量

如果您实际在集群上处理大数据,这是要使用的版本——即它消除了收集。

def calcRDDSize(rdd: RDD[Row]): Long = {
  rdd.map(_.mkString(",").getBytes("UTF-8").length.toLong)
     .reduce(_+_) //add the sizes together
}

def estimateRDDSize( rdd: RDD[Row], fraction: Double ) : Long = {
  val sampleRDD = rdd.sample(true,fraction)
  val sampleRDDsize = calcRDDSize(sampleRDD)
  println(s"sampleRDDsize is ${sampleRDDsize/(1024*1024)} MB")

  val sampleAvgRowSize = sampleRDDsize / sampleRDD.count()
  println(s"sampleAvgRowSize is $sampleAvgRowSize")

  val totalRows = rdd.count()
  println(s"totalRows is $totalRows")

  val estimatedTotalSize = totalRows * sampleAvgRowSize
  val formatter = java.text.NumberFormat.getIntegerInstance
  val estimateInMB = formatter.format(estimatedTotalSize/(1024*1024))
  println(s"estimatedTotalSize is ${estimateInMB} MB")

  return estimatedTotalSize
}

// estimate using 15% of data
val size = estimateRDDSize(df.rdd,0.15)