Spark 中的 UDF SQL DSL

UDF in Spark SQL DSL

我正在尝试在 Spark SQL 作业中使用 DSL 而不是纯 SQL,但我无法让我的 UDF 工作。

sqlContext.udf.register("subdate",(dateTime: Long)=>dateTime.toString.dropRight(6))

这行不通

rdd1.toDF.join(rdd2.toDF).where("subdate(rdd1(date_time)) === subdate(rdd2(dateTime))")

我还想添加另一个连接条件,就像在这个工作纯 SQL

val results=sqlContext.sql("select * from rdd1 join rdd2 on rdd1.id=rdd2.idand subdate(rdd1.date_time)=subdate(rdd2.dateTime)")

感谢您的帮助

SQL 您传递给 where 方法的表达式不正确至少有以下几个原因:

  • === 是一个 Column 方法,不是有效的 SQL 等式。您应该使用单个等号 =
  • 方括号表示法 (table(column)) 不是引用 SQL 中列的有效方式。在此上下文中,它将被识别为函数调用。 SQL 使用点符号 (table.column)
  • 即使 rdd1rdd2 都不是有效的 table 别名

由于列名看起来很明确,您可以简单地使用以下代码:

df1.join(df2).where("subdate(date_time) = subdate(dateTime)")

如果不是这种情况,则不先提供别名就无法使用点语法。例如参见 [​​=22=]

此外,当您一直使用原始 SQL 时,注册 UDF 最有意义。如果要使用DataFrame API最好直接使用UDF:

import org.apache.spark.sql.functions.udf

val subdate = udf((dateTime: Long) => dateTime.toString.dropRight(6)) 

val df1 = rdd1.toDF
val df2 = rdd2.toDF

df1.join(df2, subdate($"date_time") === subdate($"dateTime"))

或者如果列名不明确:

df1.join(df2, subdate(df1("date_time")) === subdate(df2("date_time")))

最后,对于像这样的简单函数,编写内置表达式比创建 UDF 更好。