Spark Structured Streaming exception while writing

I am getting the following error while writing a Spark Structured Streaming DataFrame - please tell me what I am doing wrong when running this code:

Here df is being read from the s3://abc/testing location, and I am using Spark Streaming.

I am writing this DataFrame to a different S3 location:
    val q = df.writeStream
      .trigger(Trigger.Once)
      .option("checkpointLocation", "s3://abc/checkpoint")
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        batchDF
          .write
          .mode(SaveMode.Append)
          .parquet("s3://abc/demo")
      }
      .start()

    q.processAllAvailable()
    q.stop()

While running the above code I get the following error:

    org.apache.spark.sql.streaming.StreamingQueryException: Job aborted.
    === Streaming Query ===
    Identifier: [id = 82cae180-6190-499a-99ae, runId = 23aa9dca-c6ef-49ff-b860]
    Current Committed Offsets: {}
    Current Available Offsets: {FileStreamSource[s3://abc/testing]: {"logOffset":0}}
    
    Current State: ACTIVE
    Thread State: RUNNABLE
    
    Logical Plan:
    FileStreamSource[s3://abc/testing]
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:379)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon.run(StreamExecution.scala:269)
Caused by: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:230)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:178)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:116)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:114)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:139)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute(SparkPlan.scala:200)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery(SparkPlan.scala:252)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:248)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:192)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:158)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:157)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand(DataFrameWriter.scala:999)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv(SQLExecution.scala:116)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:249)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv(SQLExecution.scala:101)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:845)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:199)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:999)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:437)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:421)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:294)
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:884)
    at line7d42fe70c8664871b443fdc5f6bbc35869.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$withCreateExtract(command-3858326:61)
    at line7d42fe70c8664871b443fdc5f6bbc35869.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$withCreateExtract$adapted(command-3858326:56)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:39)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch(MicroBatchExecution.scala:593)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv(SQLExecution.scala:116)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:249)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv(SQLExecution.scala:101)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:845)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:199)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch(MicroBatchExecution.scala:591)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:276)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:274)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:74)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:591)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream(MicroBatchExecution.scala:231)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:276)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:274)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:74)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream(MicroBatchExecution.scala:199)
    at org.apache.spark.sql.execution.streaming.OneTimeExecutor.execute(TriggerExecutor.scala:39)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:193)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:358)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon.run(StreamExecution.scala:269)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 31 tasks (4.0 GiB) is bigger than spark.driver.maxResultSize 4.0 GiB.
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2519)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage(DAGScheduler.scala:2466)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$adapted(DAGScheduler.scala:2460)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2460)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed(DAGScheduler.scala:1152)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$adapted(DAGScheduler.scala:1152)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1152)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2721)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2668)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2656)
    at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2339)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2434)
    at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:273)
    at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:308)
    at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:82)
    at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:88)
    at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:508)
    at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:480)
    at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:401)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture(BroadcastExchangeExec.scala:127)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:845)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured(SQLExecution.scala:308)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured(SQLExecution.scala:308)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured(SQLExecution.scala:307)
    at org.apache.spark.sql.execution.SQLExecution$.withOptimisticTransaction(SQLExecution.scala:325)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured(SQLExecution.scala:306)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.$anonfun$run(SparkThreadLocalForwardingThreadPoolExecutor.scala:104)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:68)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured$(SparkThreadLocalForwardingThreadPoolExecutor.scala:54)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:101)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.run(SparkThreadLocalForwardingThreadPoolExecutor.scala:104)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Total size of serialized results of 31 tasks (4.0 GiB) is bigger than spark.driver.maxResultSize 4.0 GiB means that when the executors tried to send their results back to the driver, the combined size exceeded spark.driver.maxResultSize. You can get past the error by increasing that limit until the job runs, but if the executors really are trying to ship that much data to the driver, simply raising the limit is not recommended.
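
If you do decide to raise the limit, note that spark.driver.maxResultSize is normally fixed when the SparkSession/SparkContext is created (for example via --conf spark.driver.maxResultSize=8g on spark-submit). A minimal sketch, assuming you control how the session is built; the application name and the 8g value are only placeholders, not values from the question:

    import org.apache.spark.sql.SparkSession

    // Placeholder session setup: "8g" is just an example value -- size it to what
    // the driver can actually hold in memory. The default is 1g, and 0 disables
    // the check entirely (which risks a driver OOM instead of this error).
    val spark = SparkSession.builder()
      .appName("streaming-write-demo")
      .config("spark.driver.maxResultSize", "8g")
      .getOrCreate()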

Another thing that can cause this is data skew. Check how your data is distributed across the worker nodes; it may be that all of it ends up on a single node, producing a huge amount of input/output from that one worker. In that case, try repartitioning your data to spread the load across your workers, which is a better fix than just raising the limit. A sketch of that idea applied to the code from the question is shown below.
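
A minimal sketch of repartitioning each micro-batch before writing it out. The column name some_column and the count of 200 are placeholders, not values from the question; pick a column that actually spreads your data evenly and a partition count suited to your cluster:

    import org.apache.spark.sql.{DataFrame, SaveMode}
    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.streaming.Trigger

    // Assumes df is the same streaming DataFrame as in the question and that a
    // column named "some_column" exists -- both the column and the 200-partition
    // count are illustrative placeholders.
    val q = df.writeStream
      .trigger(Trigger.Once)
      .option("checkpointLocation", "s3://abc/checkpoint")
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        batchDF
          .repartition(200, col("some_column"))  // redistribute rows evenly before the write
          .write
          .mode(SaveMode.Append)
          .parquet("s3://abc/demo")
      }
      .start()

    q.processAllAvailable()
    q.stop()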