在 Spark 数据集中使用自定义 UDF withColumn<Row>; java.lang.String 无法转换为 org.apache.spark.sql.Row
Using custome UDF withColumn in a Spark Dataset<Row>; java.lang.String cannot be cast to org.apache.spark.sql.Row
我有一个包含许多字段的 JSON 文件。我在 java.
中使用 spark 的数据集读取了文件
Spark 版本 2.2.0
java jdk 1.8.0_121
下面是代码。
SparkSession spark = SparkSession
.builder()
.appName("Java Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.master("local")
.getOrCreate();
Dataset<Row> df = spark.read().json("jsonfile.json");
我想使用带有自定义 UDF 的 withColumn 函数来添加新列。
UDF1 someudf = new UDF1<Row,String>(){
public String call(Row fin) throws Exception{
String some_str = fin.getAs("String");
return some_str;
}
};
spark.udf().register( "some_udf", someudf, DataTypes.StringType );
df.withColumn( "procs", callUDF( "some_udf", col("columnx") ) ).show();
当我 运行 上面的代码时,我得到一个转换错误。
java.lang.String 无法转换为 org.apache.spark.sql.Row
问题:
1 - 读取行数据集是唯一的选择吗?我可以将 df 转换为字符串 df。但我将无法 select 字段。
2 - 已尝试但未能定义用户定义的数据类型。我无法使用此自定义 UDDatatype 注册 UDF。我在这里需要用户定义的数据类型吗?
3 - 主要问题是,如何从 String 转换为 Row?
部分日志复制如下:
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.sql.Row
at Risks.readcsv.call(readcsv.java:1)
at org.apache.spark.sql.UDFRegistration$$anonfun.apply(UDFRegistration.scala:512)
... 16 more
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun: (string) => string)
非常感谢您的帮助。
您收到该异常是因为 UDF
将在非 Row
的列的数据类型上执行。假设我们有 Dataset<Row> ds
,其中有两列 col1
和 col2
都是字符串类型。现在,如果我们想使用 UDF
.
将 col2
的值转换为大写
我们可以像下面这样注册和调用UDF
。
spark.udf().register("toUpper", toUpper, DataTypes.StringType);
ds.select(col("*"),callUDF("toUpper", col("col2"))).show();
或使用withColumn
ds.withColumn("Upper",callUDF("toUpper", col("col2"))).show();
和UDF
应该像下面这样。
private static UDF1 toUpper = new UDF1<String, String>() {
public String call(final String str) throws Exception {
return str.toUpperCase();
}
};
改进@abaghel 写的内容。
如果你使用下面的 import
import org.apache.spark.sql.functions;
使用withColumn
,代码应该如下:
ds.withColumn("Upper",functions.callUDF("toUpper", ds.col("col2"))).show();
我有一个包含许多字段的 JSON 文件。我在 java.
中使用 spark 的数据集读取了文件Spark 版本 2.2.0
java jdk 1.8.0_121
下面是代码。
SparkSession spark = SparkSession
.builder()
.appName("Java Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.master("local")
.getOrCreate();
Dataset<Row> df = spark.read().json("jsonfile.json");
我想使用带有自定义 UDF 的 withColumn 函数来添加新列。
UDF1 someudf = new UDF1<Row,String>(){
public String call(Row fin) throws Exception{
String some_str = fin.getAs("String");
return some_str;
}
};
spark.udf().register( "some_udf", someudf, DataTypes.StringType );
df.withColumn( "procs", callUDF( "some_udf", col("columnx") ) ).show();
当我 运行 上面的代码时,我得到一个转换错误。 java.lang.String 无法转换为 org.apache.spark.sql.Row
问题:
1 - 读取行数据集是唯一的选择吗?我可以将 df 转换为字符串 df。但我将无法 select 字段。
2 - 已尝试但未能定义用户定义的数据类型。我无法使用此自定义 UDDatatype 注册 UDF。我在这里需要用户定义的数据类型吗?
3 - 主要问题是,如何从 String 转换为 Row?
部分日志复制如下:
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.sql.Row
at Risks.readcsv.call(readcsv.java:1)
at org.apache.spark.sql.UDFRegistration$$anonfun.apply(UDFRegistration.scala:512)
... 16 more
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun: (string) => string)
非常感谢您的帮助。
您收到该异常是因为 UDF
将在非 Row
的列的数据类型上执行。假设我们有 Dataset<Row> ds
,其中有两列 col1
和 col2
都是字符串类型。现在,如果我们想使用 UDF
.
col2
的值转换为大写
我们可以像下面这样注册和调用UDF
。
spark.udf().register("toUpper", toUpper, DataTypes.StringType);
ds.select(col("*"),callUDF("toUpper", col("col2"))).show();
或使用withColumn
ds.withColumn("Upper",callUDF("toUpper", col("col2"))).show();
和UDF
应该像下面这样。
private static UDF1 toUpper = new UDF1<String, String>() {
public String call(final String str) throws Exception {
return str.toUpperCase();
}
};
改进@abaghel 写的内容。 如果你使用下面的 import
import org.apache.spark.sql.functions;
使用withColumn
,代码应该如下:
ds.withColumn("Upper",functions.callUDF("toUpper", ds.col("col2"))).show();