Spark 驱动程序不会因异常而崩溃

Spark driver doesn't crash on exception

我们在客户端模式下在 Kubernetes 上 运行 Spark 3.1.1。

我们是一个简单的 scala spark 应用程序,它从 S3 加载 parquet 文件并聚合它们:

sparkSession.read.parquet(paths).as[MyRawEvent]

我们的应用程序在快乐路径上完美运行:驱动程序 pod 启动 运行,执行程序 pods 加入聚会,当应用程序完成时,执行程序和驱动程序都终止。

另一方面,如果出现问题,驱动程序和执行程序 pods 都会保持 Running 状态。例如,如果上述 paths 之一不存在,则会发生异常(在驱动程序中):

Exception in thread "main" org.apache.spark.sql.AnalysisException: Path does not exist: s3a://<bucket-name>/client-id=8765432/date=2021-08-06
     at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary(DataSource.scala:803)
     at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$adapted(DataSource.scala:800)
     at org.apache.spark.util.ThreadUtils$.$anonfun$parmap(ThreadUtils.scala:372)
     at scala.concurrent.Future$.$anonfun$apply(Future.scala:659)
     at scala.util.Success.$anonfun$map(Try.scala:255)
     at scala.util.Success.map(Try.scala:213)
     at scala.concurrent.Future.$anonfun$map(Future.scala:292)
     at scala.concurrent.impl.Promise.liftedTree1(Promise.scala:33)
     at scala.concurrent.impl.Promise.$anonfun$transform(Promise.scala:33)
     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
     at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(Unknown Source)
     at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
     at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
     at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
     at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
     at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)

有趣的是,此异常不会阻止执行程序立即启动,并且驱动程序和执行程序 pods 永远卡住,什么都不做。

我们没有在我们的应用程序中捕获异常,我们希望驱动程序和执行程序会停止,而不是浪费冗余资源。

我们怎样才能粉碎应用程序,使其不会永远停留在 Running 状态?

嗯,这很简单。

我必须捕获所有异常以确保无论如何都关闭 spark 上下文:

  def main(args: Array[String]): Unit = {
    // some code
    implicit val sparkSession = SparkSession.builder().getOrCreate
    try {
      // application code with potential exceptions
    } catch {
      case exception: Exception =>
        sparkSession.close()
        throw exception
    }

    sparkSession.close()
  }

这样所有资源都被释放并且驱动程序 pod 将其状态更改为 Error 作为例外。

编辑 - 以 Scala 的方式进行:

  def main(args: Array[String]): Unit = {
    // some code
    implicit val sparkSession = SparkSession.builder().getOrCreate
    Try {
      // application code with potential exceptions
    } match {
      case Success(_) => None
      case Failure(exception) =>
        sparkSession.close()
        throw exception
    }

    sparkSession.close()
  }