如何使用 JAVA 在 Spark DataFrame 上调用 UDF?
How do I call a UDF on a Spark DataFrame using JAVA?
与 here 类似的问题,但没有足够的分数在那里发表评论。
根据最新的 Spark documentation,udf
可以以两种不同的方式使用,一种使用 SQL,另一种使用 DataFrame。我找到了多个关于如何将 udf
与 sql 一起使用的示例,但未能找到任何有关如何直接在 DataFrame 上使用 udf
的示例。
o.p提供的解决方案。在上面链接的问题上使用 __callUDF()__
即 _deprecated_
并将根据 Spark Java API 文档在 Spark 2.0 中删除。在那里,它说:
"since it's redundant with udf()"
所以这意味着我应该能够使用 __udf()__
来计算我的 udf
,但我不知道该怎么做。我没有偶然发现任何阐明 Java-Spark 程序语法的内容。我错过了什么?
import org.apache.spark.sql.api.java.UDF1;
.
.
UDF1 mode = new UDF1<String[], String>() {
public String call(final String[] types) throws Exception {
return types[0];
}
};
sqlContext.udf().register("mode", mode, DataTypes.StringType);
df.???????? how do I call my udf (mode) on a given column of my DataFrame df?
火花 >= 2.3
Scala风格udf
可以直接调用:
import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.expressions.UserDefinedFunction;
UserDefinedFunction mode = udf(
(Seq<String> ss) -> ss.headOption(), DataTypes.StringType
);
df.select(mode.apply(col("vs"))).show();
Spark < 2.3
即使我们假设您的 UDF 很有用并且不能被简单的 getItem
调用替换,它的签名也不正确。数组列是使用 Scala WrappedArray
而不是普通的 Java 数组公开的,因此您必须调整签名:
UDF1 mode = new UDF1<Seq<String>, String>() {
public String call(final Seq<String> types) throws Exception {
return types.headOption();
}
};
如果 UDF 已经注册:
sqlContext.udf().register("mode", mode, DataTypes.StringType);
你可以简单地使用callUDF
(这是1.5中引入的新功能)通过名称调用它:
df.select(callUDF("mode", col("vs"))).show();
您也可以在selectExprs
中使用它:
df.selectExpr("mode(vs)").show();
与 here 类似的问题,但没有足够的分数在那里发表评论。
根据最新的 Spark documentation,udf
可以以两种不同的方式使用,一种使用 SQL,另一种使用 DataFrame。我找到了多个关于如何将 udf
与 sql 一起使用的示例,但未能找到任何有关如何直接在 DataFrame 上使用 udf
的示例。
o.p提供的解决方案。在上面链接的问题上使用 __callUDF()__
即 _deprecated_
并将根据 Spark Java API 文档在 Spark 2.0 中删除。在那里,它说:
"since it's redundant with udf()"
所以这意味着我应该能够使用 __udf()__
来计算我的 udf
,但我不知道该怎么做。我没有偶然发现任何阐明 Java-Spark 程序语法的内容。我错过了什么?
import org.apache.spark.sql.api.java.UDF1;
.
.
UDF1 mode = new UDF1<String[], String>() {
public String call(final String[] types) throws Exception {
return types[0];
}
};
sqlContext.udf().register("mode", mode, DataTypes.StringType);
df.???????? how do I call my udf (mode) on a given column of my DataFrame df?
火花 >= 2.3
Scala风格udf
可以直接调用:
import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.expressions.UserDefinedFunction;
UserDefinedFunction mode = udf(
(Seq<String> ss) -> ss.headOption(), DataTypes.StringType
);
df.select(mode.apply(col("vs"))).show();
Spark < 2.3
即使我们假设您的 UDF 很有用并且不能被简单的 getItem
调用替换,它的签名也不正确。数组列是使用 Scala WrappedArray
而不是普通的 Java 数组公开的,因此您必须调整签名:
UDF1 mode = new UDF1<Seq<String>, String>() {
public String call(final Seq<String> types) throws Exception {
return types.headOption();
}
};
如果 UDF 已经注册:
sqlContext.udf().register("mode", mode, DataTypes.StringType);
你可以简单地使用callUDF
(这是1.5中引入的新功能)通过名称调用它:
df.select(callUDF("mode", col("vs"))).show();
您也可以在selectExprs
中使用它:
df.selectExpr("mode(vs)").show();