spark-submit not exiting even after process finishes successfully
I wrote a simple application that runs fine, but when I submit the code through spark-submit, the spark-submit session does not finish even after close() has been called, and I have to kill the PID.
Below is the code snippet:
import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

import org.apache.spark.sql.{DataFrame, SparkSession}

object FaultApp {

  case class Person(name: String, age: Long)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .enableHiveSupport()
      .config("spark.scheduler.mode", "FAIR")
      .appName("parjobs")
      .getOrCreate()

    import spark.implicits._

    val pool = Executors.newFixedThreadPool(5)
    // create the implicit ExecutionContext based on our thread pool
    implicit val xc = ExecutionContext.fromExecutorService(pool)

    import Function._

    val caseClass = Seq(Person("X", 32),
                        Person("Y", 37),
                        Person("Z", 37),
                        Person("A", 6))

    val caseClassDS = caseClass.toDF()

    val taskA = write_first(caseClassDS)

    Await.result(Future.sequence(Seq(taskA)), Duration(1, MINUTES))

    spark.stop()

    println("After Spark Stop command")
  }
}

object Function {

  def write_first(ds: DataFrame)(implicit xc: ExecutionContext) = Future {
    Thread.sleep(10000)
    ds.write.format("orc").mode("overwrite")
      .option("compression", "zlib")
      .saveAsTable("save_1")
  }
}
I am submitting the job with the following command:
spark-submit --master yarn --deploy-mode client fault_application-assembly-1.0-SNAPSHOT.jar --executor-memory 1G --executor-cores 2 --driver-memory 1G
Below are the last few lines of the log:
18/04/18 15:15:20 INFO SchedulerExtensionServices: Stopping SchedulerExtensionServices
(serviceOption=None,
services=List(),
started=false)
18/04/18 15:15:20 INFO YarnClientSchedulerBackend: Stopped
18/04/18 15:15:20 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
18/04/18 15:15:20 INFO MemoryStore: MemoryStore cleared
18/04/18 15:15:20 INFO BlockManager: BlockManager stopped
18/04/18 15:15:20 INFO BlockManagerMaster: BlockManagerMaster stopped
18/04/18 15:15:20 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
18/04/18 15:15:20 INFO SparkContext: Successfully stopped SparkContext
After Spark Stop command
Any help or suggestions would be greatly appreciated.
That is because you are creating an ExecutionContext backed by your own thread pool. The worker threads of a pool created with Executors.newFixedThreadPool are non-daemon, so your program will not exit until that pool is also shut down.
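To illustrate the mechanism, here is a minimal standalone sketch of my own (not part of the original code): a plain JVM program behaves the same way, because main returning does not end the process while non-daemon pool threads are still alive.

import java.util.concurrent.Executors

object PoolDemo {
  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(5)   // worker threads are non-daemon
    pool.submit(new Runnable { def run(): Unit = println("task done") })
    println("end of main")
    // Without pool.shutdown() the idle worker threads keep the JVM alive,
    // which is exactly why spark-submit appears to hang after the job finishes.
    // pool.shutdown()  // uncomment to let the process exit
  }
}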
After spark.stop(), add:
xc.shutdown()
println("After shutdown.")
Alternatively, instead of creating a new execution context for your futures, you can simply use the global one:
implicit val executor = scala.concurrent.ExecutionContext.global
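The global execution context is backed by daemon threads, so it does not keep the JVM alive and nothing needs to be shut down explicitly. A minimal sketch of the same flow with it (same assumed write_first and caseClassDS as above):

implicit val executor = scala.concurrent.ExecutionContext.global   // daemon threads

val taskA = write_first(caseClassDS)
Await.result(Future.sequence(Seq(taskA)), Duration(1, MINUTES))
spark.stop()   // the driver JVM can now exit on its own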