尝试访问 UDF 内的广播变量时​​,Spark 无限期冻结

Spark freezes indefinitely when trying to access broadcast variable inside UDF

 public Dataset<Row> myfunc(SparkSession spark, Dataset<Row> dfa, Dataset<Row> dfb){
    JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());

    Broadcast<Dataset<Row>> excRateBrdCast = jsc.broadcast(dfa); // very small local test DS . 5 rows 4 cols
    log.info(" ##### " + excRateBrdCast.value().count()); //works
    spark.udf().register("someudf", new UDF4<Date, String, String, Double, Double>(){
        @Override
        public Double call(Date cola, String colb, String colc, Double original){
            Dataset<Row> excBrdcastRecv = excRateBrdCast.value();
            Double newRate = original; // If I set a debugger here and do excBrdcastRecv.count() . It freezes no error. But no result
            if(!colc.equals("SOME")){
                Dataset<Row> ds6 = excBrdcastRecv.filter(row -> {
                    boolean cond1 = row.getAs("cola").toString().equals(cola.toString());
                    boolean cond2 = row.getAs("colb").toString().equals(colb);
                    return cond1 && cond2;
                });
                Double val9 = ds6.first().getAs("colc"); //Spark in local mode freezes here . No error. Just dont proceed
                newRate = newRate*val9;
            }
            return newRate;
        }
    }, DataTypes.DoubleType);

    Dataset<Row> newDs = dfb.withColumn
            ("addedColumn", callUDF("someudf", col("cola"), col("colb"), col("colc"), col("cold")));

    return newDs;
}

一些提示 -

  1. 如果我删除对 excRateBrdCast.value() 的访问并发回硬编码值,它就可以正常工作。
  2. 将 spark 2.11 与 java
  3. 结合使用
  4. 所有数据集都是非常小的本地测试数据集,因此大小不是问题。
  5. 在尝试访问广播变量 Double newRate = original; // If I set a debugger here and do excBrdcastRecv.count() . It freezes no error. But no result 时没有收到 ant 错误,只是处理卡住了。与调用操作时相同
  6. 日志卡在 INFO DAGScheduler - Submitting 1 missing tasks from ResultStage 46 (MapPartitionsRDD[267] at first at PositionsControls.java:178) (first 15 tasks are for partitions Vector(0)) INFO TaskSchedulerImpl - Adding task set 46.0 with 1 tasks
  7. 运行 本地模式

嗯,上面的代码中有一个 class 逻辑错误。所以变量 excRateBrdCast 被广播了。然后 new UDF4 被注册。 Spark 实际上会在多个执行器机器上执行该 UDF。在那些机器上,spark 将无法看到 excRateBrdCast,因此它将永远停止以等待 excRateBrdCast.value() 到达。所以一些我们需要如何将 excRateBrdCast 传递给 UDF。他们在代码块中的连续出现是一种欺骗。

所以你需要做的是在另一个 class 中使用 UDF。并且不要进行内联初始化。在 UDF 中定义一个参数化构造函数,它采用广播变量 excRateBrdCast 并在初始化期间传递它。

然后就可以看到广播变量了