Spark Requested array size exceeds VM limit from BufferHolder.grow
I'm getting this error on Spark 2.1 running on a Hadoop cluster, in a mixed Scala/Python application (similar to Zeppelin):
18/04/09 08:19:34 ERROR Utils: Uncaught exception in thread stdout writer for /x/python/miniconda/bin/python
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
at org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:73)
at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(UnsafeRowWriter.java:214)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply6_4$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply7_16$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection.apply(AggregationIterator.scala:232)
at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection.apply(AggregationIterator.scala:221)
at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:159)
at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:29)
at scala.collection.Iterator$$anon.next(Iterator.scala:409)
at scala.collection.Iterator$$anon.next(Iterator.scala:409)
at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1076)
at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1091)
at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1129)
at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:408)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run.apply(PythonRDD.scala:328)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
It seems odd for this error to be thrown from BufferHolder.grow, since it contains an explicit check:
if (neededSize > Integer.MAX_VALUE - totalSize()) {
  throw new UnsupportedOperationException(
    "Cannot grow BufferHolder by size " + neededSize + " because the size after growing " +
    "exceeds size limitation " + Integer.MAX_VALUE);
}
Yet at runtime it gets past this assertion and allocates an array larger than Integer.MAX_VALUE (line 73). This error doesn't seem related to configuration tuning (correct me if I'm wrong), so I'll skip the application/cluster specifics, other than: 150 executors with 2 cores each, and spark.sql.shuffle.partitions set to 8000 in an attempt to eliminate shuffle skew.
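To illustrate the puzzle about the assertion: the guard above only checks against Integer.MAX_VALUE, but a typical HotSpot JVM refuses array allocations a few elements below that bound with exactly this OutOfMemoryError, no matter how much heap is free. A minimal standalone sketch of that JVM behavior (my own illustration, not Spark source):

// Standalone illustration: HotSpot rejects array lengths at or near Integer.MAX_VALUE
// with "Requested array size exceeds VM limit", independent of available heap.
public class ArrayLimitDemo {
    public static void main(String[] args) {
        try {
            // The BufferHolder guard only ensures the size stays <= Integer.MAX_VALUE,
            // but an allocation this close to that bound is rejected by the VM itself.
            byte[] buffer = new byte[Integer.MAX_VALUE];
            System.out.println("allocated " + buffer.length + " bytes");
        } catch (OutOfMemoryError e) {
            // On a typical HotSpot JVM this prints:
            // OutOfMemoryError: Requested array size exceeds VM limit
            System.out.println("OutOfMemoryError: " + e.getMessage());
        }
    }
}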
The parent RDD of the PythonRDD is actually a DataFrame that is the result of a shuffle, with about 30 columns, one of which is a very large String (100 MB at most, but 150 KB on average). I mention this because, judging from the stack trace, the error seems to be raised somewhere between the shuffle read and the PythonRDD. Also, it always happens on the last 10% of the partitions (the input data is static); the first 90% complete without errors.
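For what it's worth, the max/average figures above are the kind of numbers a simple length aggregation over that column gives; a hedged Java sketch (the table name "events" and column name "big_text" are placeholders, not the real names):

// Hedged sketch: measure the size distribution of the large String column.
// "events" and "big_text" are placeholder names, not from the actual job.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.avg;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.length;
import static org.apache.spark.sql.functions.max;

public class ColumnSizeStats {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("column-size-stats").getOrCreate();

        Dataset<Row> df = spark.table("events");

        // length() counts characters; the byte size can be larger for multi-byte data.
        df.agg(
            max(length(col("big_text"))).alias("max_chars"),
            avg(length(col("big_text"))).alias("avg_chars"))
          .show();

        spark.stop();
    }
}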
Has anyone run into this, or can shed some light on it?
This is an internal Spark issue described here: https://issues.apache.org/jira/browse/SPARK-22033. It was resolved in 2.3.0.
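If I read SPARK-22033 correctly, the gist of the fix is to cap the buffer's growth a little below Integer.MAX_VALUE so the allocation request can never land above the VM's real array limit. A rough sketch of that kind of cap (an illustration of the idea, not the actual Spark patch; the headroom constant is an assumption):

// Illustration only: the general shape of a growth cap that respects the VM array limit.
// ARRAY_MAX is an assumed headroom value, not necessarily what Spark uses.
public class CappedGrowthDemo {
    // Leave some slack below Integer.MAX_VALUE for the VM's per-array overhead.
    private static final int ARRAY_MAX = Integer.MAX_VALUE - 15;

    static int newBufferLength(int neededTotal) {
        // Double for amortized growth, but never request more than ARRAY_MAX.
        return neededTotal < ARRAY_MAX / 2 ? neededTotal * 2 : ARRAY_MAX;
    }

    public static void main(String[] args) {
        int neededTotal = 1_500_000_000;  // e.g. a row buffer that already needs 1.5 GB
        System.out.println("requested length: " + newBufferLength(neededTotal));  // capped at ARRAY_MAX
    }
}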