如何使用 JAVA 在 Spark DataFrame 上调用 UDF?

How do I call a UDF on a Spark DataFrame using JAVA?

here 类似的问题,但没有足够的分数在那里发表评论。

根据最新的 Spark documentationudf 可以以两种不同的方式使用,一种使用 SQL,另一种使用 DataFrame。我找到了多个关于如何将 udf 与 sql 一起使用的示例,但未能找到任何有关如何直接在 DataFrame 上使用 udf 的示例。

o.p提供的解决方案。在上面链接的问题上使用 __callUDF()___deprecated_ 并将根据 Spark Java API 文档在 Spark 2.0 中删除。在那里,它说:

"since it's redundant with udf()"

所以这意味着我应该能够使用 __udf()__ 来计算我的 udf,但我不知道该怎么做。我没有偶然发现任何阐明 Java-Spark 程序语法的内容。我错过了什么?

import org.apache.spark.sql.api.java.UDF1;
.
.    
UDF1 mode = new UDF1<String[], String>() {
    public String call(final String[] types) throws Exception {
        return types[0];
    }
};

sqlContext.udf().register("mode", mode, DataTypes.StringType);
df.???????? how do I call my udf (mode) on a given column of my DataFrame df?

火花 >= 2.3

Scala风格udf可以直接调用:

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.expressions.UserDefinedFunction;

UserDefinedFunction mode = udf(
  (Seq<String> ss) -> ss.headOption(), DataTypes.StringType
);

df.select(mode.apply(col("vs"))).show();

Spark < 2.3

即使我们假设您的 UDF 很有用并且不能被简单的 getItem 调用替换,它的签名也不正确。数组列是使用 Scala WrappedArray 而不是普通的 Java 数组公开的,因此您必须调整签名:

UDF1 mode = new UDF1<Seq<String>, String>() {
  public String call(final Seq<String> types) throws Exception {
    return types.headOption();
  }
};

如果 UDF 已经注册:

sqlContext.udf().register("mode", mode, DataTypes.StringType);

你可以简单地使用callUDF(这是1.5中引入的新功能)通过名称调用它:

df.select(callUDF("mode", col("vs"))).show();

您也可以在selectExprs中使用它:

df.selectExpr("mode(vs)").show();