为什么 Apache Spark 不重新提交失败的任务?

Why Apache Spark not re-submit failed tasks?

我想模拟容错行为。我写了 "hard" 函数,但有时会失败。例如:

def myMap(v: String) = {
  // print task info and return "Ok" or throw exception
  val context = TaskContext.get()
  val r = scala.util.Random
  val raise = r.nextBoolean()
  println(s"--- map $v in partition ${context.partitionId()} in stage ${context.stageId()} raise = $raise")
  if ( raise )
    throw new RuntimeException("oh ;(")
  "Ok"
}

因为Spark有容错能力,我期望失败的任务会自动重新执行,但是下面的代码并没有:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object Example {

  def main(args:Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    val conf = new SparkConf()
      .setAppName("shuffle example")
      .setMaster("local[*]")
      .set("spark.task.maxFailures", "4") // it is default value

    val sc = new SparkContext(conf)


    val l:RDD[String] = sc.parallelize(List( "a", "b", "c"), 3)

    def myMap(v: String) = {
      // print task info and return "Ok" or throw exception
      val context = TaskContext.get()
      val r = scala.util.Random
      val raise = r.nextBoolean()
      println(s"--- map $v in partition ${context.partitionId()} in stage ${context.stageId()} raise = $raise")
      if ( raise )
        throw new Exception("oh ;(")
      "Ok"
    }

    println (l.map(myMap).collect().mkString("\n")) // failed 

    sc.stop()
  }
}

我做错了什么?

随机变量"r"会returntrue/false随机,与spark执行没有任何关系。

初始化时:

val r = scala.util.Random
val raise = r.nextBoolean()

raise 随机生成 true 或 false。所以你

 if ( raise )
        throw new Exception("oh ;(")
      "Ok"

正在随机工作。我不明白你想要达到什么目的。 我得到以下输出

--- map a in partition 0 in stage 0 raise = true
--- map b in partition 1 in stage 0 raise = false

当我重新 运行 它时,我得到

--- map a in partition 0 in stage 0 raise = false

raise 是一个随机生成的布尔值,所以有时它会在 b 的某个时间失败。

其实spark不支持本地模式的容错

在上面的示例中,如果将 mater 设置为独立(或 yarn)集群的某个主控,生成 jar 文件,并通过 spark-submit 运行 它,行为将如预期的那样:一些任务将是失败,重新提交。如果应用程序有一些单例(Scala 中的对象),它将在失败的任务中保持自己的状态。