使用 Java 在 Apache Spark 中的数据集的单个列上应用函数
Apply function on a single column of a Dataset in Apache Spark using Java
假设我有一个数据集:
Dataset<Row> sqlDF = this.spark.sql("SELECT first_name, last_name, age from persons";
这将 return 一个包含三列的 Dataset
:first_name、last_name、年龄。
我想应用一个函数,将 5 添加到 age
列和 return 一个新数据集,其列与原始数据集相同,但年龄值已更改:
public int add_age(int old_age){
return old_age + 5;
}
如何在 Java 上使用 Apache Spark 执行此操作?
我通过制作一个 StructType 并向其添加三列来解决这个问题,然后将每个列映射到新构造的行并将函数应用到行列 age
使用 RowFactory
:
StructType customStructType = new StructType();
customStructType = customStructType.add("first_name", DataTypes.StringType, true);
customStructType = customStructType.add("last_name", DataTypes.StringType, true);
customStructType = customStructType.add("age", DataTypes.IntegerType, true);
ExpressionEncoder<Row> customTypeEncoder = null;
Dataset<Row> changed_data = sqlDF.map(row->{
return RowFactory.create(row.get(0),row.get(1), add_age(row.get(2)));
}, RowEncoder.apply(customStructType));
假设我有一个数据集:
Dataset<Row> sqlDF = this.spark.sql("SELECT first_name, last_name, age from persons";
这将 return 一个包含三列的 Dataset
:first_name、last_name、年龄。
我想应用一个函数,将 5 添加到 age
列和 return 一个新数据集,其列与原始数据集相同,但年龄值已更改:
public int add_age(int old_age){
return old_age + 5;
}
如何在 Java 上使用 Apache Spark 执行此操作?
我通过制作一个 StructType 并向其添加三列来解决这个问题,然后将每个列映射到新构造的行并将函数应用到行列 age
使用 RowFactory
:
StructType customStructType = new StructType();
customStructType = customStructType.add("first_name", DataTypes.StringType, true);
customStructType = customStructType.add("last_name", DataTypes.StringType, true);
customStructType = customStructType.add("age", DataTypes.IntegerType, true);
ExpressionEncoder<Row> customTypeEncoder = null;
Dataset<Row> changed_data = sqlDF.map(row->{
return RowFactory.create(row.get(0),row.get(1), add_age(row.get(2)));
}, RowEncoder.apply(customStructType));