Spark 驱动程序不会因异常而崩溃
Spark driver doesn't crash on exception
我们在客户端模式下在 Kubernetes 上 运行 Spark 3.1.1。
我们是一个简单的 scala spark 应用程序,它从 S3 加载 parquet 文件并聚合它们:
sparkSession.read.parquet(paths).as[MyRawEvent]
我们的应用程序在快乐路径上完美运行:驱动程序 pod 启动 运行,执行程序 pods 加入聚会,当应用程序完成时,执行程序和驱动程序都终止。
另一方面,如果出现问题,驱动程序和执行程序 pods 都会保持 Running
状态。例如,如果上述 paths
之一不存在,则会发生异常(在驱动程序中):
Exception in thread "main" org.apache.spark.sql.AnalysisException: Path does not exist: s3a://<bucket-name>/client-id=8765432/date=2021-08-06
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary(DataSource.scala:803)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$adapted(DataSource.scala:800)
at org.apache.spark.util.ThreadUtils$.$anonfun$parmap(ThreadUtils.scala:372)
at scala.concurrent.Future$.$anonfun$apply(Future.scala:659)
at scala.util.Success.$anonfun$map(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
有趣的是,此异常不会阻止执行程序立即启动,并且驱动程序和执行程序 pods 永远卡住,什么都不做。
我们没有在我们的应用程序中捕获异常,我们希望驱动程序和执行程序会停止,而不是浪费冗余资源。
我们怎样才能粉碎应用程序,使其不会永远停留在 Running
状态?
嗯,这很简单。
我必须捕获所有异常以确保无论如何都关闭 spark 上下文:
def main(args: Array[String]): Unit = {
// some code
implicit val sparkSession = SparkSession.builder().getOrCreate
try {
// application code with potential exceptions
} catch {
case exception: Exception =>
sparkSession.close()
throw exception
}
sparkSession.close()
}
这样所有资源都被释放并且驱动程序 pod 将其状态更改为 Error
作为例外。
编辑 - 以 Scala 的方式进行:
def main(args: Array[String]): Unit = {
// some code
implicit val sparkSession = SparkSession.builder().getOrCreate
Try {
// application code with potential exceptions
} match {
case Success(_) => None
case Failure(exception) =>
sparkSession.close()
throw exception
}
sparkSession.close()
}
我们在客户端模式下在 Kubernetes 上 运行 Spark 3.1.1。
我们是一个简单的 scala spark 应用程序,它从 S3 加载 parquet 文件并聚合它们:
sparkSession.read.parquet(paths).as[MyRawEvent]
我们的应用程序在快乐路径上完美运行:驱动程序 pod 启动 运行,执行程序 pods 加入聚会,当应用程序完成时,执行程序和驱动程序都终止。
另一方面,如果出现问题,驱动程序和执行程序 pods 都会保持 Running
状态。例如,如果上述 paths
之一不存在,则会发生异常(在驱动程序中):
Exception in thread "main" org.apache.spark.sql.AnalysisException: Path does not exist: s3a://<bucket-name>/client-id=8765432/date=2021-08-06
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary(DataSource.scala:803)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$adapted(DataSource.scala:800)
at org.apache.spark.util.ThreadUtils$.$anonfun$parmap(ThreadUtils.scala:372)
at scala.concurrent.Future$.$anonfun$apply(Future.scala:659)
at scala.util.Success.$anonfun$map(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
有趣的是,此异常不会阻止执行程序立即启动,并且驱动程序和执行程序 pods 永远卡住,什么都不做。
我们没有在我们的应用程序中捕获异常,我们希望驱动程序和执行程序会停止,而不是浪费冗余资源。
我们怎样才能粉碎应用程序,使其不会永远停留在 Running
状态?
嗯,这很简单。
我必须捕获所有异常以确保无论如何都关闭 spark 上下文:
def main(args: Array[String]): Unit = {
// some code
implicit val sparkSession = SparkSession.builder().getOrCreate
try {
// application code with potential exceptions
} catch {
case exception: Exception =>
sparkSession.close()
throw exception
}
sparkSession.close()
}
这样所有资源都被释放并且驱动程序 pod 将其状态更改为 Error
作为例外。
编辑 - 以 Scala 的方式进行:
def main(args: Array[String]): Unit = {
// some code
implicit val sparkSession = SparkSession.builder().getOrCreate
Try {
// application code with potential exceptions
} match {
case Success(_) => None
case Failure(exception) =>
sparkSession.close()
throw exception
}
sparkSession.close()
}