创建与 DataFrame 和 SQL API 兼容的 UDF

Creating UDF compatible with DataFrame and SQL API

我正在尝试编写可在 Spark 中的 Dataframes 中运行的 UDF SQL。

这是代码

def Timeformat (timecol1: Int) = {
    if (timecol1 >= 1440)  
        ("%02d:%02d".format((timecol1-1440)/60, (timecol1-1440)%60))  
    else 
        ("%02d:%02d".format((timecol1)/60, (timecol1)%60))
}

sqlContext.udf.register("Timeformat", Timeformat _)

此方法非常适用于 sqlcontext

val dataa = sqlContext.sql("""select Timeformat(abc.time_band) from abc""")

使用 DF - 出现错误 val fcstdataa = abc.select(Timeformat(abc("time_band_start")))

此方法抛出类型不匹配错误。

<console>:41: error: type mismatch;
 found   : org.apache.spark.sql.Column
 required: Int

当我如下重写 UDF 时,对 DF 非常适用,但在 Sqlcontext 中不起作用。有没有办法解决这个问题而不用创建多个 UDF 来做同样的事情

val Timeformat = udf((timecol1: Int) => 
    if (timecol1 >= 1440)  
        ("%02d:%02d".format((timecol1-1440)/60, (timecol1-1440)%60))  
    else 
        ("%02d:%02d".format((timecol1)/60, (timecol1)%60))
)

我是 scala 和 spark 的新手,这两个声明有什么区别。一种方法比另一种更好吗?

在这里使用 UDF 没有任何意义,但如果你真的想要这个,就不要使用匿名函数。获取你已有的函数(Int => String)并使用 UDF 包装它:

def Timeformat(timecol1: Int): String = ??? 
sqlContext.udf.register("Timeformat", Timeformat _)
val timeformat_ = udf(Timeformat _)

或者您可以 callUDF:

import org.apache.spark.sql.functions.callUDF

abc.select(callUDF("Timeformat", $"time_band_start"))

据说大多数时候应该首选非 UDF 解决方案:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{when, format_string}

def timeformatExpr(col: Column) = {
  val offset = when(col >= 1440, 1440).otherwise(0)
  val x = ((col - offset) / 60).cast("int")
  val y = (col - offset) % 60
  format_string("%02d:%02d", x, y)
}

相当于以下 SQL:

val expr = """CASE
  WHEN time_band >= 1440 THEN
      FORMAT_STRING(
          '%02d:%02d', 
          CAST((time_band - 1440) / 60 AS INT),
          (time_band - 1440) % 60
      )
  ELSE 
      FORMAT_STRING(
          '%02d:%02d', 
          CAST(time_band / 60 AS INT),
          time_band % 60
      )
END"""

可用于原始 SQL 以及具有 selectExprexpr 功能的 DataFrame

例子:

val df = Seq((1L, 123123), (2L, 42132), (3L, 99)).toDF("id", "time_band")

df.select($"*", timeformatExpr($"time_band").alias("tf")).show
// +---+---------+-------+
// | id|time_band|     tf|
// +---+---------+-------+
// |  1|   123123|2028:03|
// |  2|    42132| 678:12|
// |  3|       99|  01:39|
// +---+---------+-------+

df.registerTempTable("df")

sqlContext.sql(s"SELECT *, $expr AS tf FROM df").show
// +---+---------+-------+
// | id|time_band|     tf|
// +---+---------+-------+
// |  1|   123123|2028:03|
// |  2|    42132| 678:12|
// |  3|       99|  01:39|
// +---+---------+-------+

df.selectExpr("*", s"$expr AS tf").show
// +---+---------+-------+
// | id|time_band|     tf|
// +---+---------+-------+
// |  1|   123123|2028:03|
// |  2|    42132| 678:12|
// |  3|       99|  01:39|
// +---+---------+-------+