尝试访问 UDF 内的广播变量时,Spark 无限期冻结
Spark freezes indefinitely when trying to access broadcast variable inside UDF
public Dataset<Row> myfunc(SparkSession spark, Dataset<Row> dfa, Dataset<Row> dfb){
JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
Broadcast<Dataset<Row>> excRateBrdCast = jsc.broadcast(dfa); // very small local test DS . 5 rows 4 cols
log.info(" ##### " + excRateBrdCast.value().count()); //works
spark.udf().register("someudf", new UDF4<Date, String, String, Double, Double>(){
@Override
public Double call(Date cola, String colb, String colc, Double original){
Dataset<Row> excBrdcastRecv = excRateBrdCast.value();
Double newRate = original; // If I set a debugger here and do excBrdcastRecv.count() . It freezes no error. But no result
if(!colc.equals("SOME")){
Dataset<Row> ds6 = excBrdcastRecv.filter(row -> {
boolean cond1 = row.getAs("cola").toString().equals(cola.toString());
boolean cond2 = row.getAs("colb").toString().equals(colb);
return cond1 && cond2;
});
Double val9 = ds6.first().getAs("colc"); //Spark in local mode freezes here . No error. Just dont proceed
newRate = newRate*val9;
}
return newRate;
}
}, DataTypes.DoubleType);
Dataset<Row> newDs = dfb.withColumn
("addedColumn", callUDF("someudf", col("cola"), col("colb"), col("colc"), col("cold")));
return newDs;
}
一些提示 -
- 如果我删除对 excRateBrdCast.value() 的访问并发回硬编码值,它就可以正常工作。
- 将 spark 2.11 与 java
结合使用
- 所有数据集都是非常小的本地测试数据集,因此大小不是问题。
- 在尝试访问广播变量
Double newRate = original; // If I set a debugger here and do excBrdcastRecv.count() . It freezes no error. But no result
时没有收到 ant 错误,只是处理卡住了。与调用操作时相同
- 日志卡在
INFO DAGScheduler - Submitting 1 missing tasks from ResultStage 46 (MapPartitionsRDD[267] at first at PositionsControls.java:178) (first 15 tasks are for partitions Vector(0))
INFO TaskSchedulerImpl - Adding task set 46.0 with 1 tasks
- 运行 本地模式
嗯,上面的代码中有一个 class 逻辑错误。所以变量 excRateBrdCast
被广播了。然后 new UDF4
被注册。 Spark 实际上会在多个执行器机器上执行该 UDF。在那些机器上,spark 将无法看到 excRateBrdCast
,因此它将永远停止以等待 excRateBrdCast.value()
到达。所以一些我们需要如何将 excRateBrdCast
传递给 UDF。他们在代码块中的连续出现是一种欺骗。
所以你需要做的是在另一个 class 中使用 UDF。并且不要进行内联初始化。在 UDF 中定义一个参数化构造函数,它采用广播变量 excRateBrdCast
并在初始化期间传递它。
然后就可以看到广播变量了
public Dataset<Row> myfunc(SparkSession spark, Dataset<Row> dfa, Dataset<Row> dfb){
JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
Broadcast<Dataset<Row>> excRateBrdCast = jsc.broadcast(dfa); // very small local test DS . 5 rows 4 cols
log.info(" ##### " + excRateBrdCast.value().count()); //works
spark.udf().register("someudf", new UDF4<Date, String, String, Double, Double>(){
@Override
public Double call(Date cola, String colb, String colc, Double original){
Dataset<Row> excBrdcastRecv = excRateBrdCast.value();
Double newRate = original; // If I set a debugger here and do excBrdcastRecv.count() . It freezes no error. But no result
if(!colc.equals("SOME")){
Dataset<Row> ds6 = excBrdcastRecv.filter(row -> {
boolean cond1 = row.getAs("cola").toString().equals(cola.toString());
boolean cond2 = row.getAs("colb").toString().equals(colb);
return cond1 && cond2;
});
Double val9 = ds6.first().getAs("colc"); //Spark in local mode freezes here . No error. Just dont proceed
newRate = newRate*val9;
}
return newRate;
}
}, DataTypes.DoubleType);
Dataset<Row> newDs = dfb.withColumn
("addedColumn", callUDF("someudf", col("cola"), col("colb"), col("colc"), col("cold")));
return newDs;
}
一些提示 -
- 如果我删除对 excRateBrdCast.value() 的访问并发回硬编码值,它就可以正常工作。
- 将 spark 2.11 与 java 结合使用
- 所有数据集都是非常小的本地测试数据集,因此大小不是问题。
- 在尝试访问广播变量
Double newRate = original; // If I set a debugger here and do excBrdcastRecv.count() . It freezes no error. But no result
时没有收到 ant 错误,只是处理卡住了。与调用操作时相同 - 日志卡在
INFO DAGScheduler - Submitting 1 missing tasks from ResultStage 46 (MapPartitionsRDD[267] at first at PositionsControls.java:178) (first 15 tasks are for partitions Vector(0)) INFO TaskSchedulerImpl - Adding task set 46.0 with 1 tasks
- 运行 本地模式
嗯,上面的代码中有一个 class 逻辑错误。所以变量 excRateBrdCast
被广播了。然后 new UDF4
被注册。 Spark 实际上会在多个执行器机器上执行该 UDF。在那些机器上,spark 将无法看到 excRateBrdCast
,因此它将永远停止以等待 excRateBrdCast.value()
到达。所以一些我们需要如何将 excRateBrdCast
传递给 UDF。他们在代码块中的连续出现是一种欺骗。
所以你需要做的是在另一个 class 中使用 UDF。并且不要进行内联初始化。在 UDF 中定义一个参数化构造函数,它采用广播变量 excRateBrdCast
并在初始化期间传递它。
然后就可以看到广播变量了