什么是无类型 Scala UDF 和有类型 Scala UDF?他们有什么区别?

What are Untyped Scala UDF and Typed Scala UDF? What are their differences?

我使用 Spark 2.4 有一段时间了,最​​近几天才开始切换到 Spark 3.0。 运行 udf((x: Int) => x, IntegerType):

切换到 Spark 3.0 后出现此错误
Caused by: org.apache.spark.sql.AnalysisException: You're using untyped Scala UDF, which does not have the input type information. Spark may blindly pass null to the Scala closure with primitive-type argument, and the closure will see the default value of the Java type for the null argument, e.g. `udf((x: Int) => x, IntegerType)`, the result is 0 for null input. To get rid of this error, you could:
1. use typed Scala UDF APIs(without return type parameter), e.g. `udf((x: Int) => x)`
2. use Java UDF APIs, e.g. `udf(new UDF1[String, Integer] { override def call(s: String): Integer = s.length() }, IntegerType)`, if input types are all non primitive
3. set spark.sql.legacy.allowUntypedScalaUDF to true and use this API with caution;

解决方案是Spark自己提出的,google了一段时间后,我来到了Spark Migration guide页面:

In Spark 3.0, using org.apache.spark.sql.functions.udf(AnyRef, DataType) is not allowed by default. Remove the return type parameter to automatically switch to typed Scala udf is recommended, or set spark.sql.legacy.allowUntypedScalaUDF to true to keep using it. In Spark version 2.4 and below, if org.apache.spark.sql.functions.udf(AnyRef, DataType) gets a Scala closure with primitive-type argument, the returned UDF returns null if the input values is null. However, in Spark 3.0, the UDF returns the default value of the Java type if the input value is null. For example, val f = udf((x: Int) => x, IntegerType), f($"x") returns null in Spark 2.4 and below if column x is null, and return 0 in Spark 3.0. This behavior change is introduced because Spark 3.0 is built with Scala 2.12 by default.

source: Spark Migration Guide

我注意到我通常使用 function.udf API 的方法称为 udf(AnyRef, DataType),建议的解决方案是 udf(AnyRef), 称为 Typed Scala UDF.

我的理解对吗?即使经过更深入的搜索,我仍然找不到任何 material 解释什么是 UnTyped Scala UDF 和什么是 Typed Scala UDF。

所以我的问题是:它们是什么?它们有什么区别?

在类型化 Scala UDF 中,UDF 知道作为参数传递的列的类型,而在非类型化 Scala UDF 中,UDF 不知道作为参数传递的列的类型

在创建类型化 Scala UDF 时,作为参数传递的列类型和 UDF 的输出是从函数参数和输出类型推断的,而在创建非类型化 Scala UDF 时,根本没有类型推断,无论是对于参数或输出。

令人困惑的是,在创建类型化 UDF 时,类型是从函数中推断出来的,而不是作为参数显式传递的。更明确地说,您可以按如下方式编写类型化 UDF 创建:

val my_typed_udf = udf[Int, Int]((x: Int) => Int)

现在,让我们看看您提出的两点。

To my understanding, the first one (eg udf(AnyRef, DataType)) looks more strictly typed than the second one (eg udf(AnyRef)) where the first one has its output type explicitly defined and the second one does not, hence my confusion on why it's called UnTyped.

根据 spark functions scaladoc,将函数转换为 UDF 的 udf 函数的签名实际上是,对于第一个:

def udf(f: AnyRef, dataType: DataType): UserDefinedFunction 

第二个:

def udf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction

所以第二个实际上比第一个有更多类型,因为第二个考虑了作为参数传递的函数的类型,而第一个删除了函数的类型。

这就是为什么在第一个你需要定义 return 类型的原因,因为 spark 需要这个信息但是不能从作为参数传递的函数推断它,因为它的 return 类型被删除了,而在第二个中,return 类型是从作为参数传递的函数中推断出来的。

Also the function got passed to udf, which is (x:Int) => x, clearly has its input type defined but Spark claiming You're using untyped Scala UDF, which does not have the input type information?

这里重要的不是函数,而是 Spark 如何从这个函数创建 UDF。

在这两种情况下,要转换为 UDF 的函数都定义了输入和 return 类型,但在使用 udf(AnyRef, DataType) 创建 UDF 时,这些类型会被删除且不会被考虑在内。

这并没有回答您最初关于不同 UDF 是什么的问题,但如果您想消除错误,在 Python 中,您可以在脚本中包含这一行:spark.sql("set spark.sql.legacy.allowUntypedScalaUDF=true").