在 Spark 中捕获异常会减慢执行速度
Catching exception in Spark slows the execution
有一个包含如下元素的 RDD:
( (n_1, n_2, r), List( (t1,t2), (t3,t4), ... )
我正在尝试执行以下操作:
def p_rec_l(k: Int, acc: Double, pvals_sum: Double, n_1: Int, n_2: Int, r: Int): Double = {
if (k==r-1) return 1 - (pvals_sum + acc)
return p_rec_l(k+1, acc*f(k.toDouble, n_1, n_2), pvals_sum+acc, n_1, n_2, r)
}
def f(k:Double, n_1: Int, n_2:Int): Double = (n_1-k)*(n_2-k)/((k+1)*(N-n_1-n_2+k+1))
N = 2000000
someRDD.map({
case (key, value) => (key, {
val n_1 = key._1; val n_2 = key._2; val r = key._3
val p_k0 = (0 to n_2-1).iterator.map(j => 1- n_1/(N-j.toDouble)).reduceLeft(_*_)
val pval = p_rec_l(0, p_k0, 0, n_1, n_2, r)
value.map({
case (t1, t2) => (t1, t2, n_1, n_2, r, pval)
})
})
})
但是如果r很大,就会出现栈溢出异常,整个进程崩溃。我这样编辑代码:
someRDD.map({
case (key, value) => (key, {
val n_1 = key._1; val n_2 = key._2; val r = key._3
val p_k0 = (0 to n_2-1).iterator.map(j => 1- n_1/(N-j.toDouble)).reduceLeft(_*_)
var pval = -1.0
try{
pval = p_rec_l(0, p_k0, 0, n_1, n_2, r)
} catch{
case e: java.lang.WhosebugError => pval = -1
}
value.map({
case (t1, t2) => (t1, t2, n_1, n_2, r, pval)
})
})
})
之前版本程序7小时左右完成,现在已经运行了36小时还没完成。
这个 try-catch 子句是否有可能大大减慢执行速度?如果是,有什么办法可以改善吗?
可能更好的解决方案是不捕获 WhosebugError
,而是用 @tailrec
注释标记你的函数(我认为它是尾递归),所以你应该避免 WhosebugError
完全
@tailrec def p_rec_l(k: Int, acc: Double, pvals_sum: Double, n_1: Int, n_2: Int, r: Int): Double = {
if (k==r-1) 1 - (pvals_sum + acc)
else p_rec_l(k+1, acc*f(k.toDouble, n_1, n_2), pvals_sum+acc, n_1, n_2, r)
}
另外,为了更好地理解你的问题,我理解你比较 成功 执行的执行时间没有 WhosebugError
和没有 try-catch ,以及另一个使用 try-catch 的执行,但使用不会导致 WhosebugError
的相同数据,所以 catch
本身在您比较时间时不起作用?
有一个包含如下元素的 RDD:
( (n_1, n_2, r), List( (t1,t2), (t3,t4), ... )
我正在尝试执行以下操作:
def p_rec_l(k: Int, acc: Double, pvals_sum: Double, n_1: Int, n_2: Int, r: Int): Double = {
if (k==r-1) return 1 - (pvals_sum + acc)
return p_rec_l(k+1, acc*f(k.toDouble, n_1, n_2), pvals_sum+acc, n_1, n_2, r)
}
def f(k:Double, n_1: Int, n_2:Int): Double = (n_1-k)*(n_2-k)/((k+1)*(N-n_1-n_2+k+1))
N = 2000000
someRDD.map({
case (key, value) => (key, {
val n_1 = key._1; val n_2 = key._2; val r = key._3
val p_k0 = (0 to n_2-1).iterator.map(j => 1- n_1/(N-j.toDouble)).reduceLeft(_*_)
val pval = p_rec_l(0, p_k0, 0, n_1, n_2, r)
value.map({
case (t1, t2) => (t1, t2, n_1, n_2, r, pval)
})
})
})
但是如果r很大,就会出现栈溢出异常,整个进程崩溃。我这样编辑代码:
someRDD.map({
case (key, value) => (key, {
val n_1 = key._1; val n_2 = key._2; val r = key._3
val p_k0 = (0 to n_2-1).iterator.map(j => 1- n_1/(N-j.toDouble)).reduceLeft(_*_)
var pval = -1.0
try{
pval = p_rec_l(0, p_k0, 0, n_1, n_2, r)
} catch{
case e: java.lang.WhosebugError => pval = -1
}
value.map({
case (t1, t2) => (t1, t2, n_1, n_2, r, pval)
})
})
})
之前版本程序7小时左右完成,现在已经运行了36小时还没完成。 这个 try-catch 子句是否有可能大大减慢执行速度?如果是,有什么办法可以改善吗?
可能更好的解决方案是不捕获 WhosebugError
,而是用 @tailrec
注释标记你的函数(我认为它是尾递归),所以你应该避免 WhosebugError
完全
@tailrec def p_rec_l(k: Int, acc: Double, pvals_sum: Double, n_1: Int, n_2: Int, r: Int): Double = {
if (k==r-1) 1 - (pvals_sum + acc)
else p_rec_l(k+1, acc*f(k.toDouble, n_1, n_2), pvals_sum+acc, n_1, n_2, r)
}
另外,为了更好地理解你的问题,我理解你比较 成功 执行的执行时间没有 WhosebugError
和没有 try-catch ,以及另一个使用 try-catch 的执行,但使用不会导致 WhosebugError
的相同数据,所以 catch
本身在您比较时间时不起作用?