使用 callUDF 创建链接 UDF 调用的方法

Using callUDF to create a method that chains UDF calls

我猴子修补了 org.apache.spark.sql.Column class 以添加 chainUDF 方法。它适用于不带参数的 udfs,我需要帮助使其对带参数的 udfs 通用。

这是当前的 chainUDF 方法定义。

object ColumnExt {

  implicit class ColumnMethods(c: Column) {

    def chainUDF(udfName: String): Column = {
      callUDF(udfName, c)
    }

  }

}

下面是 chainUDF 方法的实际应用。

def appendZ(s: String): String = {
  s"${s}Z"
}

spark.udf.register("appendZUdf", appendZ _)

def prependA(s: String): String = {
  s"A${s}"
}

spark.udf.register("prependAUdf", prependA _)

val hobbiesDf = Seq(
  ("dance"),
  ("sing")
).toDF("word")

val actualDf = hobbiesDf.withColumn(
  "fun",
  col("word").chainUDF("appendZUdf").chainUDF("prependAUdf")
)

我想更新 chainUDF 方法定义,使其接受一个可选的 Column 参数列表。像这样:

def appendWord(s: String, word: String): String = {
  s"${s}${word}"
}

spark.udf.register("appendWordUdf", appendWord _)

val hobbiesDf = Seq(
  ("dance"),
  ("sing")
).toDF("word")

val actualDf = hobbiesDf.withColumn(
  "fun",
  col("word").chainUDF("appendZUdf").chainUDF("appendWordUdf", lit("cool"))
)

我认为我们需要将 chainUDF 方法定义更新为如下内容:

object ColumnExt {

  implicit class ColumnMethods(c: Column) {

    def chainUDF(udfName: String, cols: Column* = some_default_value): Column = {
      callUDF(udfName, c + cols)
    }

  }

}

我敢肯定有一些 Scala 魔术可以实现这一点。

签名是:

def callUDF(udfName: String, cols: Column*): Column

所以你不需要魔法:

def chainUDF(udfName: String, cols: Column* = some_default_value): Column = {
  callUDF(udfName, c +: cols: _*)
}