如何将两个 spark 数据集连接到一个具有 java 个对象的数据集?
How to join two spark dataset to one with java objects?
我在 spark 中连接两个数据集时遇到一点问题,我有这个:
SparkConf conf = new SparkConf()
.setAppName("MyFunnyApp")
.setMaster("local[*]");
SparkSession spark = SparkSession
.builder()
.config(conf)
.config("spark.debug.maxToStringFields", 150)
.getOrCreate();
//...
//Do stuff
//...
Encoder<MyOwnObject1> encoderObject1 = Encoders.bean(MyOwnObject1.class);
Encoder<MyOwnObject2> encoderObject2 = Encoders.bean(MyOwnObject2.class);
Dataset<MyOwnObject1> object1DS = spark.read()
.option("header","true")
.option("delimiter",";")
.option("inferSchema","true")
.csv(pathToFile1)
.as(encoderObject1);
Dataset<MyOwnObject2> object2DS = spark.read()
.option("header","true")
.option("delimiter",";")
.option("inferSchema","true")
.csv(pathToFile2)
.as(encoderObject2);
我可以打印架构并正确显示它。
//Here start the problem
Dataset<Tuple2<MyOwnObject1, MyOwnObject2>> joinObjectDS =
object1DS.join(object2DS, object1DS.col("column01")
.equalTo(object2DS.col("column01")))
.as(Encoders.tuple(MyOwnObject1,MyOwnObject2));
最后一行无法加入并给我这个错误:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Try to map struct<"LIST WITH ALL VARS FROM TWO OBJECT"> to Tuple2, but failed as the number of fields does not line up.;
没错,因为 Tuple2 (object2) 没有所有变量...
然后我尝试了这个:
Dataset<Tuple2<MyOwnObject1, MyOwnObject2>> joinObjectDS = object1DS
.joinWith(object2DS, object1DS
.col("column01")
.equalTo(object2DS.col("column01")));
并且工作正常!但是,我需要一个没有元组的新数据集,我有一个 object3,它有一些来自 object1 和 object2 的变量,然后我遇到了这个问题:
Encoder<MyOwnObject3> encoderObject3 = Encoders.bean(MyOwnObject3.class);
Dataset<MyOwnObject3> object3DS = joinObjectDS.map(tupleObject1Object2 -> {
MyOwnObject1 myOwnObject1 = tupleObject1Object2._1();
MyOwnObject2 myOwnObject2 = tupleObject1Object2._2();
MyOwnObject3 myOwnObject3 = new MyOwnObject3(); //Sets all vars with start values
//...
//Sets data from object 1 and 2 to 3.
//...
return myOwnObject3;
}, encoderObject3);
失败!...这是错误:
17/05/10 12:17:43 ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 593, Column 72: A method named "toString" is not declared in any enclosing class nor any supertype, nor through a static import
超过数千行错误...
我能做什么?我试过:
- 仅使用 String、int(或 Integer)和 double(或 Double)(不再)制作我的对象
- 使用不同的编码器,如 kryo 或 java序列化
- 使用 JavaRDD(有效!但非常慢)并使用 Dataframes with Rows(有效,但我需要更改许多对象)
- 我所有的 java 对象都是可序列化的
- 使用 sparks 2.1.0 和 2.1.1,现在我的 pom.xml
上有 2.1.1
我想使用数据集,使用数据帧的速度和 JavaRDD 的对象语法...
帮忙?
谢谢
终于找到解决办法了,
当我的代码创建数据集时,选项 inferSchema 出现问题。我有一个字符串列,选项 inferSchema return 我是一个整数列,因为所有值都是 "numeric",但我需要将它们用作字符串(如“0001”、“0002”...)我需要做一个模式,但我有很多变量,然后我用我所有的 类:
写这个
List<StructField> fieldsObject1 = new ArrayList<>();
for (Field field : MyOwnObject1.class.getDeclaredFields()) {
fieldsObject1.add(DataTypes.createStructField(
field.getName(),
CatalystSqlParser.parseDataType(field.getType().getSimpleName()),
true)
);
}
StructType schemaObject1 = DataTypes.createStructType(fieldsObject1);
Dataset<MyOwnObject1> object1DS = spark.read()
.option("header","true")
.option("delimiter",";")
.schema(schemaObject1)
.csv(pathToFile1)
.as(encoderObject1);
工作正常。
"best" 解决方案是这样的:
Dataset<MyOwnObject1> object1DS = spark.read()
.option("header","true")
.option("delimiter",";")
.schema(encoderObject1.schema())
.csv(pathToFile1)
.as(encoderObject1);
但是 encoderObject1.schema() return 给我一个 Schema,其中的变量按字母顺序排列,而不是按原始顺序排列,然后当我读取 csv 时此选项失败。也许编码器应该 return 具有原始顺序而不是字母顺序的 vars 的模式
我在 spark 中连接两个数据集时遇到一点问题,我有这个:
SparkConf conf = new SparkConf()
.setAppName("MyFunnyApp")
.setMaster("local[*]");
SparkSession spark = SparkSession
.builder()
.config(conf)
.config("spark.debug.maxToStringFields", 150)
.getOrCreate();
//...
//Do stuff
//...
Encoder<MyOwnObject1> encoderObject1 = Encoders.bean(MyOwnObject1.class);
Encoder<MyOwnObject2> encoderObject2 = Encoders.bean(MyOwnObject2.class);
Dataset<MyOwnObject1> object1DS = spark.read()
.option("header","true")
.option("delimiter",";")
.option("inferSchema","true")
.csv(pathToFile1)
.as(encoderObject1);
Dataset<MyOwnObject2> object2DS = spark.read()
.option("header","true")
.option("delimiter",";")
.option("inferSchema","true")
.csv(pathToFile2)
.as(encoderObject2);
我可以打印架构并正确显示它。
//Here start the problem
Dataset<Tuple2<MyOwnObject1, MyOwnObject2>> joinObjectDS =
object1DS.join(object2DS, object1DS.col("column01")
.equalTo(object2DS.col("column01")))
.as(Encoders.tuple(MyOwnObject1,MyOwnObject2));
最后一行无法加入并给我这个错误:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Try to map struct<"LIST WITH ALL VARS FROM TWO OBJECT"> to Tuple2, but failed as the number of fields does not line up.;
没错,因为 Tuple2 (object2) 没有所有变量...
然后我尝试了这个:
Dataset<Tuple2<MyOwnObject1, MyOwnObject2>> joinObjectDS = object1DS
.joinWith(object2DS, object1DS
.col("column01")
.equalTo(object2DS.col("column01")));
并且工作正常!但是,我需要一个没有元组的新数据集,我有一个 object3,它有一些来自 object1 和 object2 的变量,然后我遇到了这个问题:
Encoder<MyOwnObject3> encoderObject3 = Encoders.bean(MyOwnObject3.class);
Dataset<MyOwnObject3> object3DS = joinObjectDS.map(tupleObject1Object2 -> {
MyOwnObject1 myOwnObject1 = tupleObject1Object2._1();
MyOwnObject2 myOwnObject2 = tupleObject1Object2._2();
MyOwnObject3 myOwnObject3 = new MyOwnObject3(); //Sets all vars with start values
//...
//Sets data from object 1 and 2 to 3.
//...
return myOwnObject3;
}, encoderObject3);
失败!...这是错误:
17/05/10 12:17:43 ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 593, Column 72: A method named "toString" is not declared in any enclosing class nor any supertype, nor through a static import
超过数千行错误...
我能做什么?我试过:
- 仅使用 String、int(或 Integer)和 double(或 Double)(不再)制作我的对象
- 使用不同的编码器,如 kryo 或 java序列化
- 使用 JavaRDD(有效!但非常慢)并使用 Dataframes with Rows(有效,但我需要更改许多对象)
- 我所有的 java 对象都是可序列化的
- 使用 sparks 2.1.0 和 2.1.1,现在我的 pom.xml 上有 2.1.1
我想使用数据集,使用数据帧的速度和 JavaRDD 的对象语法...
帮忙?
谢谢
终于找到解决办法了,
当我的代码创建数据集时,选项 inferSchema 出现问题。我有一个字符串列,选项 inferSchema return 我是一个整数列,因为所有值都是 "numeric",但我需要将它们用作字符串(如“0001”、“0002”...)我需要做一个模式,但我有很多变量,然后我用我所有的 类:
写这个List<StructField> fieldsObject1 = new ArrayList<>();
for (Field field : MyOwnObject1.class.getDeclaredFields()) {
fieldsObject1.add(DataTypes.createStructField(
field.getName(),
CatalystSqlParser.parseDataType(field.getType().getSimpleName()),
true)
);
}
StructType schemaObject1 = DataTypes.createStructType(fieldsObject1);
Dataset<MyOwnObject1> object1DS = spark.read()
.option("header","true")
.option("delimiter",";")
.schema(schemaObject1)
.csv(pathToFile1)
.as(encoderObject1);
工作正常。
"best" 解决方案是这样的:
Dataset<MyOwnObject1> object1DS = spark.read()
.option("header","true")
.option("delimiter",";")
.schema(encoderObject1.schema())
.csv(pathToFile1)
.as(encoderObject1);
但是 encoderObject1.schema() return 给我一个 Schema,其中的变量按字母顺序排列,而不是按原始顺序排列,然后当我读取 csv 时此选项失败。也许编码器应该 return 具有原始顺序而不是字母顺序的 vars 的模式