在 Spark 的驱动程序上捕获 Dataset foreachPartition() 函数中抛出的异常?
Catch an exception thrown in Dataset foreachPartition() function on the driver in Spark?
我试图找到一种方法来捕获 Spark 在其驱动程序的 foreachPartition() 方法中抛出的异常。在 Dataset 上使用 foreachPartition() 时,我传递了一个在各个 worker 中执行的 lambda 函数。此过程可能会引发异常。但是我想不出在驱动程序上捕获此异常的方法。看来这是设计使然。我可以做些什么来改变这个吗?这是我目前正在做的一个例子:
public static void driverClassExecute() {
Dataset<ModelDTO> dataset = getSomeData();
dataset.foreachPartition(AClass::methodCanThrowException);
//How can I recover if the above throws an exception?
}
public static void methodCanThrowException(Iterator<ModelDTO> it) throws Exception {
//do stuff. If bad, throw exception. This crashes the driver.
throw new Exception("any exception");
}
我也在使用 Eclipse Oxygen IDE 以防编译器很重要。
在这种情况下,foreachPartition 将抛出异常,因此您可以将该调用包装在 try-catch 中并像处理任何其他异常一样处理,尽管 spark 作业已经失败。为避免作业失败,您必须处理 methodCanThrowException 中的异常。
我认为您的做法可能略有不同,但 Spark 分区 DS 的典型执行将按以下步骤完成:
创建数据集 ==> 这已经通过 getSomeData() 函数完成。
使用dataset.foreachPartition() ==> 遍历分区您在 getSomeData().
中的拆分逻辑
下一级迭代通过 partition.forEachRemaining() 由执行者和 运行 适当的业务逻辑获取个人记录。
当您 运行 第 3 步时,您应该在此处传递每条记录您的 methodCanThrowException() 就像 Java -
partition.forEachRemaining(record -> {
try {
methodCanThrowException(record);
} catch (Exception e){
e.printStackTrace();
Logger.info(<record-key> "etc for debugging or tracking");
}
);
显然你可以通过多种方式处理异常,写入审计文件进行更正等。但是你的处理不会因为处理方法中的异常而失败。
我试图找到一种方法来捕获 Spark 在其驱动程序的 foreachPartition() 方法中抛出的异常。在 Dataset 上使用 foreachPartition() 时,我传递了一个在各个 worker 中执行的 lambda 函数。此过程可能会引发异常。但是我想不出在驱动程序上捕获此异常的方法。看来这是设计使然。我可以做些什么来改变这个吗?这是我目前正在做的一个例子:
public static void driverClassExecute() {
Dataset<ModelDTO> dataset = getSomeData();
dataset.foreachPartition(AClass::methodCanThrowException);
//How can I recover if the above throws an exception?
}
public static void methodCanThrowException(Iterator<ModelDTO> it) throws Exception {
//do stuff. If bad, throw exception. This crashes the driver.
throw new Exception("any exception");
}
我也在使用 Eclipse Oxygen IDE 以防编译器很重要。
在这种情况下,foreachPartition 将抛出异常,因此您可以将该调用包装在 try-catch 中并像处理任何其他异常一样处理,尽管 spark 作业已经失败。为避免作业失败,您必须处理 methodCanThrowException 中的异常。
我认为您的做法可能略有不同,但 Spark 分区 DS 的典型执行将按以下步骤完成:
创建数据集 ==> 这已经通过 getSomeData() 函数完成。
使用dataset.foreachPartition() ==> 遍历分区您在 getSomeData().
中的拆分逻辑
下一级迭代通过 partition.forEachRemaining() 由执行者和 运行 适当的业务逻辑获取个人记录。
当您 运行 第 3 步时,您应该在此处传递每条记录您的 methodCanThrowException() 就像 Java -
partition.forEachRemaining(record -> {
try {
methodCanThrowException(record);
} catch (Exception e){
e.printStackTrace();
Logger.info(<record-key> "etc for debugging or tracking");
}
);
显然你可以通过多种方式处理异常,写入审计文件进行更正等。但是你的处理不会因为处理方法中的异常而失败。