Applying a schema to a Spark Dataset of a Java object
There is a similar question here:
But the problem I am facing is that I already have a predefined Dataset<Obj1>, and I want to define a schema that matches its data members. The end goal is to be able to join two Java objects.
Sample code:
Dataset<Row> rowDataset = spark.getSpark().sqlContext().createDataFrame(rowRDD, schema).toDF();
Dataset<MyObj> objResult = rowDataset.map((MapFunction<Row, MyObj>) row ->
    new MyObj(
        row.getInt(row.fieldIndex("field1")),
        row.isNullAt(row.fieldIndex("field2")) ? "" : row.getString(row.fieldIndex("field2")),
        row.isNullAt(row.fieldIndex("field3")) ? "" : row.getString(row.fieldIndex("field3")),
        row.isNullAt(row.fieldIndex("field4")) ? "" : row.getString(row.fieldIndex("field4"))
    ), Encoders.javaSerialization(MyObj.class));
If I print the schema of the row dataset, I get the expected schema:
rowDataset.printSchema();
root
|-- field1: integer (nullable = false)
|-- field2: string (nullable = false)
|-- field3: string (nullable = false)
|-- field4: string (nullable = false)
But if I print the schema of the object dataset, the actual schema is lost:
objResult.printSchema();
root
|-- value: binary (nullable = true)
The question is: how do I apply a schema to a Dataset<MyObj>?
Below is a snippet I tried, and Spark behaved as expected, so it seems the root cause of your problem is not the map function. Note that the snippet uses Encoders.bean rather than Encoders.javaSerialization: a Java-serialization encoder stores the whole object as a single binary value column, whereas a bean encoder maps the bean's properties to schema columns.
SparkSession session = SparkSession.builder().config(conf).getOrCreate();
Dataset<Row> ds = session.read().text("<some path>");
Encoder<Employee> employeeEncode = Encoders.bean(Employee.class);
ds.map(new MapFunction<Row, Employee>() {
    @Override
    public Employee call(Row value) throws Exception {
        return new Employee(value.getString(0).split(","));
    }
}, employeeEncode).printSchema();
Output:
root
|-- age: integer (nullable = true)
|-- name: string (nullable = true)
// The Employee bean
public class Employee {
    public String name;
    public Integer age;

    public Employee() {
    }

    public Employee(String[] args) {
        this.name = args[0];
        this.age = Integer.parseInt(args[1]);
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }
}
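Under the hood, Encoders.bean derives the schema by introspecting the bean's getter/setter pairs, which is why the Employee bean above yields age and name columns. The sketch below illustrates that idea with the standard java.beans API; it is a plain-Java illustration of the principle, not Spark's actual internal code, and the class name BeanSchemaDemo is made up for the example.

```java
import java.beans.BeanInfo;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.Set;
import java.util.TreeSet;

public class BeanSchemaDemo {
    // Stand-in for the Employee bean from the answer above.
    public static class Employee {
        private String name;
        private Integer age;
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public Integer getAge() { return age; }
        public void setAge(Integer age) { this.age = age; }
    }

    // Collect the properties that have both a getter and a setter --
    // these are the ones a bean encoder can turn into schema columns.
    public static Set<String> beanProperties(Class<?> cls) throws Exception {
        Set<String> props = new TreeSet<>();
        BeanInfo info = Introspector.getBeanInfo(cls, Object.class);
        for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
            if (pd.getReadMethod() != null && pd.getWriteMethod() != null) {
                props.add(pd.getName());
            }
        }
        return props;
    }

    public static void main(String[] args) throws Exception {
        // Prints the discoverable bean properties: [age, name]
        System.out.println(beanProperties(Employee.class));
    }
}
```

This also shows why Encoders.javaSerialization produces only a binary column: it never inspects the object's properties at all, it just serializes the whole instance to bytes.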