StackOverflowError when operating with a large number of columns in Spark
I have a wide DataFrame (130000 rows x 8700 columns) and when I try to sum all the columns I get the following error:
Exception in thread "main" java.lang.StackOverflowError
at scala.collection.generic.Growable$$anonfun$$plus$plus$eq.apply(Growable.scala:59)
at scala.collection.generic.Growable$$anonfun$$plus$plus$eq.apply(Growable.scala:59)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:183)
at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
at scala.collection.generic.GenericCompanion.apply(GenericCompanion.scala:49)
at org.apache.spark.sql.catalyst.expressions.BinaryExpression.children(Expression.scala:400)
at org.apache.spark.sql.catalyst.trees.TreeNode.containsChild$lzycompute(TreeNode.scala:88)
...
This is my Scala code:
val df = spark.read
  .option("header", "false")
  .option("delimiter", "\t")
  .option("inferSchema", "true")
  .csv("D:\\Documents\\Trabajo\\Fábregas\\matrizLuna\\matrizRelativa")

val arrayList = df.drop("cups").columns
var colsList = List[Column]()
arrayList.foreach { c => colsList :+= col(c) }

val df_suma = df.withColumn("consumo_total", colsList.reduce(_ + _))
If I do the same with only a few columns it works fine, but whenever I try the reduce operation over a large number of columns I always get the same error.
Can anyone suggest what I should do? Is there a limit on the number of columns?
Thanks!
You can use a different reduction method that produces a balanced binary tree of depth O(log(n)) instead of a degenerate, linearized chain of BinaryExpressions of depth O(n):
// Split the list in half and reduce each half recursively, so the
// resulting expression tree is balanced (depth O(log(n))) rather than
// a left-leaning chain (depth O(n)).
def balancedReduce[X](list: List[X])(op: (X, X) => X): X = list match {
  case Nil => throw new IllegalArgumentException("Cannot reduce empty list")
  case List(x) => x
  case xs =>
    val n = xs.size
    val (as, bs) = list.splitAt(n / 2)
    op(balancedReduce(as)(op), balancedReduce(bs)(op))
}
Now in your code you can replace
colsList.reduce(_ + _)
with
balancedReduce(colsList)(_ + _)
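Applied to the code from the question, the last line would then read (a minimal sketch, keeping the names from the question):

val df_suma = df.withColumn("consumo_total", balancedReduce(colsList)(_ + _))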
A small example to further illustrate what happens with the BinaryExpressions, which compiles without any dependencies:
sealed trait FormalExpr
case class BinOp(left: FormalExpr, right: FormalExpr) extends FormalExpr {
  // Pretty-print the tree, indenting each nesting level by two spaces.
  override def toString: String = {
    val lStr = left.toString.split("\n").map("  " + _).mkString("\n")
    val rStr = right.toString.split("\n").map("  " + _).mkString("\n")
    s"BinOp(\n${lStr}\n${rStr}\n)"
  }
}
case object Leaf extends FormalExpr

val leafs = List.fill[FormalExpr](16){Leaf}
println(leafs.reduce(BinOp(_, _)))
println(balancedReduce(leafs)(BinOp(_, _)))
This is what plain reduce does (and this is essentially what happens in your code):
BinOp(
  BinOp(
    BinOp(
      BinOp(
        BinOp(
          BinOp(
            BinOp(
              BinOp(
                BinOp(
                  BinOp(
                    BinOp(
                      BinOp(
                        BinOp(
                          BinOp(
                            BinOp(
                              Leaf
                              Leaf
                            )
                            Leaf
                          )
                          Leaf
                        )
                        Leaf
                      )
                      Leaf
                    )
                    Leaf
                  )
                  Leaf
                )
                Leaf
              )
              Leaf
            )
            Leaf
          )
          Leaf
        )
        Leaf
      )
      Leaf
    )
    Leaf
  )
  Leaf
)
And this is what balancedReduce produces:
BinOp(
  BinOp(
    BinOp(
      BinOp(
        Leaf
        Leaf
      )
      BinOp(
        Leaf
        Leaf
      )
    )
    BinOp(
      BinOp(
        Leaf
        Leaf
      )
      BinOp(
        Leaf
        Leaf
      )
    )
  )
  BinOp(
    BinOp(
      BinOp(
        Leaf
        Leaf
      )
      BinOp(
        Leaf
        Leaf
      )
    )
    BinOp(
      BinOp(
        Leaf
        Leaf
      )
      BinOp(
        Leaf
        Leaf
      )
    )
  )
)
The linearized chain is O(n) long, and when Catalyst tries to evaluate it, it blows up the stack. This should not happen with the flat tree of depth O(log(n)).
And while we are talking about asymptotic runtimes: why are you appending to colsList through a var? Each :+= copies the entire list, so building it takes O(n^2) time. Why not simply call toList on the output of .columns?
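A minimal sketch of that simplification, assuming the same df and the cups column from the question:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col

// Build the list of Columns in one pass; no quadratic appending.
val colsList: List[Column] = df.drop("cups").columns.map(col).toList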