Convert a JavaRDD<Tuple2<Object, long[]>> into a Spark Dataset<Row> in Java
In Java (not Scala!) with Spark 3.0.1, I have a JavaRDD instance neighborIdsRDD whose type is JavaRDD<Tuple2<Object, long[]>>.
The part of my code that produces the JavaRDD is:
GraphOps<String, String> graphOps = new GraphOps<>(graph, stringTag, stringTag);
JavaRDD<Tuple2<Object, long[]>> neighborIdsRDD = graphOps.collectNeighborIds(EdgeDirection.Either()).toJavaRDD();
I had to call toJavaRDD() to obtain a JavaRDD, because collectNeighborIds returns an org.apache.spark.graphx.VertexRDD<long[]> (VertexRDD doc).
However, in the rest of my application I need to build a Spark Dataset<Row> from the collectNeighborIds result.
Is it possible to convert the JavaRDD<Tuple2<Object, long[]>> into a Dataset<Row>, and what is the best way to do it?
Update from the comments:
I adjusted the code as suggested in the comments:
GraphOps<String, String> graphOps = new GraphOps<>(graph, stringTag, stringTag);
JavaRDD<Tuple2<Object, long[]>> neighborIdsRDD = graphOps.collectNeighborIds(EdgeDirection.Either()).toJavaRDD();
System.out.println("VertexRDD neighborIdsRDD is:");
for (Tuple2<Object, long[]> t : neighborIdsRDD.collect()) {
    System.out.println(t._1() + " -- " + Arrays.toString(t._2()));
}
Dataset<Row> dr = spark_session.createDataFrame(neighborIdsRDD.rdd(), Tuple2.class);
System.out.println("converted Dataset<Row> is:");
dr.show();
But I get an empty dataset, as shown below:
VertexRDD neighborIdsRDD is:
4 -- [3]
1 -- [2, 3]
5 -- [3, 2]
2 -- [1, 3, 5]
3 -- [1, 2, 5, 4]
converted Dataset<Row> is:
++
||
++
||
||
||
||
||
++
I ran into the same situation and fortunately found a solution to get the DataFrame back.
The solution code is annotated with steps [1], [2], and [3].
GraphOps<String, String> graphOps = new GraphOps<>(graph, stringTag, stringTag);
System.out.println("VertexRDD neighborIdsRDD is:");
JavaRDD<Tuple2<Object, long[]>> neighborIdsRDD = graphOps.collectNeighborIds(EdgeDirection.Either()).toJavaRDD();
for (Tuple2<Object, long[]> t : neighborIdsRDD.collect()) {
    System.out.println(t._1() + " -- " + Arrays.toString(t._2()));
}
// [1] Define encoding schema
StructType graphStruct = new StructType(new StructField[]{
new StructField("father", DataTypes.LongType, false, Metadata.empty()),
new StructField("children", DataTypes.createArrayType(DataTypes.LongType), false, Metadata.empty()),
});
// [2] Build a JavaRDD<Row> from a JavaRDD<Tuple2<Object,long[]>>
JavaRDD<Row> dr = neighborIdsRDD.map(tupla -> RowFactory.create(tupla._1(), tupla._2()));
// [3] Finally build the required Dataset<Row>
Dataset<Row> dsr = spark_session.createDataFrame(dr.rdd(), graphStruct);
System.out.println("DATASET IS:");
dsr.show();
Printed output:
VertexRDD neighborIdsRDD is:
4 -- [3]
1 -- [2, 3]
5 -- [3, 2]
2 -- [1, 3, 5]
3 -- [1, 2, 5, 4]
DATASET IS:
+------+------------+
|father| children|
+------+------------+
| 4| [3]|
| 1| [2, 3]|
| 5| [3, 2]|
| 2| [1, 3, 5]|
| 3|[1, 2, 5, 4]|
+------+------------+
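As an aside (not part of the original answer): the empty dataset in the first attempt is consistent with how createDataFrame(rdd, SomeClass.class) infers its schema from JavaBean-style getters; scala.Tuple2 exposes _1()/_2() rather than getters, so no columns are inferred. A hypothetical JavaBean wrapper such as the following sketch should also work with that overload:

```java
import java.io.Serializable;

// Hypothetical JavaBean wrapper: Spark's bean-based schema inference would
// derive columns "father" and "children" from the getter/setter pairs below.
class NeighborEntry implements Serializable {
    private long father;
    private long[] children;

    public NeighborEntry() {}

    public NeighborEntry(long father, long[] children) {
        this.father = father;
        this.children = children;
    }

    public long getFather() { return father; }
    public void setFather(long father) { this.father = father; }
    public long[] getChildren() { return children; }
    public void setChildren(long[] children) { this.children = children; }
}
```

With such a wrapper, something along the lines of spark_session.createDataFrame(neighborIdsRDD.map(t -> new NeighborEntry((Long) t._1(), t._2())), NeighborEntry.class) should produce the same two-column DataFrame, though the RowFactory-plus-StructType approach above avoids the extra class.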