Spark non-descriptive error in DELTA MERGE

I'm replicating several SAP tables into Azure Data Lake Storage (ADLS Gen2) using Spark 3.1 on Databricks (Databricks Runtime 8) with a fairly large cluster (25 workers, each with 112 GB of memory and 16 cores). A tool writes the deltas of all these tables into an intermediate system (SQL Server), and then, whenever I have new data for a table, I run a Databricks job that merges the new data with the existing data already available in ADLS.
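
For reference, the merge step looks roughly like the sketch below. The paths, table name, and key column are placeholders, not the real ones:

    from delta.tables import DeltaTable

    # Hypothetical locations: the staged delta extract (already Parquet) and
    # the Delta table in ADLS Gen2 that it gets merged into.
    updates = spark.read.parquet("abfss://staging@account.dfs.core.windows.net/sap/MY_TABLE_delta")
    target = DeltaTable.forPath(spark, "abfss://lake@account.dfs.core.windows.net/sap/MY_TABLE")

    (target.alias("t")
        .merge(updates.alias("s"), "t.KEY_COL = s.KEY_COL")  # KEY_COL is a placeholder key
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())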

This process works fine for most of the tables, but some of them (the biggest ones) take a very long time to merge (I run a merge per table), and the biggest one started failing about a week ago (when a large delta was generated for that table). This is the error trace I see in the job:

    Py4JJavaError: An error occurred while calling o233.sql.
    : org.apache.spark.SparkException: Job aborted.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:234)
        at com.databricks.sql.transaction.tahoe.files.TransactionalWriteEdge.$anonfun$writeFiles(TransactionalWriteEdge.scala:246)
        ...
    Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult:
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:428)
        at com.databricks.sql.transaction.tahoe.perf.DeltaOptimizedWriterExec.awaitShuffleMapStage(DeltaOptimizedWriterExec.scala:153)
        at com.databricks.sql.transaction.tahoe.perf.DeltaOptimizedWriterExec.getShuffleStats(DeltaOptimizedWriterExec.scala:158)
        at com.databricks.sql.transaction.tahoe.perf.DeltaOptimizedWriterExec.computeBins(DeltaOptimizedWriterExec.scala:106)
        at com.databricks.sql.transaction.tahoe.perf.DeltaOptimizedWriterExec.doExecute(DeltaOptimizedWriterExec.scala:174)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute(SparkPlan.scala:196)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery(SparkPlan.scala:240)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:236)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:192)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:180)
        ... 141 more
    Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: ShuffleMapStage 68 (execute at DeltaOptimizedWriterExec.scala:97) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException: Connection from /XXX.XX.XX.XX:4048 closed
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:769)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:684)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:69)
        ...
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.io.IOException: Connection from /XXX.XX.XX.XX:4048 closed
        at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:146)
        at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:117)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
        at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
        at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:225)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
        at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.run(AbstractChannel.java:818)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497)
        at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        ... 1 more

Since the error is not descriptive, I looked at the logs of every executor and saw the following message:

    21/04/07 09:11:24 ERROR OneForOneBlockFetcher: Failed while starting block fetches
    java.io.IOException: Connection from /XXX.XX.XX.XX:4048 closed

And on the executor that apparently could not be reached, I saw the following error message:

    21/04/06 09:30:46 ERROR SparkThreadLocalCapturingRunnable: Exception in thread Task reaper-7
    org.apache.spark.SparkException: Killing executor JVM because killed task 5912 could not be stopped within 60000 ms.
        at org.apache.spark.executor.Executor$TaskReaper.run(Executor.scala:1119)
        at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.$anonfun$run(SparkThreadLocalForwardingThreadPoolExecutor.scala:104)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:68)
        at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured$(SparkThreadLocalForwardingThreadPoolExecutor.scala:54)
        at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:101)
        at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.run(SparkThreadLocalForwardingThreadPoolExecutor.scala:104)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(

I have already tried increasing the default shuffle parallelism (from 200 to 1200, as suggested here). The job seemed to run for longer, but it failed again.

I also tried to monitor the Spark UI while the job was executing, and the problem there was the same: some stages failed because the executors became unreachable after their tasks had failed more than X times.

The big Delta table I mentioned above has around 4-5 billion rows, and the big dump I am trying to merge into it has around 100 million rows. The table is not partitioned (yet), so the process is very expensive. It is the merge part that fails, not the copy of the data from SQL Server to ADLS, so by the time the merge runs, the data to be merged is already in Parquet format.
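
For context, by "partitioned" I mean eventually rebuilding the table along a date-like column, roughly like the sketch below (the column name CALDAY and the path are placeholders, not actual columns from my tables):

    # Hypothetical sketch: rewrite the Delta table partitioned by a date-like
    # column, so a merge only rewrites the affected partitions.
    (spark.read.format("delta")
        .load("abfss://lake@account.dfs.core.windows.net/sap/MY_TABLE")
        .write.format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")  # required when changing the partitioning
        .partitionBy("CALDAY")              # placeholder partition column
        .save("abfss://lake@account.dfs.core.windows.net/sap/MY_TABLE"))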

Any idea what is happening, or what I can do to get this merge to complete?

Thanks in advance.

In the end, I checked the cluster and changed the spark.sql.shuffle.partitions property to 1600 in the configuration of the job code I wanted to execute (instead of changing it directly on the cluster). My cluster has 400 cores in total, so I chose a multiple of that number (1600).
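
In code this is just a session-level setting applied before the merge runs, along these lines:

    # Set in the job/notebook session before running the MERGE,
    # instead of in the cluster-wide Spark config. 1600 = 4 x 400 cores.
    spark.conf.set("spark.sql.shuffle.partitions", "1600")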

After that, the execution finished within two hours. I came to this conclusion because, in my logs and in the Spark UI, I observed a lot of disk spill, so I figured the shuffle partitions did not fit on the worker nodes.
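
As a rough illustration of the sizing logic (the shuffle size below is an assumed figure for the example, not measured from my job): if each shuffle partition should stay around a couple of hundred MB so it fits in task memory, the partition count falls out like this:

    import math

    # Assumed numbers for illustration only; the real shuffle size would
    # come from the Spark UI of the failing stage.
    shuffle_bytes = 400 * 1024**3            # assume ~400 GB shuffled in the merge
    target_partition_bytes = 256 * 1024**2   # aim for ~256 MB per partition
    cores = 400                              # total cores in the cluster

    raw = shuffle_bytes / target_partition_bytes   # -> 1600.0 partitions
    partitions = math.ceil(raw / cores) * cores    # round up to a multiple of the core count
    print(partitions)                              # -> 1600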