Cannot group by with data after flatMap (Spark SQL)

I have a Spark SQL job, and the original data looks like this:

+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity| InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/10 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/10 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/10 8:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/10 8:26|     4.25|     17850|United Kingdom|
|   536366|    22633|HAND WARMER UNION...|       6|12/1/10 8:28|     1.85|     17850|United Kingdom|
|   536366|    22632|HAND WARMER RED P...|       6|12/1/10 8:28|     1.85|     17850|United Kingdom|
|   536367|    84879|ASSORTED COLOUR B...|      32|12/1/10 8:34|     1.69|     13047|United Kingdom|
|   536367|    22745|POPPY'S PLAYHOUSE...|       6|12/1/10 8:34|      2.1|     13047|United Kingdom|
|   536367|    22748|POPPY'S PLAYHOUSE...|       6|12/1/10 8:34|      2.1|     13047|United Kingdom|
|   536367|    22749|FELTCRAFT PRINCES...|       8|12/1/10 8:34|     3.75|     13047|United Kingdom|
|   536367|    22310|IVORY KNITTED MUG...|       6|12/1/10 8:34|     1.65|     13047|United Kingdom|
|   536367|    84969|BOX OF 6 ASSORTED...|       6|12/1/10 8:34|     4.25|     13047|United Kingdom|
|   536367|    22623|BOX OF VINTAGE JI...|       3|12/1/10 8:34|     4.95|     13047|United Kingdom|
|   536367|    22622|BOX OF VINTAGE AL...|       2|12/1/10 8:34|     9.95|     13047|United Kingdom|
|   536367|    21754|HOME BUILDING BLO...|       3|12/1/10 8:34|     5.95|     13047|United Kingdom|
|   536367|    21755|LOVE BUILDING BLO...|       3|12/1/10 8:34|     5.95|     13047|United Kingdom|
|   536367|    21777|RECIPE BOX WITH M...|       4|12/1/10 8:34|     7.95|     13047|United Kingdom|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+

In this job I want to find which word appears most often in the Description field. So I did the following: I used flatMap to build a new DataFrame from the original one by splitting the Description field on spaces, and registered it as a new table, shown below:

+------+-------+---+
|number|   word|lit|
+------+-------+---+
|     0|  WHITE|  1|
|     1|HANGING|  1|
|     2|  HEART|  1|
|     3|T-LIGHT|  1|
|     4| HOLDER|  1|
|     5|  WHITE|  1|
|     6|  METAL|  1|
|     7|LANTERN|  1|
|     8|  CREAM|  1|
|     9|  CUPID|  1|
|    10| HEARTS|  1|
|    11|   COAT|  1|
|    12| HANGER|  1|
|    13|KNITTED|  1|
|    14|  UNION|  1|
|    15|   FLAG|  1|
|    16|    HOT|  1|
|    17|  WATER|  1|
|    18| BOTTLE|  1|
|    19|    RED|  1|
+------+-------+---+

Here is my code:

SparkSession spark = SparkSession.builder().appName("Part-4").master("local").getOrCreate();
Dataset<Row> data = spark.read()
        .option("inferSchema", true)
        .option("header", true)
        .csv("hdfs://localhost:9000/retails.csv");

data.flatMap(new FlatMapFunction<Row, Row>() {
    private static final long serialVersionUID = 1L;
    private int cnt = 0;
    
    @Override
    public Iterator<Row> call(Row r) throws Exception {
        List<String> listItem = Arrays.asList(r.getString(2).split(" "));
        
        List<Row> listItemRow = new ArrayList<Row>();
        for (String item : listItem) {
            listItemRow.add(RowFactory.create(cnt, item, 1));
            cnt++;
        }
        
        return listItemRow.iterator();
    }
}, RowEncoder.apply(new StructType().add("number", "integer").add("word", "string").add("lit", "integer"))).createOrReplaceTempView("data");

spark.sql("select * from data").show();

My problem is that if I group by, or run other more complex SQL operations, the program fails.

Here is my code when I group:

spark.sql("select word, count(lit) from data group by word").show();

Here is my error:

21/12/03 00:08:39 ERROR TaskSetManager: Task 0 in stage 2.0 failed 1 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2) (192.168.1.10 executor driver): java.lang.NullPointerException
        at com.spark.part_4.Main.call(Main.java:33)
        at com.spark.part_4.Main.call(Main.java:27)
        at org.apache.spark.sql.Dataset.$anonfun$flatMap(Dataset.scala:2876)
        at scala.collection.Iterator$$anon.nextCur(Iterator.scala:484)
        at scala.collection.Iterator$$anon.hasNext(Iterator.scala:490)
        at scala.collection.Iterator$$anon.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon.hasNext(WholeStageCodegenExec.scala:755)
        at scala.collection.Iterator$$anon.hasNext(Iterator.scala:458)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage(DAGScheduler.scala:2202)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$adapted(DAGScheduler.scala:2201)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed(DAGScheduler.scala:1078)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$adapted(DAGScheduler.scala:1078)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2440)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2382)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2371)
        at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2223)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2242)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:472)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
        at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
        at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3696)
        at org.apache.spark.sql.Dataset.$anonfun$head(Dataset.scala:2722)
        at org.apache.spark.sql.Dataset.$anonfun$withAction(Dataset.scala:3687)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:103)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2722)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:2929)
        at org.apache.spark.sql.Dataset.getRows(Dataset.scala:301)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:338)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:825)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:784)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:793)
        at com.spark.part_4.Main.main(Main.java:45)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
        at org.apache.spark.deploy.SparkSubmit.doRunMain(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
        at org.apache.spark.deploy.SparkSubmit$$anon.doSubmit(SparkSubmit.scala:1030)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1039)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NullPointerException
        at com.spark.part_4.Main.call(Main.java:33)
        at com.spark.part_4.Main.call(Main.java:27)
        at org.apache.spark.sql.Dataset.$anonfun$flatMap(Dataset.scala:2876)
        at scala.collection.Iterator$$anon.nextCur(Iterator.scala:484)
        at scala.collection.Iterator$$anon.hasNext(Iterator.scala:490)
        at scala.collection.Iterator$$anon.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon.hasNext(WholeStageCodegenExec.scala:755)
        at scala.collection.Iterator$$anon.hasNext(Iterator.scala:458)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

Hoping for some help, thanks...

You are getting java.lang.NullPointerException when applying the FlatMapFunction because there are most likely null values in your dataset. In this example, it appears you are using the Description column.

For empty values, that column may be read as null by Spark, and the following line

List<String> listItem = Arrays.asList(r.getString(2).split(" "));

may throw this exception when r.getString(2) returns null and you try to call the split function on a null reference.

You can try to work around this by checking for null before splitting, e.g.

data.flatMap(new FlatMapFunction<Row, Row>() {
    private static final long serialVersionUID = 1L;
    private int cnt = 0;

    @Override
    public Iterator<Row> call(Row r) throws Exception {
        List<Row> listItemRow = new ArrayList<Row>();
        // check for null before splitting here
        if (r.getString(2) != null) {
            List<String> listItem = Arrays.asList(r.getString(2).split(" "));
            for (String item : listItem) {
                listItemRow.add(RowFactory.create(cnt, item, 1));
                cnt++;
            }
        }
        return listItemRow.iterator();
    }
}, RowEncoder.apply(
        new StructType().add("number", "integer")
                        .add("word", "string")
                        .add("lit", "integer")
)).createOrReplaceTempView("data");
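
With the null check in place, the grouping query from the question should run. As a quick sketch of the full word count (the alias and descending order are added here only for illustration):

spark.sql("select word, count(lit) as cnt from data group by word order by cnt desc").show();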

You can view the rows that have null values using

data.where("Description is null").show();

and similarly filter out these rows before applying flatMap, e.g.
data.where("Description is not null")
    .flatMap(new FlatMapFunction<Row, Row>() {
//continue the rest of your code
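
As a side note, the same word count can also be expressed with Spark SQL's built-in split and explode functions instead of a custom FlatMapFunction; explode simply produces no rows when split receives a null Description, so no explicit null check is needed. A minimal sketch:

import static org.apache.spark.sql.functions.*;

// split the Description on spaces into an array, then explode
// the array into one row per word; null Descriptions yield no rows
data.select(explode(split(col("Description"), " ")).alias("word"))
    .groupBy("word")
    .count()
    .orderBy(desc("count"))
    .show();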