Apache Spark:如何取消代码中的作业并终止 运行 任务?
Apache Spark: how to cancel job in code and kill running tasks?
我运行在客户端模式下使用 Yarn(版本 2.6.0)在 Hadoop 集群上安装 Spark 应用程序(版本 1.6.0)。我有一段代码 运行 是一个很长的计算,如果它花费的时间太长我想杀死它(然后 运行 一些其他函数代替)。
这是一个例子:
val conf = new SparkConf().setAppName("TIMEOUT_TEST")
val sc = new SparkContext(conf)
val lst = List(1,2,3)
// setting up an infite action
val future = sc.parallelize(lst).map(while (true) _).collectAsync()
try {
Await.result(future, Duration(30, TimeUnit.SECONDS))
println("success!")
} catch {
case _:Throwable =>
future.cancel()
println("timeout")
}
// sleep for 1 hour to allow inspecting the application in yarn
Thread.sleep(60*60*1000)
sc.stop()
超时设置为30秒,当然计算是无限的,所以等待future的结果会抛出Exception,会被捕获,然后future会被取消,备份功能将执行。
这一切都很好,除了被取消的作业没有完全终止:当查看应用程序的网络 UI 时,作业被标记为失败,但我可以看到仍然有 运行里面有任务。
当我使用 SparkContext.cancelAllJobs 或 SparkContext.cancelJobGroup 时会发生同样的事情。问题是,即使我设法继续我的程序,已取消作业的 运行ning 任务仍在占用宝贵的资源(这最终会使我的速度减慢到几乎停止)。
总结一下:如何终止 Spark 作业同时终止该作业的所有 运行ning 任务? (与现在发生的情况相反,即停止 运行ning 新任务的工作,但让当前 运行ning 任务完成)
更新:
在长时间忽略这个问题之后,我们找到了一个凌乱但有效的小解决方法。我们没有尝试从 Spark 应用程序中终止相应的 Spark Job/Stage,而是在发生超时时简单地记录了所有活动阶段的阶段 ID,并向 URL 发出了一个 HTTP GET 请求Spark Web UI 用于终止所述阶段。
根据 setJobGroup:
"If interruptOnCancel is set to true for the job group, then job cancellation will result in Thread.interrupt() being called on the job's executor threads."
因此您地图中的 anno 函数必须像这样可中断:
val future = sc.parallelize(lst).map(while (!Thread.interrupted) _).collectAsync()
我不知道这回答了你的问题。
我的需要是终止挂起时间过长的作业(我的作业从 Oracle 表中提取数据,但由于某些未知原因,连接很少永远挂起)。
经过一番研究,我得出了这个解决方案:
val MAX_JOB_SECONDS = 100
val statusTracker = sc.statusTracker;
val sparkListener = new SparkListener()
{
override def onJobStart(jobStart : SparkListenerJobStart)
{
val jobId = jobStart.jobId
val f = Future
{
var c = MAX_JOB_SECONDS;
var mustCancel = false;
var running = true;
while(!mustCancel && running)
{
Thread.sleep(1000);
c = c - 1;
mustCancel = c <= 0;
val jobInfo = statusTracker.getJobInfo(jobId);
if(jobInfo!=null)
{
val v = jobInfo.get.status()
running = v == JobExecutionStatus.RUNNING
}
else
running = false;
}
if(mustCancel)
{
sc.cancelJob(jobId)
}
}
}
}
sc.addSparkListener(sparkListener)
try
{
val df = spark.sql("SELECT * FROM VERY_BIG_TABLE") //just an example of long-running-job
println(df.count)
}
catch
{
case exc: org.apache.spark.SparkException =>
{
if(exc.getMessage.contains("cancelled"))
throw new Exception("Job forcibly cancelled")
else
throw exc
}
case ex : Throwable =>
{
println(s"Another exception: $ex")
}
}
finally
{
sc.removeSparkListener(sparkListener)
}
为了将来的访问者,Spark 从 2.0.3 开始引入了 Spark task reaper,它确实(或多或少)解决了这种情况,并且是 built-in 解决方案。
请注意,如果任务没有响应,它最终会杀死一个执行器。
此外,some built-in Spark sources of data 已经过重构,可以更好地响应 spark:
对于 1.6.0 版本,Zohar 的解决方案是“混乱但高效”的解决方案。
我运行在客户端模式下使用 Yarn(版本 2.6.0)在 Hadoop 集群上安装 Spark 应用程序(版本 1.6.0)。我有一段代码 运行 是一个很长的计算,如果它花费的时间太长我想杀死它(然后 运行 一些其他函数代替)。
这是一个例子:
val conf = new SparkConf().setAppName("TIMEOUT_TEST")
val sc = new SparkContext(conf)
val lst = List(1,2,3)
// setting up an infite action
val future = sc.parallelize(lst).map(while (true) _).collectAsync()
try {
Await.result(future, Duration(30, TimeUnit.SECONDS))
println("success!")
} catch {
case _:Throwable =>
future.cancel()
println("timeout")
}
// sleep for 1 hour to allow inspecting the application in yarn
Thread.sleep(60*60*1000)
sc.stop()
超时设置为30秒,当然计算是无限的,所以等待future的结果会抛出Exception,会被捕获,然后future会被取消,备份功能将执行。
这一切都很好,除了被取消的作业没有完全终止:当查看应用程序的网络 UI 时,作业被标记为失败,但我可以看到仍然有 运行里面有任务。
当我使用 SparkContext.cancelAllJobs 或 SparkContext.cancelJobGroup 时会发生同样的事情。问题是,即使我设法继续我的程序,已取消作业的 运行ning 任务仍在占用宝贵的资源(这最终会使我的速度减慢到几乎停止)。
总结一下:如何终止 Spark 作业同时终止该作业的所有 运行ning 任务? (与现在发生的情况相反,即停止 运行ning 新任务的工作,但让当前 运行ning 任务完成)
更新:
在长时间忽略这个问题之后,我们找到了一个凌乱但有效的小解决方法。我们没有尝试从 Spark 应用程序中终止相应的 Spark Job/Stage,而是在发生超时时简单地记录了所有活动阶段的阶段 ID,并向 URL 发出了一个 HTTP GET 请求Spark Web UI 用于终止所述阶段。
根据 setJobGroup:
"If interruptOnCancel is set to true for the job group, then job cancellation will result in Thread.interrupt() being called on the job's executor threads."
因此您地图中的 anno 函数必须像这样可中断:
val future = sc.parallelize(lst).map(while (!Thread.interrupted) _).collectAsync()
我不知道这回答了你的问题。 我的需要是终止挂起时间过长的作业(我的作业从 Oracle 表中提取数据,但由于某些未知原因,连接很少永远挂起)。
经过一番研究,我得出了这个解决方案:
val MAX_JOB_SECONDS = 100
val statusTracker = sc.statusTracker;
val sparkListener = new SparkListener()
{
override def onJobStart(jobStart : SparkListenerJobStart)
{
val jobId = jobStart.jobId
val f = Future
{
var c = MAX_JOB_SECONDS;
var mustCancel = false;
var running = true;
while(!mustCancel && running)
{
Thread.sleep(1000);
c = c - 1;
mustCancel = c <= 0;
val jobInfo = statusTracker.getJobInfo(jobId);
if(jobInfo!=null)
{
val v = jobInfo.get.status()
running = v == JobExecutionStatus.RUNNING
}
else
running = false;
}
if(mustCancel)
{
sc.cancelJob(jobId)
}
}
}
}
sc.addSparkListener(sparkListener)
try
{
val df = spark.sql("SELECT * FROM VERY_BIG_TABLE") //just an example of long-running-job
println(df.count)
}
catch
{
case exc: org.apache.spark.SparkException =>
{
if(exc.getMessage.contains("cancelled"))
throw new Exception("Job forcibly cancelled")
else
throw exc
}
case ex : Throwable =>
{
println(s"Another exception: $ex")
}
}
finally
{
sc.removeSparkListener(sparkListener)
}
为了将来的访问者,Spark 从 2.0.3 开始引入了 Spark task reaper,它确实(或多或少)解决了这种情况,并且是 built-in 解决方案。 请注意,如果任务没有响应,它最终会杀死一个执行器。
此外,some built-in Spark sources of data 已经过重构,可以更好地响应 spark:
对于 1.6.0 版本,Zohar 的解决方案是“混乱但高效”的解决方案。