当合并大量 RDD 时引发堆栈溢出错误

Spark when union a lot of RDD throws stack overflow error

当我使用“++”组合大量RDD时,出现错误stack over flow错误。

Spark 版本 1.3.1 环境:纱线客户端。 --驱动内存8G

RDD数量超过4000个,每个RDD都是从一个1GB大小的文本文件中读取的

是这样生成的

val collection = (for (
  path <- files
) yield sc.textFile(path)).reduce(_ union _)

files 尺寸较小时,它工作正常。 还有错误

错误重复出现。估计是递归函数调用太多了?

 Exception at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.rdd.UnionRDD$$anonfun.apply(UnionRDD.scala:66)
    at org.apache.spark.rdd.UnionRDD$$anonfun.apply(UnionRDD.scala:66)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.rdd.UnionRDD$$anonfun.apply(UnionRDD.scala:66)
    at org.apache.spark.rdd.UnionRDD$$anonfun.apply(UnionRDD.scala:66)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
  .....

改为使用SparkContext.union(...)一次联合多个RDD。

你不想像那样一次一个地做,因为 RDD.union() 为每个 RDD 在沿袭中创建了一个新步骤(任何计算中的一组额外堆栈帧),而SparkContext.union() 一次性完成。这将确保不会出现堆栈溢出错误。

好像当一个个union RDD可以进入一系列很长的递归函数调用。 在这种情况下,我们需要增加 JVM 堆栈内存。 在带有选项 --driver-java-options "-Xss 100M" 的 spark 中,驱动程序 jvm 堆栈内存配置为 100M。

Sean Owen 的解决方案也以更优雅的方式解决了这个问题。