有没有办法在 Spark 中捕获 executor killed 异常?
Is there a way to catch executor killed exception in Spark?
在我的 Spark 程序执行期间,有时(对我来说原因仍然是个谜)yarn 会杀死容器(执行程序)并给出超出内存限制的消息。尽管 Spark 通过生成新容器重新执行任务,但我的程序确实恢复了。然而,在我的程序中,一个任务也在磁盘上创建了一些中间文件。当一个容器被杀死时,这些文件就会被留下。有没有一种方法可以捕获被杀死的执行者作为异常,以便我可以删除留下的中间文件。显然,异常处理代码还需要 运行 在执行程序 运行 所在的同一节点上,以便我可以从那里删除文件。
作为一种选择,您可以尝试使用 SparkListener
功能。
因此,您可以创建自己的 class 并实现 SparkListener
接口以连接到不言自明的可用事件。
然后您需要将该自定义侦听器添加到 SparkContext
。
有 2 个选项可用:
SparkContext.addSparkListener(<your custom listener>)
- 通过
spark.extraListeners
属性,更多信息在这里 http://spark.apache.org/docs/latest/configuration.html#available-properties
在 @Taras Matyashovskyy 答案之上添加。
You can Use SparkListener and intercept SparkListener (Executor) events.
下面是可用的侦听器事件列表。
SparkListenerApplicationStart
SparkListenerJobStart
SparkListenerStageSubmitted
SparkListenerTaskStart
SparkListenerTaskGettingResult
SparkListenerTaskEnd
SparkListenerStageCompleted
SparkListenerJobEnd
SparkListenerApplicationEnd
SparkListenerEnvironmentUpdate
SparkListenerBlockManagerAdded
SparkListenerBlockManagerRemoved
SparkListenerBlockUpdated
SparkListenerUnpersistRDD
SparkListenerExecutorAdded
SparkListenerExecutorRemoved
/**
* Lives in the driver to receive heartbeats from executors..
*/
private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
extends SparkListener with ThreadSafeRpcEndpoint with Logging {
def this(sc: SparkContext) {
this(sc, new SystemClock)
}
sc.addSparkListener(this) ...
请查看removed reason哪个可能适合你(我没试过)
在我的 Spark 程序执行期间,有时(对我来说原因仍然是个谜)yarn 会杀死容器(执行程序)并给出超出内存限制的消息。尽管 Spark 通过生成新容器重新执行任务,但我的程序确实恢复了。然而,在我的程序中,一个任务也在磁盘上创建了一些中间文件。当一个容器被杀死时,这些文件就会被留下。有没有一种方法可以捕获被杀死的执行者作为异常,以便我可以删除留下的中间文件。显然,异常处理代码还需要 运行 在执行程序 运行 所在的同一节点上,以便我可以从那里删除文件。
作为一种选择,您可以尝试使用 SparkListener
功能。
因此,您可以创建自己的 class 并实现 SparkListener
接口以连接到不言自明的可用事件。
然后您需要将该自定义侦听器添加到 SparkContext
。
有 2 个选项可用:
SparkContext.addSparkListener(<your custom listener>)
- 通过
spark.extraListeners
属性,更多信息在这里 http://spark.apache.org/docs/latest/configuration.html#available-properties
在 @Taras Matyashovskyy 答案之上添加。
You can Use SparkListener and intercept SparkListener (Executor) events.
下面是可用的侦听器事件列表。
SparkListenerApplicationStart
SparkListenerJobStart
SparkListenerStageSubmitted
SparkListenerTaskStart
SparkListenerTaskGettingResult
SparkListenerTaskEnd
SparkListenerStageCompleted
SparkListenerJobEnd
SparkListenerApplicationEnd
SparkListenerEnvironmentUpdate
SparkListenerBlockManagerAdded
SparkListenerBlockManagerRemoved
SparkListenerBlockUpdated
SparkListenerUnpersistRDD
SparkListenerExecutorAdded
SparkListenerExecutorRemoved
/**
* Lives in the driver to receive heartbeats from executors..
*/
private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
extends SparkListener with ThreadSafeRpcEndpoint with Logging {
def this(sc: SparkContext) {
this(sc, new SystemClock)
}
sc.addSparkListener(this) ...
请查看removed reason哪个可能适合你(我没试过)