自定义类型数据集如何调用groupBy方法?
How does a custom type dataset call the groupBy method?
我通过 spark 创建了一个自定义类型数据集。
public class KeyValuePair {
String source;
String target;
int value;
getter...
setter...
}
...
List<KeyValuePair> list = generateList();
Dataset<KeyValuePair> dataset = spark.createDataset(list, Encoders.kryo(KeyValuePair.class));
Dataset<Row> agg = dataset.groupBy("source", "target").agg(avg("value"));
数据集调用groupBy方法时,会抛出异常。
线程“main”中的异常 org.apache.spark.sql.AnalysisException:无法解析(值)
中的列名称“source”
不知道自定义类型数据集是否可以调用groupBy方法。是不是只有Row类型的数据集才能调用这个方法不抛出异常?
如何聚合自定义类型数据集?
异常原因是:您的数据集没有聚合所需的列。您可以在创建数据集时使用 Encoders.bean(class) 获得预期结果。
代码:
public class DatasetAggregation {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
List<KeyValuePair> list = generateList();
Dataset<KeyValuePair> dataset = spark.createDataset(list, Encoders.bean(KeyValuePair.class));
Dataset<Row> agg = dataset.groupBy("source", "target").agg(avg("value"));
agg.show();
}
public static List<KeyValuePair> generateList() {
KeyValuePair k = new KeyValuePair();
k.setSource("a");
k.setTarget("b");
k.setValue(10);
return Arrays.asList(k, k,k);
}
}
/* output
+------+------+----------+
|source|target|avg(value)|
+------+------+----------+
| a| b| 10.0|
+------+------+----------+
*/
我通过 spark 创建了一个自定义类型数据集。
public class KeyValuePair {
String source;
String target;
int value;
getter...
setter...
}
...
List<KeyValuePair> list = generateList();
Dataset<KeyValuePair> dataset = spark.createDataset(list, Encoders.kryo(KeyValuePair.class));
Dataset<Row> agg = dataset.groupBy("source", "target").agg(avg("value"));
数据集调用groupBy方法时,会抛出异常。 线程“main”中的异常 org.apache.spark.sql.AnalysisException:无法解析(值)
中的列名称“source”不知道自定义类型数据集是否可以调用groupBy方法。是不是只有Row类型的数据集才能调用这个方法不抛出异常?
如何聚合自定义类型数据集?
异常原因是:您的数据集没有聚合所需的列。您可以在创建数据集时使用 Encoders.bean(class) 获得预期结果。
代码:
public class DatasetAggregation {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
List<KeyValuePair> list = generateList();
Dataset<KeyValuePair> dataset = spark.createDataset(list, Encoders.bean(KeyValuePair.class));
Dataset<Row> agg = dataset.groupBy("source", "target").agg(avg("value"));
agg.show();
}
public static List<KeyValuePair> generateList() {
KeyValuePair k = new KeyValuePair();
k.setSource("a");
k.setTarget("b");
k.setValue(10);
return Arrays.asList(k, k,k);
}
}
/* output
+------+------+----------+
|source|target|avg(value)|
+------+------+----------+
| a| b| 10.0|
+------+------+----------+
*/