Spark:作业重新启动并重试
Spark: Job restart and retries
假设您有 Spark + 独立集群管理器。您使用一些配置打开了 spark 会话,并希望使用不同的参数并行启动 SomeSparkJob
40 次。
问题
- 如何设置作业失败的返还金额?
- 如何在失败时以编程方式重新启动作业?如果作业因缺乏资源而失败,这可能很有用。比我可以一个一个地启动所有需要额外资源的工作。
- 如何在作业失败时重新启动 spark 应用程序? 如果作业即使同时启动也缺少资源,这可能很有用。比起更改内核、CPU 等配置,我需要在独立集群管理器中重新启动应用程序。
我的解决方法
1) 我很确定第 1 点是可能的,因为它可能在 spark local mode。我只是不知道如何在独立模式下做到这一点。
2-3) 可以像 spark.sparkContext().addSparkListener(new SparkListener() {
这样的 spark 上下文传递监听器。但似乎 SparkListener
缺少失败回调。
还有很多方法文档很差。我从未使用过它们,但也许它们可以帮助解决我的问题。
spark.sparkContext().dagScheduler().runJob();
spark.sparkContext().runJob()
spark.sparkContext().submitJob()
spark.sparkContext().taskScheduler().submitTasks();
spark.sparkContext().dagScheduler().handleJobCancellation();
spark.sparkContext().statusTracker()
您可以使用 SparkLauncher 并控制流量。
import org.apache.spark.launcher.SparkLauncher;
public class MyLauncher {
public static void main(String[] args) throws Exception {
Process spark = new SparkLauncher()
.setAppResource("/my/app.jar")
.setMainClass("my.spark.app.Main")
.setMaster("local")
.setConf(SparkLauncher.DRIVER_MEMORY, "2g")
.launch();
spark.waitFor();
}
}
有关详细信息,请参阅 API。
由于它创建了进程,您可以检查进程状态并重试,例如尝试以下操作:
public boolean isAlive()
如果进程未重新启动,请参阅API了解更多详情。
希望这能让我们对如何实现您在问题中提到的目标有更深入的了解。可能有更多方法可以做同样的事情,但我想分享这种方法。
干杯!
检查您的 spark.sql.broadcastTimeout 和 spark.broadcast.blockSize 属性,尝试增加它们。
假设您有 Spark + 独立集群管理器。您使用一些配置打开了 spark 会话,并希望使用不同的参数并行启动 SomeSparkJob
40 次。
问题
- 如何设置作业失败的返还金额?
- 如何在失败时以编程方式重新启动作业?如果作业因缺乏资源而失败,这可能很有用。比我可以一个一个地启动所有需要额外资源的工作。
- 如何在作业失败时重新启动 spark 应用程序? 如果作业即使同时启动也缺少资源,这可能很有用。比起更改内核、CPU 等配置,我需要在独立集群管理器中重新启动应用程序。
我的解决方法
1) 我很确定第 1 点是可能的,因为它可能在 spark local mode。我只是不知道如何在独立模式下做到这一点。
2-3) 可以像 spark.sparkContext().addSparkListener(new SparkListener() {
这样的 spark 上下文传递监听器。但似乎 SparkListener
缺少失败回调。
还有很多方法文档很差。我从未使用过它们,但也许它们可以帮助解决我的问题。
spark.sparkContext().dagScheduler().runJob();
spark.sparkContext().runJob()
spark.sparkContext().submitJob()
spark.sparkContext().taskScheduler().submitTasks();
spark.sparkContext().dagScheduler().handleJobCancellation();
spark.sparkContext().statusTracker()
您可以使用 SparkLauncher 并控制流量。
import org.apache.spark.launcher.SparkLauncher;
public class MyLauncher {
public static void main(String[] args) throws Exception {
Process spark = new SparkLauncher()
.setAppResource("/my/app.jar")
.setMainClass("my.spark.app.Main")
.setMaster("local")
.setConf(SparkLauncher.DRIVER_MEMORY, "2g")
.launch();
spark.waitFor();
}
}
有关详细信息,请参阅 API。
由于它创建了进程,您可以检查进程状态并重试,例如尝试以下操作:
public boolean isAlive()
如果进程未重新启动,请参阅API了解更多详情。
希望这能让我们对如何实现您在问题中提到的目标有更深入的了解。可能有更多方法可以做同样的事情,但我想分享这种方法。
干杯!
检查您的 spark.sql.broadcastTimeout 和 spark.broadcast.blockSize 属性,尝试增加它们。