在 Spark Scala UDF 中定义 return 值

Define return value in Spark Scala UDF

想象一下下面的代码:

def myUdf(arg: Int) = udf((vector: MyData) => {
  // complex logic that returns a Double
})

如何为 myUdf 定义 return 类型,以便查看代码的人立即知道它 return 是一个 Double?

带有 lambda 函数的 UDF 没有什么特别之处,它们的行为就像 scala lambda 函数(参见 Specifying the lambda return type in Scala)所以你可以这样做:

def myUdf(arg: Int) = udf(((vector: MyData) => {
  // complex logic that returns a Double
}): (MyData => Double))

或者明确定义您的函数:

def myFuncWithArg(arg: Int) {
  def myFunc(vector: MyData): Double = {
     // complex logic that returns a Double. Use arg here
  }
  myFunc _
}

def myUdf(arg: Int) = udf(myFuncWithArg(arg))

我看到有两种方法,要么先定义一个方法,然后将其提升为一个函数

def myMethod(vector:MyData) : Double = {
  // complex logic that returns a Double
}

val myUdf = udf(myMethod _)

或者先定义一个显式类型的函数:

val myFunction: Function1[MyData,Double] = (vector:MyData) => {
  // complex logic that returns a Double
}

val myUdf = udf(myFunction)

我通常对我的 UDF 使用第一种方法

Spark functions 定义了几个 udf 具有以下 modifier/type 的方法:static <RT,A1, ..., A10> UserDefinedFunction

您可以在方括号中指定 input/output 数据类型,如下所示:

def myUdf(arg: Int) = udf[Double, MyData]((vector: MyData) => {
  // complex logic that returns a Double
})

您可以将类型参数传递给 udf,但您需要先传递 return 类型,这似乎违反直觉,然后是 [ReturnType, ArgTypes...] 之类的输入类型,至少Spark 2.3.x。使用原始示例(这似乎是基于 arg 的柯里化函数):

def myUdf(arg: Int) = udf[Double, Seq[Int]]((vector: Seq[Int]) => {
  13.37 // whatever
})