创建与 DataFrame 和 SQL API 兼容的 UDF
Creating UDF compatible with DataFrame and SQL API
我正在尝试编写可在 Spark 中的 Dataframes 中运行的 UDF SQL。
这是代码
def Timeformat (timecol1: Int) = {
if (timecol1 >= 1440)
("%02d:%02d".format((timecol1-1440)/60, (timecol1-1440)%60))
else
("%02d:%02d".format((timecol1)/60, (timecol1)%60))
}
sqlContext.udf.register("Timeformat", Timeformat _)
此方法非常适用于 sqlcontext
val dataa = sqlContext.sql("""select Timeformat(abc.time_band) from abc""")
使用 DF - 出现错误
val fcstdataa = abc.select(Timeformat(abc("time_band_start")))
此方法抛出类型不匹配错误。
<console>:41: error: type mismatch;
found : org.apache.spark.sql.Column
required: Int
当我如下重写 UDF 时,对 DF 非常适用,但在 Sqlcontext 中不起作用。有没有办法解决这个问题而不用创建多个 UDF 来做同样的事情
val Timeformat = udf((timecol1: Int) =>
if (timecol1 >= 1440)
("%02d:%02d".format((timecol1-1440)/60, (timecol1-1440)%60))
else
("%02d:%02d".format((timecol1)/60, (timecol1)%60))
)
我是 scala 和 spark 的新手,这两个声明有什么区别。一种方法比另一种更好吗?
在这里使用 UDF 没有任何意义,但如果你真的想要这个,就不要使用匿名函数。获取你已有的函数(Int => String
)并使用 UDF 包装它:
def Timeformat(timecol1: Int): String = ???
sqlContext.udf.register("Timeformat", Timeformat _)
val timeformat_ = udf(Timeformat _)
或者您可以 callUDF
:
import org.apache.spark.sql.functions.callUDF
abc.select(callUDF("Timeformat", $"time_band_start"))
据说大多数时候应该首选非 UDF 解决方案:
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{when, format_string}
def timeformatExpr(col: Column) = {
val offset = when(col >= 1440, 1440).otherwise(0)
val x = ((col - offset) / 60).cast("int")
val y = (col - offset) % 60
format_string("%02d:%02d", x, y)
}
相当于以下 SQL:
val expr = """CASE
WHEN time_band >= 1440 THEN
FORMAT_STRING(
'%02d:%02d',
CAST((time_band - 1440) / 60 AS INT),
(time_band - 1440) % 60
)
ELSE
FORMAT_STRING(
'%02d:%02d',
CAST(time_band / 60 AS INT),
time_band % 60
)
END"""
可用于原始 SQL 以及具有 selectExpr
或 expr
功能的 DataFrame
。
例子:
val df = Seq((1L, 123123), (2L, 42132), (3L, 99)).toDF("id", "time_band")
df.select($"*", timeformatExpr($"time_band").alias("tf")).show
// +---+---------+-------+
// | id|time_band| tf|
// +---+---------+-------+
// | 1| 123123|2028:03|
// | 2| 42132| 678:12|
// | 3| 99| 01:39|
// +---+---------+-------+
df.registerTempTable("df")
sqlContext.sql(s"SELECT *, $expr AS tf FROM df").show
// +---+---------+-------+
// | id|time_band| tf|
// +---+---------+-------+
// | 1| 123123|2028:03|
// | 2| 42132| 678:12|
// | 3| 99| 01:39|
// +---+---------+-------+
df.selectExpr("*", s"$expr AS tf").show
// +---+---------+-------+
// | id|time_band| tf|
// +---+---------+-------+
// | 1| 123123|2028:03|
// | 2| 42132| 678:12|
// | 3| 99| 01:39|
// +---+---------+-------+
我正在尝试编写可在 Spark 中的 Dataframes 中运行的 UDF SQL。
这是代码
def Timeformat (timecol1: Int) = {
if (timecol1 >= 1440)
("%02d:%02d".format((timecol1-1440)/60, (timecol1-1440)%60))
else
("%02d:%02d".format((timecol1)/60, (timecol1)%60))
}
sqlContext.udf.register("Timeformat", Timeformat _)
此方法非常适用于 sqlcontext
val dataa = sqlContext.sql("""select Timeformat(abc.time_band) from abc""")
使用 DF - 出现错误
val fcstdataa = abc.select(Timeformat(abc("time_band_start")))
此方法抛出类型不匹配错误。
<console>:41: error: type mismatch;
found : org.apache.spark.sql.Column
required: Int
当我如下重写 UDF 时,对 DF 非常适用,但在 Sqlcontext 中不起作用。有没有办法解决这个问题而不用创建多个 UDF 来做同样的事情
val Timeformat = udf((timecol1: Int) =>
if (timecol1 >= 1440)
("%02d:%02d".format((timecol1-1440)/60, (timecol1-1440)%60))
else
("%02d:%02d".format((timecol1)/60, (timecol1)%60))
)
我是 scala 和 spark 的新手,这两个声明有什么区别。一种方法比另一种更好吗?
在这里使用 UDF 没有任何意义,但如果你真的想要这个,就不要使用匿名函数。获取你已有的函数(Int => String
)并使用 UDF 包装它:
def Timeformat(timecol1: Int): String = ???
sqlContext.udf.register("Timeformat", Timeformat _)
val timeformat_ = udf(Timeformat _)
或者您可以 callUDF
:
import org.apache.spark.sql.functions.callUDF
abc.select(callUDF("Timeformat", $"time_band_start"))
据说大多数时候应该首选非 UDF 解决方案:
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{when, format_string}
def timeformatExpr(col: Column) = {
val offset = when(col >= 1440, 1440).otherwise(0)
val x = ((col - offset) / 60).cast("int")
val y = (col - offset) % 60
format_string("%02d:%02d", x, y)
}
相当于以下 SQL:
val expr = """CASE
WHEN time_band >= 1440 THEN
FORMAT_STRING(
'%02d:%02d',
CAST((time_band - 1440) / 60 AS INT),
(time_band - 1440) % 60
)
ELSE
FORMAT_STRING(
'%02d:%02d',
CAST(time_band / 60 AS INT),
time_band % 60
)
END"""
可用于原始 SQL 以及具有 selectExpr
或 expr
功能的 DataFrame
。
例子:
val df = Seq((1L, 123123), (2L, 42132), (3L, 99)).toDF("id", "time_band")
df.select($"*", timeformatExpr($"time_band").alias("tf")).show
// +---+---------+-------+
// | id|time_band| tf|
// +---+---------+-------+
// | 1| 123123|2028:03|
// | 2| 42132| 678:12|
// | 3| 99| 01:39|
// +---+---------+-------+
df.registerTempTable("df")
sqlContext.sql(s"SELECT *, $expr AS tf FROM df").show
// +---+---------+-------+
// | id|time_band| tf|
// +---+---------+-------+
// | 1| 123123|2028:03|
// | 2| 42132| 678:12|
// | 3| 99| 01:39|
// +---+---------+-------+
df.selectExpr("*", s"$expr AS tf").show
// +---+---------+-------+
// | id|time_band| tf|
// +---+---------+-------+
// | 1| 123123|2028:03|
// | 2| 42132| 678:12|
// | 3| 99| 01:39|
// +---+---------+-------+