在 SQLContext 之外的 Java 中创建 SparkSQL UDF

Creating a SparkSQL UDF in Java outside of SQLContext

问题

我想在 Java 中创建一个用户定义的函数,它可以在 Apache Spark 运算符链中作为 Java 方法调用。我无法找到不需要 UDF 存在于 SQL 查询中的 Java 示例。

版本

我试过的有效方法

我可以在 Java 中成功创建 UDF。但是,除非它在 ​​SQL 查询中,否则我不能使用它:

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;

sqlContext.udf().register("udfUppercase",
    (String string) -> string.toUpperCase(), DataTypes.StringType);

DataFrame oldDF = // a simple DataFrame with a "name" column
oldDF.registerTempTable("df");
DataFrame newDF = sqlContext.sql("SELECT udfUppercase(name) AS name_upper FROM df");

我被困在哪里

我希望 Java 中的非 SQL 方法调用风格的 UDF 看起来像这样:

import static org.apache.spark.sql.functions.udf;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;

UserDefinedFunction udfUppercase = udf(
    (String string) -> string.toUpperCase(), DataTypes.StringType);

DataFrame oldDF = // a simple DataFrame with a "name" column
newDF = oldDF.withColumn("name_upper", udfUppercase(oldDF.col("name")));

编译它会导致以 "UserDefinedFunction" 开头的行出现编译器错误,所以显然我猜测正确签名的尝试是不正确的:

error: no suitable method found for udf((String st[...]ase(),DataType)
    UserDefinedFunction udfUppercase = udf((String string) -> string.toUpperCase(), DataTypes.StringType);
method functions.<RT#1>udf(Function0<RT#1>,TypeTags.TypeTag<RT#1>) is not applicable
    (cannot infer type-variable(s) RT#1
    (argument mismatch; Function0 is not a functional interface
    multiple non-overriding abstract methods found in interface Function0))

此错误继续详细说明每个已尝试的推断 udf() 签名。

我需要什么

我需要修复 Java 代码,以便我可以定义和使用 udfUppercase UDF 而无需将其嵌入到 SQL 查询中。我觉得我错过了一些非常简单、基本的东西,可能还有语法问题,但可能完全偏离了基础。

工作解决方案(由下面的 zero323 提供)

没有很好的方法来注册和使用 Java UDF 作为 Java 方法,但是在 SQLContext 中注册的 UDF 可以插入到运算符链中callUDF().

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;

sqlContext.udf().register("udfUppercase",
    (String string) -> string.toUpperCase(), DataTypes.StringType);

DataFrame oldDF = // a simple DataFrame with a "name" column
newDF = oldDF.withColumn("name_upper", callUDF("udfUppercase", oldDF.col("name")));

此外,请务必使用 callUDF() 而不是已弃用的 callUdf(),后者具有不同的方法签名。

火花 >= 2.3

SPARK-22945 (add java UDF APIs in the functions object) 添加简化的udf API, 类似于 Scala 和 Python:

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.expressions.UserDefinedFunction;

UserDefinedFunction udfUppercase = udf(
  (String s) -> s.toUpperCase(), DataTypes.StringType
);

df.select(udfUppercase.apply(col("name")));

Spark < 2.3

长话短说 functions.udf 方法不是为 Java 互操作性而设计的。所有变体都需要 TypeTags,虽然可以手动生成这些变体(我很确定我已经看到 Daniel Darabos 展示了如何在 SO 上执行此操作),但您可能希望避免这种情况。

如果出于某种原因你想避免在 Scala 中编写 UDF,最简单的方法就是注册 UDF and call it by name:

sqlContext.udf().register("udfUppercase",
  (String string) -> string.toUpperCase(), DataTypes.StringType);

df.select(callUDF("udfUppercase", col("name")));