Spark UDF 作为函数参数,UDF 不在函数范围内

Spark UDF as function parameter, UDF is not in function scope

我有一些 UDF 想作为函数参数与数据帧一起传递。

实现此目的的一种方法可能是在函数内创建 UDF,但这会创建和销毁 UDF 的多个实例而不重用它,这可能不是解决此问题的最佳方法。

这是一段示例代码 -

val lkpUDF = udf{(i: Int) => if (i > 0) 1 else 0}

val df =   inputDF1
    .withColumn("new_col", lkpUDF(col("c1")))
val df2 =   inputDF2.
  .withColumn("new_col", lkpUDF(col("c1")))

我不想做上面的事情,而是想做这样的事情 -

val lkpUDF = udf{(i: Int) => if (i > 0) 1 else 0}

def appendCols(df: DataFrame, lkpUDF: ?): DataFrame = {

    df
      .withColumn("new_col", lkpUDF(col("c1")))

  }
val df = appendCols(inputDF, lkpUDF)

上面的 UDF 非常简单,但在我的例子中,它可以是 return 基本类型或用户定义的案例 class 类型。任何想法/指示将不胜感激。谢谢

具有适当签名的函数必须是这样的:

import org.apache.spark.sql.UserDefinedFunction

def appendCols(df: DataFrame, func: UserDefinedFunction): DataFrame = {
    df.withColumn("new_col", func(col("col1")))
}

scala REPL 在return初始化值的类型方面很有帮助。

scala> val lkpUDF = udf{(i: Int) => if (i > 0) 1 else 0}
lkpUDF: org.apache.spark.sql.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,List(IntegerType))

此外,如果您传递给 udf 包装器的函数签名由 Any return 类型组成(如果函数可以 return 原语或用户定义的情况 class), UDF 将无法编译并出现如下异常:

java.lang.UnsupportedOperationException: Schema for type Any is not supported