SparkSQL + Java:使用数据集时从 Pojo 到表格格式
SparkSQL + Java: Pojo to Tabular Format while working with Datasets
我是 Spark 的新手 SQL。在执行其中一项训练任务时,我遇到了以下问题并且找不到答案(以下所有示例都有点笨拙,但出于演示目的应该还可以)。
我的应用读取镶木地板文件并根据其内容创建数据集:
DataFrame input = sqlContext.read().parquet("src/test/resources/integration/input/source.gz.parquet");
Dataset<Row> dataset = input.as(RowEncoder$.MODULE$.apply(input.schema()));
dataset.show() 调用结果:
+------------+----------------+--------+
+ Names + Gender + Age +
+------------+----------------+--------+
| Jack, Jill | Male, Female | 30, 25 |
然后我将数据集转换为一个新的数据集,其中包含 Person 类型:
public static Dataset<Person> transformToPerson(Dataset<Row> rawData) {
return rawData
.flatMap((Row sourceRow) -> {
// code to parse an input row and split person data goes here
Person person1 = new Person(name1, gender1, age1);
Person person2 = new Person(name2, gender2, age2);
return Arrays.asList(person1, person2);
}, Encoders.bean(Person.class));
}
哪里
public abstract class Human implements Serializable {
protected String name;
protected String gender;
// getters/setters go here
// default constructor + constructor with the name and gender params
}
public class Person extends Human {
private String age;
// getters/setters for the age param go here
// default constructor + constructor with the age, name and gender params
// overriden toString() method which returns the string: (<name>, <gender>, <age>)
}
最后,当我显示数据集的内容时,我希望看到
+------------+----------------+--------+
+ name + gender + age +
+------------+----------------+--------+
| Jack | Male | 30 |
| Jill | Femail | 25 |
不过,我明白了
+-------------------+----------------+--------+
+ name + gender + age +
+-------------------+----------------+--------+
|(Jack, Male, 30) | | |
|(Jill, Femail, 25) | | |
这是 toString() 方法的结果,而 header 是正确的。
我相信编码器有问题,就好像我使用 Encoders.javaSerizlization(T) 或 Encoders.kryo(T) 它显示
+------------------+
+ value +
+------------------+
|(Jack, Male, 30) |
|(Jill, Femail, 25)|
我最担心的是编码器的不正确使用可能会导致不正确的 SerDe and/or 性能损失。
在我能找到的所有 Spark Java 示例中,我看不出有什么特别之处...
你能告诉我哪里做错了吗?
更新 1
这是我项目的依赖项:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.10</artifactId>
<version>1.6.2</version>
</dependency>
解决方案
根据abaghel的建议,我将版本升级到2.0.2(请注意2.0.0版本有), used Dataset instead of DataFrames everywhere in my code (seems like DataFrames are not a part of Apache Spark starting from 2.0.0),并使用iterator-based flatMap函数从Row转换为人.
只是为了分享,使用 版本 1.6.2 的方法对我不起作用,因为它抛出了 'MyPersonConversion$function1 not Serializable' 异常。
现在一切正常。
您使用的 Spark 是什么版本?您提供的 flatMap 方法未使用 2.2.0 版进行编译。 Return 所需类型为 Iterator<Person>
。请使用下面的 FlatMapFunction,您将获得所需的输出。
public static Dataset<Person> transformToPerson(Dataset<Row> rawData) {
return rawData.flatMap(row -> {
String[] nameArr = row.getString(0).split(",");
String[] genArr = row.getString(1).split(",");
String[] ageArr = row.getString(2).split(",");
Person person1 = new Person(nameArr[0], genArr[0], ageArr[0]);
Person person2 = new Person(nameArr[1], genArr[1], ageArr[1]);
return Arrays.asList(person1, person2).iterator();
}, Encoders.bean(Person.class));
}
//Call function
Dataset<Person> dataset1 = transformToPerson(dataset);
dataset1.show();
我是 Spark 的新手 SQL。在执行其中一项训练任务时,我遇到了以下问题并且找不到答案(以下所有示例都有点笨拙,但出于演示目的应该还可以)。
我的应用读取镶木地板文件并根据其内容创建数据集:
DataFrame input = sqlContext.read().parquet("src/test/resources/integration/input/source.gz.parquet");
Dataset<Row> dataset = input.as(RowEncoder$.MODULE$.apply(input.schema()));
dataset.show() 调用结果:
+------------+----------------+--------+
+ Names + Gender + Age +
+------------+----------------+--------+
| Jack, Jill | Male, Female | 30, 25 |
然后我将数据集转换为一个新的数据集,其中包含 Person 类型:
public static Dataset<Person> transformToPerson(Dataset<Row> rawData) {
return rawData
.flatMap((Row sourceRow) -> {
// code to parse an input row and split person data goes here
Person person1 = new Person(name1, gender1, age1);
Person person2 = new Person(name2, gender2, age2);
return Arrays.asList(person1, person2);
}, Encoders.bean(Person.class));
}
哪里
public abstract class Human implements Serializable {
protected String name;
protected String gender;
// getters/setters go here
// default constructor + constructor with the name and gender params
}
public class Person extends Human {
private String age;
// getters/setters for the age param go here
// default constructor + constructor with the age, name and gender params
// overriden toString() method which returns the string: (<name>, <gender>, <age>)
}
最后,当我显示数据集的内容时,我希望看到
+------------+----------------+--------+
+ name + gender + age +
+------------+----------------+--------+
| Jack | Male | 30 |
| Jill | Femail | 25 |
不过,我明白了
+-------------------+----------------+--------+
+ name + gender + age +
+-------------------+----------------+--------+
|(Jack, Male, 30) | | |
|(Jill, Femail, 25) | | |
这是 toString() 方法的结果,而 header 是正确的。 我相信编码器有问题,就好像我使用 Encoders.javaSerizlization(T) 或 Encoders.kryo(T) 它显示
+------------------+
+ value +
+------------------+
|(Jack, Male, 30) |
|(Jill, Femail, 25)|
我最担心的是编码器的不正确使用可能会导致不正确的 SerDe and/or 性能损失。 在我能找到的所有 Spark Java 示例中,我看不出有什么特别之处...
你能告诉我哪里做错了吗?
更新 1
这是我项目的依赖项:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.10</artifactId>
<version>1.6.2</version>
</dependency>
解决方案
根据abaghel的建议,我将版本升级到2.0.2(请注意2.0.0版本有
只是为了分享,使用
现在一切正常。
您使用的 Spark 是什么版本?您提供的 flatMap 方法未使用 2.2.0 版进行编译。 Return 所需类型为 Iterator<Person>
。请使用下面的 FlatMapFunction,您将获得所需的输出。
public static Dataset<Person> transformToPerson(Dataset<Row> rawData) {
return rawData.flatMap(row -> {
String[] nameArr = row.getString(0).split(",");
String[] genArr = row.getString(1).split(",");
String[] ageArr = row.getString(2).split(",");
Person person1 = new Person(nameArr[0], genArr[0], ageArr[0]);
Person person2 = new Person(nameArr[1], genArr[1], ageArr[1]);
return Arrays.asList(person1, person2).iterator();
}, Encoders.bean(Person.class));
}
//Call function
Dataset<Person> dataset1 = transformToPerson(dataset);
dataset1.show();