为什么 Apache Spark 不重新提交失败的任务?
Why Apache Spark not re-submit failed tasks?
我想模拟容错行为。我写了 "hard" 函数,但有时会失败。例如:
def myMap(v: String) = {
// print task info and return "Ok" or throw exception
val context = TaskContext.get()
val r = scala.util.Random
val raise = r.nextBoolean()
println(s"--- map $v in partition ${context.partitionId()} in stage ${context.stageId()} raise = $raise")
if ( raise )
throw new RuntimeException("oh ;(")
"Ok"
}
因为Spark有容错能力,我期望失败的任务会自动重新执行,但是下面的代码并没有:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
object Example {
def main(args:Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.WARN)
Logger.getLogger("akka").setLevel(Level.WARN)
val conf = new SparkConf()
.setAppName("shuffle example")
.setMaster("local[*]")
.set("spark.task.maxFailures", "4") // it is default value
val sc = new SparkContext(conf)
val l:RDD[String] = sc.parallelize(List( "a", "b", "c"), 3)
def myMap(v: String) = {
// print task info and return "Ok" or throw exception
val context = TaskContext.get()
val r = scala.util.Random
val raise = r.nextBoolean()
println(s"--- map $v in partition ${context.partitionId()} in stage ${context.stageId()} raise = $raise")
if ( raise )
throw new Exception("oh ;(")
"Ok"
}
println (l.map(myMap).collect().mkString("\n")) // failed
sc.stop()
}
}
我做错了什么?
随机变量"r"会returntrue/false随机,与spark执行没有任何关系。
初始化时:
val r = scala.util.Random
val raise = r.nextBoolean()
raise 随机生成 true 或 false。所以你
if ( raise )
throw new Exception("oh ;(")
"Ok"
正在随机工作。我不明白你想要达到什么目的。
我得到以下输出
--- map a in partition 0 in stage 0 raise = true
--- map b in partition 1 in stage 0 raise = false
当我重新 运行 它时,我得到
--- map a in partition 0 in stage 0 raise = false
raise 是一个随机生成的布尔值,所以有时它会在 b 的某个时间失败。
其实spark不支持本地模式的容错
在上面的示例中,如果将 mater 设置为独立(或 yarn)集群的某个主控,生成 jar 文件,并通过 spark-submit 运行 它,行为将如预期的那样:一些任务将是失败,重新提交。如果应用程序有一些单例(Scala 中的对象),它将在失败的任务中保持自己的状态。
我想模拟容错行为。我写了 "hard" 函数,但有时会失败。例如:
def myMap(v: String) = {
// print task info and return "Ok" or throw exception
val context = TaskContext.get()
val r = scala.util.Random
val raise = r.nextBoolean()
println(s"--- map $v in partition ${context.partitionId()} in stage ${context.stageId()} raise = $raise")
if ( raise )
throw new RuntimeException("oh ;(")
"Ok"
}
因为Spark有容错能力,我期望失败的任务会自动重新执行,但是下面的代码并没有:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
object Example {
def main(args:Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.WARN)
Logger.getLogger("akka").setLevel(Level.WARN)
val conf = new SparkConf()
.setAppName("shuffle example")
.setMaster("local[*]")
.set("spark.task.maxFailures", "4") // it is default value
val sc = new SparkContext(conf)
val l:RDD[String] = sc.parallelize(List( "a", "b", "c"), 3)
def myMap(v: String) = {
// print task info and return "Ok" or throw exception
val context = TaskContext.get()
val r = scala.util.Random
val raise = r.nextBoolean()
println(s"--- map $v in partition ${context.partitionId()} in stage ${context.stageId()} raise = $raise")
if ( raise )
throw new Exception("oh ;(")
"Ok"
}
println (l.map(myMap).collect().mkString("\n")) // failed
sc.stop()
}
}
我做错了什么?
随机变量"r"会returntrue/false随机,与spark执行没有任何关系。
初始化时:
val r = scala.util.Random
val raise = r.nextBoolean()
raise 随机生成 true 或 false。所以你
if ( raise )
throw new Exception("oh ;(")
"Ok"
正在随机工作。我不明白你想要达到什么目的。 我得到以下输出
--- map a in partition 0 in stage 0 raise = true
--- map b in partition 1 in stage 0 raise = false
当我重新 运行 它时,我得到
--- map a in partition 0 in stage 0 raise = false
raise 是一个随机生成的布尔值,所以有时它会在 b 的某个时间失败。
其实spark不支持本地模式的容错
在上面的示例中,如果将 mater 设置为独立(或 yarn)集群的某个主控,生成 jar 文件,并通过 spark-submit 运行 它,行为将如预期的那样:一些任务将是失败,重新提交。如果应用程序有一些单例(Scala 中的对象),它将在失败的任务中保持自己的状态。