How to parallelize Spark scala computation?
I have code to compute the Within Set Sum of Squared Errors after clustering, which I took mostly from the Spark MLlib source code.

When I run the analogous code through the Spark API, it runs successfully in many different (distributed) jobs. When I run my version (which should do the same thing as the Spark code), I get a stack overflow error. Any ideas?

Here is the code:
import java.util.Arrays
import org.apache.spark.mllib.linalg.{Vectors, Vector}
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.rdd.RDD
import org.apache.spark.api.java.JavaRDD
import breeze.linalg.{axpy => brzAxpy, inv, svd => brzSvd, DenseMatrix => BDM, DenseVector => BDV,
MatrixSingularException, SparseVector => BSV, CSCMatrix => BSM, Matrix => BM}
val EPSILON = {
  var eps = 1.0
  while ((1.0 + (eps / 2.0)) != 1.0) {
    eps /= 2.0
  }
  eps
}
def dot(x: Vector, y: Vector): Double = {
  require(x.size == y.size,
    "BLAS.dot(x: Vector, y:Vector) was given Vectors with non-matching sizes:" +
    " x.size = " + x.size + ", y.size = " + y.size)
  (x, y) match {
    case (dx: DenseVector, dy: DenseVector) =>
      dot(dx, dy)
    case (sx: SparseVector, dy: DenseVector) =>
      dot(sx, dy)
    case (dx: DenseVector, sy: SparseVector) =>
      dot(sy, dx)
    case (sx: SparseVector, sy: SparseVector) =>
      dot(sx, sy)
    case _ =>
      throw new IllegalArgumentException(s"dot doesn't support (${x.getClass}, ${y.getClass}).")
  }
}
def fastSquaredDistance(
    v1: Vector,
    norm1: Double,
    v2: Vector,
    norm2: Double,
    precision: Double = 1e-6): Double = {
  val n = v1.size
  require(v2.size == n)
  require(norm1 >= 0.0 && norm2 >= 0.0)
  val sumSquaredNorm = norm1 * norm1 + norm2 * norm2
  val normDiff = norm1 - norm2
  var sqDist = 0.0
  /*
   * The relative error is
   * <pre>
   * EPSILON * ( \|a\|_2^2 + \|b\|_2^2 + 2 |a^T b|) / ( \|a - b\|_2^2 ),
   * </pre>
   * which is bounded by
   * <pre>
   * 2.0 * EPSILON * ( \|a\|_2^2 + \|b\|_2^2 ) / ( (\|a\|_2 - \|b\|_2)^2 ).
   * </pre>
   * The bound doesn't need the inner product, so we can use it as a sufficient condition to
   * check quickly whether the inner product approach is accurate.
   */
  val precisionBound1 = 2.0 * EPSILON * sumSquaredNorm / (normDiff * normDiff + EPSILON)
  if (precisionBound1 < precision) {
    sqDist = sumSquaredNorm - 2.0 * dot(v1, v2)
  } else if (v1.isInstanceOf[SparseVector] || v2.isInstanceOf[SparseVector]) {
    val dotValue = dot(v1, v2)
    sqDist = math.max(sumSquaredNorm - 2.0 * dotValue, 0.0)
    val precisionBound2 = EPSILON * (sumSquaredNorm + 2.0 * math.abs(dotValue)) /
      (sqDist + EPSILON)
    if (precisionBound2 > precision) {
      sqDist = Vectors.sqdist(v1, v2)
    }
  } else {
    sqDist = Vectors.sqdist(v1, v2)
  }
  sqDist
}
def findClosest(
    centers: TraversableOnce[Vector],
    point: Vector): (Int, Double) = {
  var bestDistance = Double.PositiveInfinity
  var bestIndex = 0
  var i = 0
  centers.foreach { center =>
    // Since `\|a - b\| \geq |\|a\| - \|b\||`, we can use this lower bound to avoid unnecessary
    // distance computation.
    var lowerBoundOfSqDist = Vectors.norm(center, 2.0) - Vectors.norm(point, 2.0)
    lowerBoundOfSqDist = lowerBoundOfSqDist * lowerBoundOfSqDist
    if (lowerBoundOfSqDist < bestDistance) {
      val distance: Double = fastSquaredDistance(center, Vectors.norm(center, 2.0), point, Vectors.norm(point, 2.0))
      if (distance < bestDistance) {
        bestDistance = distance
        bestIndex = i
      }
    }
    i += 1
  }
  (bestIndex, bestDistance)
}
def pointCost(
    centers: TraversableOnce[Vector],
    point: Vector): Double =
  findClosest(centers, point)._2

def clusterCentersIter: Iterable[Vector] =
  clusterCenters.map(p => p)

def computeCostZep(indata: RDD[Vector]): Double = {
  val bcCenters = indata.context.broadcast(clusterCenters)
  indata.map(p => pointCost(bcCenters.value, p)).sum()
}
computeCostZep(projectedData)
I believe I'm using all the same parallelization that Spark does, yet it isn't working for me. Any advice on making my code distributed, or on helping me understand why the stack overflow is occurring in my code, would be very helpful.
Here is a link to the very similar source code in Spark:
KMeansModel and KMeans
Here is the code that runs:
val clusters = KMeans.train(projectedData, numClusters, numIterations)
val clusterCenters = clusters.clusterCenters
// Evaluate clustering by computing Within Set Sum of Squared Errors
val WSSSE = clusters.computeCost(projectedData)
println("Within Set Sum of Squared Errors = " + WSSSE)
Here is the error output:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 94.0 failed 4 times, most recent failure: Lost task 1.3 in stage 94.0 (TID 37663, ip-172-31-13-209.ec2.internal): java.lang.StackOverflowError at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$c57ec8bf9b0d5f6161b97741d596ff0$$$$wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.dot(:226) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$c57ec8bf9b0d5f6161b97741d596ff0$$$$wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.dot(:226)
...
And later on down:

Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1952) at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1088) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) at org.apache.spark.rdd.RDD.fold(RDD.scala:1082) at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sum$1.apply$mcD$sp(DoubleRDDFunctions.scala:34) at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sum$1.apply(DoubleRDDFunctions.scala:34) at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sum$1.apply(DoubleRDDFunctions.scala:34) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) at org.apache.spark.rdd.DoubleRDDFunctions.sum(DoubleRDDFunctions.scala:33)
What seems to be happening is pretty simple: you are calling the dot method recursively here:
def dot(x: Vector, y: Vector): Double = {
  require(x.size == y.size,
    "BLAS.dot(x: Vector, y:Vector) was given Vectors with non-matching sizes:" +
    " x.size = " + x.size + ", y.size = " + y.size)
  (x, y) match {
    case (dx: DenseVector, dy: DenseVector) =>
      dot(dx, dy)
    case (sx: SparseVector, dy: DenseVector) =>
      dot(sx, dy)
    case (dx: DenseVector, sy: SparseVector) =>
      dot(sy, dx)
    case (sx: SparseVector, sy: SparseVector) =>
      dot(sx, sy)
    case _ =>
      throw new IllegalArgumentException(s"dot doesn't support (${x.getClass}, ${y.getClass}).")
  }
}
The subsequent recursive call to dot is made with the same arguments as the original one - so the recursion can never reach a conclusion.
The stack trace tells you this as well - notice that the location is in the dot method:
java.lang.StackOverflowError at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$c57ec8bf9b0d5f6161b97741d596ff0$$$$wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.dot(:226) at
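For what it's worth, the equivalent pattern match in Spark's own source works because the BLAS object defines separate typed overloads - dot(DenseVector, DenseVector), dot(SparseVector, DenseVector), dot(SparseVector, SparseVector) - so each case dispatches to a different method. In your copied snippet only the single dot(Vector, Vector) signature exists, so every branch resolves straight back to itself. Below is a minimal sketch of one way to break the cycle; the helper names denseDot, sparseDenseDot and sparseSparseDot are purely illustrative, not Spark API:

// Sketch only: give each vector-type pairing its own concrete implementation,
// so the generic dot(Vector, Vector) below no longer calls itself.
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector}

def denseDot(x: DenseVector, y: DenseVector): Double = {
  val xs = x.values
  val ys = y.values
  var sum = 0.0
  var i = 0
  while (i < xs.length) {
    sum += xs(i) * ys(i)
    i += 1
  }
  sum
}

def sparseDenseDot(x: SparseVector, y: DenseVector): Double = {
  val xv = x.values
  val xi = x.indices
  val yv = y.values
  var sum = 0.0
  var k = 0
  while (k < xi.length) {
    sum += xv(k) * yv(xi(k))
    k += 1
  }
  sum
}

def sparseSparseDot(x: SparseVector, y: SparseVector): Double = {
  // Both index arrays are sorted, so walk them like a merge.
  val xi = x.indices
  val xv = x.values
  val yi = y.indices
  val yv = y.values
  var sum = 0.0
  var kx = 0
  var ky = 0
  while (kx < xi.length && ky < yi.length) {
    if (xi(kx) == yi(ky)) {
      sum += xv(kx) * yv(ky)
      kx += 1
      ky += 1
    } else if (xi(kx) < yi(ky)) {
      kx += 1
    } else {
      ky += 1
    }
  }
  sum
}

def dot(x: Vector, y: Vector): Double = {
  require(x.size == y.size,
    s"dot was given Vectors with non-matching sizes: x.size = ${x.size}, y.size = ${y.size}")
  (x, y) match {
    case (dx: DenseVector, dy: DenseVector) => denseDot(dx, dy)
    case (sx: SparseVector, dy: DenseVector) => sparseDenseDot(sx, dy)
    case (dx: DenseVector, sy: SparseVector) => sparseDenseDot(sy, dx)
    case (sx: SparseVector, sy: SparseVector) => sparseSparseDot(sx, sy)
    case _ =>
      throw new IllegalArgumentException(s"dot doesn't support (${x.getClass}, ${y.getClass}).")
  }
}

Alternatively, since the fallback branches in fastSquaredDistance already use Vectors.sqdist (which is public API), you could drop the hand-rolled dot and the precision bookkeeping altogether and just compute sqDist with Vectors.sqdist(v1, v2).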