Scala 和 Spark UDF 函数
Scala and Spark UDF function
我制作了一个简单的 UDF 来转换或提取 spark 中 temptabl 中时间字段的一些值。我注册了该函数,但是当我使用 sql 调用该函数时,它会抛出 NullPointerException。下面是我的功能和执行过程。我正在使用齐柏林飞艇。奇怪的是,这昨天还在工作,但今天早上就停止工作了。
函数
def convert( time:String ) : String = {
val sdf = new java.text.SimpleDateFormat("HH:mm")
val time1 = sdf.parse(time)
return sdf.format(time1)
}
注册函数
sqlContext.udf.register("convert",convert _)
在没有 SQL 的情况下测试函数 -- 这有效
convert(12:12:12) -> returns 12:12
在 Zeppelin 中使用 SQL 测试函数失败。
%sql
select convert(time) from temptable limit 10
诱惑的结构
root
|-- date: string (nullable = true)
|-- time: string (nullable = true)
|-- serverip: string (nullable = true)
|-- request: string (nullable = true)
|-- resource: string (nullable = true)
|-- protocol: integer (nullable = true)
|-- sourceip: string (nullable = true)
我得到的部分堆栈跟踪。
java.lang.NullPointerException
at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:643)
at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:652)
at org.apache.spark.sql.hive.HiveFunctionRegistry.lookupFunction(hiveUdfs.scala:54)
at org.apache.spark.sql.hive.HiveContext$$anon.org$apache$spark$sql$catalyst$analysis$OverrideFunctionRegistry$$super$lookupFunction(HiveContext.scala:376)
at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction.apply(FunctionRegistry.scala:44)
at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction.apply(FunctionRegistry.scala:44)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$class.lookupFunction(FunctionRegistry.scala:44)
使用udf而不是直接定义一个函数
import org.apache.spark.sql.functions._
val convert = udf[String, String](time => {
val sdf = new java.text.SimpleDateFormat("HH:mm")
val time1 = sdf.parse(time)
sdf.format(time1)
}
)
一个udf的入参是Column(或Columns)。 return 类型是 Column。
case class UserDefinedFunction protected[sql] (
f: AnyRef,
dataType: DataType,
inputTypes: Option[Seq[DataType]]) {
def apply(exprs: Column*): Column = {
Column(ScalaUDF(f, dataType, exprs.map(_.expr), inputTypes.getOrElse(Nil)))
}
}
您必须将函数定义为 UDF。
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf
val convertUDF: UserDefinedFunction = udf((time:String) => {
val sdf = new java.text.SimpleDateFormat("HH:mm")
val time1 = sdf.parse(time)
sdf.format(time1)
})
接下来,您将在 DataFrame 上应用您的 UDF。
// assuming your DataFrame is already defined
dataFrame.withColumn("time", convertUDF(col("time"))) // using the same name replaces existing
现在,关于您的实际问题,您收到此错误的原因之一可能是您的 DataFrame 包含空行。如果您在应用 UDF 之前过滤掉它们,您应该能够继续没有问题。
dataFrame.filter(col("time").isNotNull)
我很好奇当 运行 除了遇到 null 之外的 UDF 时,还有什么会导致 NullPointerException,如果您发现与我的建议不同的原因,我很乐意知道。
我制作了一个简单的 UDF 来转换或提取 spark 中 temptabl 中时间字段的一些值。我注册了该函数,但是当我使用 sql 调用该函数时,它会抛出 NullPointerException。下面是我的功能和执行过程。我正在使用齐柏林飞艇。奇怪的是,这昨天还在工作,但今天早上就停止工作了。
函数
def convert( time:String ) : String = {
val sdf = new java.text.SimpleDateFormat("HH:mm")
val time1 = sdf.parse(time)
return sdf.format(time1)
}
注册函数
sqlContext.udf.register("convert",convert _)
在没有 SQL 的情况下测试函数 -- 这有效
convert(12:12:12) -> returns 12:12
在 Zeppelin 中使用 SQL 测试函数失败。
%sql
select convert(time) from temptable limit 10
诱惑的结构
root
|-- date: string (nullable = true)
|-- time: string (nullable = true)
|-- serverip: string (nullable = true)
|-- request: string (nullable = true)
|-- resource: string (nullable = true)
|-- protocol: integer (nullable = true)
|-- sourceip: string (nullable = true)
我得到的部分堆栈跟踪。
java.lang.NullPointerException
at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:643)
at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:652)
at org.apache.spark.sql.hive.HiveFunctionRegistry.lookupFunction(hiveUdfs.scala:54)
at org.apache.spark.sql.hive.HiveContext$$anon.org$apache$spark$sql$catalyst$analysis$OverrideFunctionRegistry$$super$lookupFunction(HiveContext.scala:376)
at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction.apply(FunctionRegistry.scala:44)
at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction.apply(FunctionRegistry.scala:44)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$class.lookupFunction(FunctionRegistry.scala:44)
使用udf而不是直接定义一个函数
import org.apache.spark.sql.functions._
val convert = udf[String, String](time => {
val sdf = new java.text.SimpleDateFormat("HH:mm")
val time1 = sdf.parse(time)
sdf.format(time1)
}
)
一个udf的入参是Column(或Columns)。 return 类型是 Column。
case class UserDefinedFunction protected[sql] (
f: AnyRef,
dataType: DataType,
inputTypes: Option[Seq[DataType]]) {
def apply(exprs: Column*): Column = {
Column(ScalaUDF(f, dataType, exprs.map(_.expr), inputTypes.getOrElse(Nil)))
}
}
您必须将函数定义为 UDF。
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf
val convertUDF: UserDefinedFunction = udf((time:String) => {
val sdf = new java.text.SimpleDateFormat("HH:mm")
val time1 = sdf.parse(time)
sdf.format(time1)
})
接下来,您将在 DataFrame 上应用您的 UDF。
// assuming your DataFrame is already defined
dataFrame.withColumn("time", convertUDF(col("time"))) // using the same name replaces existing
现在,关于您的实际问题,您收到此错误的原因之一可能是您的 DataFrame 包含空行。如果您在应用 UDF 之前过滤掉它们,您应该能够继续没有问题。
dataFrame.filter(col("time").isNotNull)
我很好奇当 运行 除了遇到 null 之外的 UDF 时,还有什么会导致 NullPointerException,如果您发现与我的建议不同的原因,我很乐意知道。