在 java Spark 中尝试 zipWithIndex 时出错
Error when trying zipWithIndex in java Spark
我尝试在 spark
中使用 zipWithIndex
添加具有行号的列,如下所示
val df = sc.parallelize(Seq((1.0, 2.0), (0.0, -1.0), (3.0, 4.0), (6.0, -2.3))).toDF("x", "y")
val rddzip = df.rdd.zipWithIndex;
val newSchema = StructType(df.schema.fields ++ Array(StructField("rowid", LongType, false)))
val dfZippedWithId = spark.createDataFrame(rddzip.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema)
但是我在 JAVA 中尝试做的事情如下
JavaRDD<Row> rdd = (JavaRDD) df.toJavaRDD().zipWithIndex().map(t -> {
Row r = t._1;
Long index = t._2 + 1;
ArrayList<Object> list = new ArrayList<>();
for(Object item: JavaConverters.seqAsJavaListConverter(r.toSeq()).asJava()) {
list.add(item);
}
return RowFactory.create(JavaConverters.seqAsJavaListConverter(t._1.toSeq()).asJava().add(t._2));
});
StructType newSchema = df.schema()
.add(new StructField(name, DataTypes.LongType, true, Metadata.empty()));
return df.sparkSession().createDataFrame(rdd, newSchema);
我得到以下错误
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 23.0 failed 1 times, most recent failure: Lost task 0.0 in stage 23.0 (T
ID 523, localhost, executor driver): java.lang.UnsupportedOperationException
at java.util.AbstractList.add(Unknown Source)
at java.util.AbstractList.add(Unknown Source)
at com.nielsen.media.mediaView.adintel.pivot.datareader.AIReportViewerProcessor.lambda$zipWithIndex45ab51(AIReportViewerProcessor.java:3071)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction.apply(JavaPairRDD.scala:1040)
at scala.collection.Iterator$$anon.next(Iterator.scala:410)
at scala.collection.Iterator$$anon.next(Iterator.scala:410)
at scala.collection.Iterator$$anon.next(Iterator.scala:410)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:636)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1877)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1876)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
有什么帮助吗?
在 scala 版本中,您传递给 spark.createDataFrame
RDD[Row]
,在 java 中,您传递的是 JavaPairRDD
,您应该将其映射到 JavaRDD[Row]
。
Dataset<Row> df = ss.range(10).toDF();
df.show();
JavaPairRDD<Row, Long> rddzip = df.toJavaRDD().zipWithIndex();
JavaRDD<Row> rdd = rddzip.map(s->{
Row r = s._1;
Object[] arr = new Object[r.size()+1];
for (int i = 0; i < arr.length-1; i++) {
arr[i] = r.get(i);
}
arr[arr.length-1] = s._2;
return RowFactory.create(arr);
});
StructType newSchema = df.schema().add(new StructField("rowid",
DataTypes.LongType, false, Metadata.empty()));
Dataset<Row> df2 = ss.createDataFrame(rdd,newSchema);
df2.show();
+---+
| id|
+---+
| 0|
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
+---+
+---+-----+
| id|rowid|
+---+-----+
| 0| 0|
| 1| 1|
| 2| 2|
| 3| 3|
| 4| 4|
| 5| 5|
| 6| 6|
| 7| 7|
| 8| 8|
| 9| 9|
+---+-----+
我尝试在 spark
中使用zipWithIndex
添加具有行号的列,如下所示
val df = sc.parallelize(Seq((1.0, 2.0), (0.0, -1.0), (3.0, 4.0), (6.0, -2.3))).toDF("x", "y")
val rddzip = df.rdd.zipWithIndex;
val newSchema = StructType(df.schema.fields ++ Array(StructField("rowid", LongType, false)))
val dfZippedWithId = spark.createDataFrame(rddzip.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema)
但是我在 JAVA 中尝试做的事情如下
JavaRDD<Row> rdd = (JavaRDD) df.toJavaRDD().zipWithIndex().map(t -> {
Row r = t._1;
Long index = t._2 + 1;
ArrayList<Object> list = new ArrayList<>();
for(Object item: JavaConverters.seqAsJavaListConverter(r.toSeq()).asJava()) {
list.add(item);
}
return RowFactory.create(JavaConverters.seqAsJavaListConverter(t._1.toSeq()).asJava().add(t._2));
});
StructType newSchema = df.schema()
.add(new StructField(name, DataTypes.LongType, true, Metadata.empty()));
return df.sparkSession().createDataFrame(rdd, newSchema);
我得到以下错误
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 23.0 failed 1 times, most recent failure: Lost task 0.0 in stage 23.0 (T
ID 523, localhost, executor driver): java.lang.UnsupportedOperationException
at java.util.AbstractList.add(Unknown Source)
at java.util.AbstractList.add(Unknown Source)
at com.nielsen.media.mediaView.adintel.pivot.datareader.AIReportViewerProcessor.lambda$zipWithIndex45ab51(AIReportViewerProcessor.java:3071)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction.apply(JavaPairRDD.scala:1040)
at scala.collection.Iterator$$anon.next(Iterator.scala:410)
at scala.collection.Iterator$$anon.next(Iterator.scala:410)
at scala.collection.Iterator$$anon.next(Iterator.scala:410)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:636)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1877)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1876)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
有什么帮助吗?
在 scala 版本中,您传递给 spark.createDataFrame
RDD[Row]
,在 java 中,您传递的是 JavaPairRDD
,您应该将其映射到 JavaRDD[Row]
。
Dataset<Row> df = ss.range(10).toDF();
df.show();
JavaPairRDD<Row, Long> rddzip = df.toJavaRDD().zipWithIndex();
JavaRDD<Row> rdd = rddzip.map(s->{
Row r = s._1;
Object[] arr = new Object[r.size()+1];
for (int i = 0; i < arr.length-1; i++) {
arr[i] = r.get(i);
}
arr[arr.length-1] = s._2;
return RowFactory.create(arr);
});
StructType newSchema = df.schema().add(new StructField("rowid",
DataTypes.LongType, false, Metadata.empty()));
Dataset<Row> df2 = ss.createDataFrame(rdd,newSchema);
df2.show();
+---+
| id|
+---+
| 0|
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
+---+
+---+-----+
| id|rowid|
+---+-----+
| 0| 0|
| 1| 1|
| 2| 2|
| 3| 3|
| 4| 4|
| 5| 5|
| 6| 6|
| 7| 7|
| 8| 8|
| 9| 9|
+---+-----+