SparkSQL:如何处理用户定义函数中的空值?
SparkSQL: How to deal with null values in user defined function?
给定 Table 1,其中一列 "x" 类型为 String。
我想创建 Table 2,其列 "y" 是 "x".
中给出的日期字符串的整数表示
基本 是在 "y".
列中保留 null
个值
Table 1(数据框 df1):
+----------+
| x|
+----------+
|2015-09-12|
|2015-09-13|
| null|
| null|
+----------+
root
|-- x: string (nullable = true)
Table 2(数据框 df2):
+----------+--------+
| x| y|
+----------+--------+
| null| null|
| null| null|
|2015-09-12|20150912|
|2015-09-13|20150913|
+----------+--------+
root
|-- x: string (nullable = true)
|-- y: integer (nullable = true)
而将列 "x" 的值转换为列 "y" 的值的用户定义函数 (udf) 是:
val extractDateAsInt = udf[Int, String] (
(d:String) => d.substring(0, 10)
.filterNot( "-".toSet)
.toInt )
并且有效,无法处理空值。
尽管如此,我可以做类似
的事情
val extractDateAsIntWithNull = udf[Int, String] (
(d:String) =>
if (d != null) d.substring(0, 10).filterNot( "-".toSet).toInt
else 1 )
我没找到办法,通过udfs给"produce" null
值(当然,因为Int
s不能null
)。
我目前创建df2 (Table 2)的方案如下:
// holds data of table 1
val df1 = ...
// filter entries from df1, that are not null
val dfNotNulls = df1.filter(df1("x")
.isNotNull)
.withColumn("y", extractDateAsInt(df1("x")))
.withColumnRenamed("x", "right_x")
// create df2 via a left join on df1 and dfNotNull having
val df2 = df1.join( dfNotNulls, df1("x") === dfNotNulls("right_x"), "leftouter" ).drop("right_x")
问题:
- 当前的解决方案似乎很麻烦(并且可能效率不高。性能)。有没有更好的方法?
- @Spark-developers:是否有类型
NullableInt
计划/可用,以便可以使用以下 udf(参见代码摘录)?
代码摘录
val extractDateAsNullableInt = udf[NullableInt, String] (
(d:String) =>
if (d != null) d.substring(0, 10).filterNot( "-".toSet).toInt
else null )
这是 Option
派上用场的地方:
val extractDateAsOptionInt = udf((d: String) => d match {
case null => None
case s => Some(s.substring(0, 10).filterNot("-".toSet).toInt)
})
或者在一般情况下使其稍微更安全:
import scala.util.Try
val extractDateAsOptionInt = udf((d: String) => Try(
d.substring(0, 10).filterNot("-".toSet).toInt
).toOption)
所有功劳归于 Dmitriy Selivanov who've pointed out this solution as a (missing?) edit here。
替代方法是在 UDF 外部处理 null
:
import org.apache.spark.sql.functions.{lit, when}
import org.apache.spark.sql.types.IntegerType
val extractDateAsInt = udf(
(d: String) => d.substring(0, 10).filterNot("-".toSet).toInt
)
df.withColumn("y",
when($"x".isNull, lit(null))
.otherwise(extractDateAsInt($"x"))
.cast(IntegerType)
)
补充代码
使用@zero323 的 nice 答案,我创建了以下代码,以使用户定义的函数可用于处理所描述的空值。希望对其他人有帮助!
/**
* Set of methods to construct [[org.apache.spark.sql.UserDefinedFunction]]s that
* handle `null` values.
*/
object NullableFunctions {
import org.apache.spark.sql.functions._
import scala.reflect.runtime.universe.{TypeTag}
import org.apache.spark.sql.UserDefinedFunction
/**
* Given a function A1 => RT, create a [[org.apache.spark.sql.UserDefinedFunction]] such that
* * if fnc input is null, None is returned. This will create a null value in the output Spark column.
* * if A1 is non null, Some( f(input) will be returned, thus creating f(input) as value in the output column.
* @param f function from A1 => RT
* @tparam RT return type
* @tparam A1 input parameter type
* @return a [[org.apache.spark.sql.UserDefinedFunction]] with the behaviour describe above
*/
def nullableUdf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction = {
udf[Option[RT],A1]( (i: A1) => i match {
case null => None
case s => Some(f(i))
})
}
/**
* Given a function A1, A2 => RT, create a [[org.apache.spark.sql.UserDefinedFunction]] such that
* * if on of the function input parameters is null, None is returned.
* This will create a null value in the output Spark column.
* * if both input parameters are non null, Some( f(input) will be returned, thus creating f(input1, input2)
* as value in the output column.
* @param f function from A1 => RT
* @tparam RT return type
* @tparam A1 input parameter type
* @tparam A2 input parameter type
* @return a [[org.apache.spark.sql.UserDefinedFunction]] with the behaviour describe above
*/
def nullableUdf[RT: TypeTag, A1: TypeTag, A2: TypeTag](f: Function2[A1, A2, RT]): UserDefinedFunction = {
udf[Option[RT], A1, A2]( (i1: A1, i2: A2) => (i1, i2) match {
case (null, _) => None
case (_, null) => None
case (s1, s2) => Some((f(s1,s2)))
} )
}
}
Scala 实际上有一个很好的工厂函数,Option(),它可以使这个更加简洁:
val extractDateAsOptionInt = udf((d: String) =>
Option(d).map(_.substring(0, 10).filterNot("-".toSet).toInt))
在内部,Option 对象的 apply 方法只是为您做 null 检查:
def apply[A](x: A): Option[A] = if (x == null) None else Some(x)
给定 Table 1,其中一列 "x" 类型为 String。 我想创建 Table 2,其列 "y" 是 "x".
中给出的日期字符串的整数表示基本 是在 "y".
列中保留null
个值
Table 1(数据框 df1):
+----------+
| x|
+----------+
|2015-09-12|
|2015-09-13|
| null|
| null|
+----------+
root
|-- x: string (nullable = true)
Table 2(数据框 df2):
+----------+--------+
| x| y|
+----------+--------+
| null| null|
| null| null|
|2015-09-12|20150912|
|2015-09-13|20150913|
+----------+--------+
root
|-- x: string (nullable = true)
|-- y: integer (nullable = true)
而将列 "x" 的值转换为列 "y" 的值的用户定义函数 (udf) 是:
val extractDateAsInt = udf[Int, String] (
(d:String) => d.substring(0, 10)
.filterNot( "-".toSet)
.toInt )
并且有效,无法处理空值。
尽管如此,我可以做类似
的事情val extractDateAsIntWithNull = udf[Int, String] (
(d:String) =>
if (d != null) d.substring(0, 10).filterNot( "-".toSet).toInt
else 1 )
我没找到办法,通过udfs给"produce" null
值(当然,因为Int
s不能null
)。
我目前创建df2 (Table 2)的方案如下:
// holds data of table 1
val df1 = ...
// filter entries from df1, that are not null
val dfNotNulls = df1.filter(df1("x")
.isNotNull)
.withColumn("y", extractDateAsInt(df1("x")))
.withColumnRenamed("x", "right_x")
// create df2 via a left join on df1 and dfNotNull having
val df2 = df1.join( dfNotNulls, df1("x") === dfNotNulls("right_x"), "leftouter" ).drop("right_x")
问题:
- 当前的解决方案似乎很麻烦(并且可能效率不高。性能)。有没有更好的方法?
- @Spark-developers:是否有类型
NullableInt
计划/可用,以便可以使用以下 udf(参见代码摘录)?
代码摘录
val extractDateAsNullableInt = udf[NullableInt, String] (
(d:String) =>
if (d != null) d.substring(0, 10).filterNot( "-".toSet).toInt
else null )
这是 Option
派上用场的地方:
val extractDateAsOptionInt = udf((d: String) => d match {
case null => None
case s => Some(s.substring(0, 10).filterNot("-".toSet).toInt)
})
或者在一般情况下使其稍微更安全:
import scala.util.Try
val extractDateAsOptionInt = udf((d: String) => Try(
d.substring(0, 10).filterNot("-".toSet).toInt
).toOption)
所有功劳归于 Dmitriy Selivanov who've pointed out this solution as a (missing?) edit here。
替代方法是在 UDF 外部处理 null
:
import org.apache.spark.sql.functions.{lit, when}
import org.apache.spark.sql.types.IntegerType
val extractDateAsInt = udf(
(d: String) => d.substring(0, 10).filterNot("-".toSet).toInt
)
df.withColumn("y",
when($"x".isNull, lit(null))
.otherwise(extractDateAsInt($"x"))
.cast(IntegerType)
)
补充代码
使用@zero323 的 nice 答案,我创建了以下代码,以使用户定义的函数可用于处理所描述的空值。希望对其他人有帮助!
/**
* Set of methods to construct [[org.apache.spark.sql.UserDefinedFunction]]s that
* handle `null` values.
*/
object NullableFunctions {
import org.apache.spark.sql.functions._
import scala.reflect.runtime.universe.{TypeTag}
import org.apache.spark.sql.UserDefinedFunction
/**
* Given a function A1 => RT, create a [[org.apache.spark.sql.UserDefinedFunction]] such that
* * if fnc input is null, None is returned. This will create a null value in the output Spark column.
* * if A1 is non null, Some( f(input) will be returned, thus creating f(input) as value in the output column.
* @param f function from A1 => RT
* @tparam RT return type
* @tparam A1 input parameter type
* @return a [[org.apache.spark.sql.UserDefinedFunction]] with the behaviour describe above
*/
def nullableUdf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction = {
udf[Option[RT],A1]( (i: A1) => i match {
case null => None
case s => Some(f(i))
})
}
/**
* Given a function A1, A2 => RT, create a [[org.apache.spark.sql.UserDefinedFunction]] such that
* * if on of the function input parameters is null, None is returned.
* This will create a null value in the output Spark column.
* * if both input parameters are non null, Some( f(input) will be returned, thus creating f(input1, input2)
* as value in the output column.
* @param f function from A1 => RT
* @tparam RT return type
* @tparam A1 input parameter type
* @tparam A2 input parameter type
* @return a [[org.apache.spark.sql.UserDefinedFunction]] with the behaviour describe above
*/
def nullableUdf[RT: TypeTag, A1: TypeTag, A2: TypeTag](f: Function2[A1, A2, RT]): UserDefinedFunction = {
udf[Option[RT], A1, A2]( (i1: A1, i2: A2) => (i1, i2) match {
case (null, _) => None
case (_, null) => None
case (s1, s2) => Some((f(s1,s2)))
} )
}
}
Scala 实际上有一个很好的工厂函数,Option(),它可以使这个更加简洁:
val extractDateAsOptionInt = udf((d: String) =>
Option(d).map(_.substring(0, 10).filterNot("-".toSet).toInt))
在内部,Option 对象的 apply 方法只是为您做 null 检查:
def apply[A](x: A): Option[A] = if (x == null) None else Some(x)