How can I print the content of rows in a Dataset using Java and the Spark SQL?
I want to write a simple piece of Spark SQL code that reads a file called u.data, which contains movie ratings, creates a Dataset of Rows, and then prints the first row of that Dataset.
I have already read the file into a JavaRDD and mapped the RDD to a ratingsObject (the object has two fields, movieID and rating). So I just want to print the first row of this Dataset.
I am using Java and Spark SQL.
public static void main(String[] args) {
    App obj = new App();
    SparkSession spark = SparkSession.builder().appName("Java Spark SQL basic example").getOrCreate();
    Map<Integer, String> movieNames = obj.loadMovieNames();

    JavaRDD<String> lines = spark.read().textFile("hdfs:///ml-100k/u.data").javaRDD();

    JavaRDD<MovieRatings> movies = lines.map(line -> {
        String[] parts = line.split(" ");
        MovieRatings ratingsObject = new MovieRatings();
        ratingsObject.setMovieID(Integer.parseInt(parts[1].trim()));
        ratingsObject.setRating(Integer.parseInt(parts[2].trim()));
        return ratingsObject;
    });

    Dataset<Row> movieDataset = spark.createDataFrame(movies, MovieRatings.class);

    Encoder<Integer> intEncoder = Encoders.INT();
    Dataset<Integer> HUE = movieDataset.map(
        new MapFunction<Row, Integer>() {
            private static final long serialVersionUID = -5982149277350252630L;

            @Override
            public Integer call(Row row) throws Exception {
                return row.getInt(0);
            }
        }, intEncoder
    );

    HUE.show();

    // stop the session
    spark.stop();
}
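For reference, MovieRatings is a plain bean with just those two fields. The actual class is not shown here; this is only the assumed shape, with the public no-arg constructor and getters/setters that spark.createDataFrame(rdd, beanClass) relies on:

import java.io.Serializable;

// Assumed shape of the MovieRatings bean referenced above: only the two
// fields used in the question (movieID and rating).
public class MovieRatings implements Serializable {
    private int movieID;
    private int rating;

    public MovieRatings() { }   // no-arg constructor required by the bean encoder

    public MovieRatings(int movieID, int rating) {   // convenience constructor
        this.movieID = movieID;
        this.rating = rating;
    }

    public int getMovieID() { return movieID; }
    public void setMovieID(int movieID) { this.movieID = movieID; }

    public int getRating() { return rating; }
    public void setRating(int rating) { this.rating = rating; }
}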
I have tried many of the possible solutions I could find, but they all fail with the same error:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, localhost, executor 1): java.lang.ArrayIndexOutOfBoundsException: 1
at com.ericsson.SparkMovieRatings.App.lambda$maine634467(App.java:63)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction.apply(JavaPairRDD.scala:1040)
at scala.collection.Iterator$$anon.next(Iterator.scala:409)
at scala.collection.Iterator$$anon.next(Iterator.scala:409)
at scala.collection.Iterator$$anon.next(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:614)
at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:253)
at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:830)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:830)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Here is a sample of the u.data file:
196 242 3 881250949
186 302 3 891717742
22 377 1 878887116
244 51 2 880606923
166 346 1 886397596
298 474 4 884182806
115 265 2 881171488
253 465 5 891628467
305 451 3 886324817
6 86 3 883603013
62 257 2 879372434
286 1014 5 879781125
200 222 5 876042340
210 40 3 891035994
224 29 3 888104457
303 785 3 879485318
122 387 5 879270459
194 274 2 879539794
where the first column is the UserID, the second the MovieID, the third the rating, and the last one the timestamp.
As stated, your data is not separated by single spaces (the ml-100k u.data file is tab-separated), which is why line.split(" ") produces a single token and the lambda throws ArrayIndexOutOfBoundsException: 1.
I will show you two possible solutions: the first based on RDDs, the second on Spark SQL, which in general is the better one in terms of performance.
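Before the two full versions, here is the minimal change to your existing map: split on any run of whitespace (tabs included) instead of a single space. Just a sketch of that one-line fix applied to your own lambda:

// Minimal fix to the original lambda: "\\s+" matches tabs and multiple
// spaces, so parts[1] and parts[2] exist again.
JavaRDD<MovieRatings> movies = lines.map(line -> {
    String[] parts = line.split("\\s+");   // was line.split(" ")
    MovieRatings ratingsObject = new MovieRatings();
    ratingsObject.setMovieID(Integer.parseInt(parts[1].trim()));
    ratingsObject.setRating(Integer.parseInt(parts[2].trim()));
    return ratingsObject;
});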
RDD version (you should use built-in types to reduce the overhead):
import java.util.ArrayList;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class SparkDriver {

    public static void main(String[] args) {
        // Create a configuration object and set the name of the application
        SparkConf conf = new SparkConf().setAppName("application_name");

        // Create a Spark context object
        JavaSparkContext context = new JavaSparkContext(conf);

        // Build the (movieID, rating) pair RDD from the text file.
        // Column 0 is the userID, column 1 the movieID, column 2 the rating.
        JavaPairRDD<Integer, Integer> movieRatingRDD = context.textFile("u.data.txt")
                .mapToPair(line -> {
                    String[] tokens = line.split("\\s+");
                    int movieID = Integer.parseInt(tokens[1]);
                    int rating = Integer.parseInt(tokens[2]);
                    return new Tuple2<Integer, Integer>(movieID, rating);
                });

        // Keep in mind that take returns the first n elements,
        // in the order they appear in the file.
        ArrayList<Tuple2<Integer, Integer>> list = new ArrayList<>(movieRatingRDD.take(10));

        System.out.println("MovieID\tRating");
        for (Tuple2<Integer, Integer> tuple : list) {
            System.out.println(tuple._1 + "\t" + tuple._2);
        }

        context.close();
    }
}
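If you only need the very first row, as the question asks, the same pattern works with take(1) or first():

// first() is equivalent to take(1).get(0): it returns the first
// (movieID, rating) pair in file order.
Tuple2<Integer, Integer> firstPair = movieRatingRDD.first();
System.out.println(firstPair._1 + "\t" + firstPair._2);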
Spark SQL version:
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkDriver {

    public static void main(String[] args) {
        // Create the Spark session
        SparkSession session = SparkSession.builder().appName("Spark app sql version").getOrCreate();

        // u.data has no header and is tab-separated:
        // userID \t movieID \t rating \t timestamp
        Dataset<MovieRatings> movieRatings = session.read()
                .format("csv")
                .option("header", false)
                .option("inferSchema", true)
                .option("delimiter", "\t")
                .load("u.data.txt")
                .map((MapFunction<Row, MovieRatings>) row -> {
                    int movieID = row.getInt(1);
                    int rating = row.getInt(2);
                    // assumes MovieRatings has a (movieID, rating) constructor
                    return new MovieRatings(movieID, rating);
                }, Encoders.bean(MovieRatings.class));

        // Stop the session
        session.stop();
    }
}
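Note that the Spark SQL version above never actually prints anything. To answer the original question (print the first row of the Dataset), something like this could go right before session.stop() — a small sketch, assuming the usual bean getters on MovieRatings:

// Print the first row of the Dataset in tabular form...
movieRatings.show(1);

// ...or pull it back to the driver as a MovieRatings object.
MovieRatings first = movieRatings.first();
System.out.println(first.getMovieID() + "\t" + first.getRating());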