当合并大量 RDD 时引发堆栈溢出错误
Spark when union a lot of RDD throws stack overflow error
当我使用“++”组合大量RDD时,出现错误stack over flow错误。
Spark 版本 1.3.1
环境:纱线客户端。 --驱动内存8G
RDD数量超过4000个,每个RDD都是从一个1GB大小的文本文件中读取的
是这样生成的
val collection = (for (
path <- files
) yield sc.textFile(path)).reduce(_ union _)
当 files
尺寸较小时,它工作正常。
还有错误
错误重复出现。估计是递归函数调用太多了?
Exception at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.UnionRDD$$anonfun.apply(UnionRDD.scala:66)
at org.apache.spark.rdd.UnionRDD$$anonfun.apply(UnionRDD.scala:66)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.UnionRDD$$anonfun.apply(UnionRDD.scala:66)
at org.apache.spark.rdd.UnionRDD$$anonfun.apply(UnionRDD.scala:66)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
.....
改为使用SparkContext.union(...)
一次联合多个RDD。
你不想像那样一次一个地做,因为 RDD.union() 为每个 RDD 在沿袭中创建了一个新步骤(任何计算中的一组额外堆栈帧),而SparkContext.union() 一次性完成。这将确保不会出现堆栈溢出错误。
好像当一个个union RDD可以进入一系列很长的递归函数调用。
在这种情况下,我们需要增加 JVM 堆栈内存。
在带有选项 --driver-java-options "-Xss 100M"
的 spark 中,驱动程序 jvm 堆栈内存配置为 100M。
Sean Owen 的解决方案也以更优雅的方式解决了这个问题。
当我使用“++”组合大量RDD时,出现错误stack over flow错误。
Spark 版本 1.3.1 环境:纱线客户端。 --驱动内存8G
RDD数量超过4000个,每个RDD都是从一个1GB大小的文本文件中读取的
是这样生成的
val collection = (for (
path <- files
) yield sc.textFile(path)).reduce(_ union _)
当 files
尺寸较小时,它工作正常。
还有错误
错误重复出现。估计是递归函数调用太多了?
Exception at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.UnionRDD$$anonfun.apply(UnionRDD.scala:66)
at org.apache.spark.rdd.UnionRDD$$anonfun.apply(UnionRDD.scala:66)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.UnionRDD$$anonfun.apply(UnionRDD.scala:66)
at org.apache.spark.rdd.UnionRDD$$anonfun.apply(UnionRDD.scala:66)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
.....
改为使用SparkContext.union(...)
一次联合多个RDD。
你不想像那样一次一个地做,因为 RDD.union() 为每个 RDD 在沿袭中创建了一个新步骤(任何计算中的一组额外堆栈帧),而SparkContext.union() 一次性完成。这将确保不会出现堆栈溢出错误。
好像当一个个union RDD可以进入一系列很长的递归函数调用。
在这种情况下,我们需要增加 JVM 堆栈内存。
在带有选项 --driver-java-options "-Xss 100M"
的 spark 中,驱动程序 jvm 堆栈内存配置为 100M。
Sean Owen 的解决方案也以更优雅的方式解决了这个问题。