org.apache.spark.shuffle.FetchFailedException: Connection from server1/xxx.xxx.x.xxx:7337 closed

I have upgraded Spark and am trying to run an existing Spark Streaming application on YARN (it accepts file names through a stream, reads those files from HDFS, transforms them using RDD and DataFrame operations, and finally saves the analyzed dataset to HBase). It is failing, and I have not been able to resolve the issue.
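
For context, the application flow described above could look roughly like the following minimal sketch; the socket source, the CSV read, the placeholder transformation, and the saveToHBase helper are assumptions for illustration, not the actual code.

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Minimal sketch of the described flow (assumptions: a socket stream of file
// names, CSV files on HDFS, and a hypothetical saveToHBase helper).
object Ingestion {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Ingestion").getOrCreate()
    val ssc = new StreamingContext(spark.sparkContext, Seconds(30))

    // The stream delivers HDFS file names (source is an assumption).
    val fileNames = ssc.socketTextStream("stream-host", 9999)

    fileNames.foreachRDD { rdd =>
      rdd.collect().foreach { path =>
        // Read the file from HDFS and transform with DataFrame operations.
        val df = spark.read.option("header", "true").csv(path)
        val analyzed = df.groupBy("key").count()   // placeholder transformation
        saveToHBase(analyzed)                      // hypothetical HBase writer
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }

  // Hypothetical stub: the real application uses its own HBase connector.
  def saveToHBase(df: org.apache.spark.sql.DataFrame): Unit = ()
}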

Environment details are given below.

Working versions

Platform
OS : RHEL 6.6, 128GB RAM, 42TB HDD, 32 Core
Java : 1.8.0_25
Scala : 2.11
Hadoop : 2.7.7
Spark : 2.4.6 with Hadoop 2.7 binaries
HBase : 1.4.12

Not working after upgrade

Spark : 3.0.0 with Hadoop 2.7 binaries
Compiled the same code with Scala 2.12, as required for Spark 3.0.0; it needed only minor changes for the version upgrade, with no logical changes.

Required YARN configuration

<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle,spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
  <value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
<property>
  <name>spark.shuffle.service.port</name>
  <value>7337</value>
</property>

Spark configuration passed while launching the job

spark.app.name=Ingestion
spark.eventLog.enabled=true
spark.yarn.historyServer.address=${hadoopconf-yarn.resourcemanager.hostname}:18088
spark.eventLog.dir=hdfs:///user/hduser/applicationHistory
spark.submit.deployMode=cluster
spark.driver.memory=1GB
spark.driver.cores=1
spark.executor.memory=5GB
spark.executor.cores=5
spark.dynamicAllocation.enabled=true
spark.shuffle.service.enabled=true
spark.dynamicAllocation.minExecutors=1
spark.sql.shuffle.partitions=2001
spark.logging.level=INFO
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.yarn.archive=hdfs:///spark-3.0.0-bin-hadoop2.7-jars.zip
spark.ui.killEnabled=false
spark.driver.memoryOverhead=512
spark.executor.memoryOverhead=1024
spark.yarn.maxAppAttempts=4
spark.yarn.am.attemptFailuresValidityInterval=1h
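
Note that spark.dynamicAllocation.enabled together with spark.shuffle.service.enabled is what routes shuffle block fetches through the external shuffle service on port 7337. The same pairing, expressed programmatically as a minimal sketch (in the real job these values are passed at submit time, so this is illustrative only):

import org.apache.spark.sql.SparkSession

// Sketch only: the shuffle/dynamic-allocation pairing from the list above.
val spark = SparkSession.builder()
  .appName("Ingestion")
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.shuffle.service.enabled", "true")   // external YARN shuffle service
  .config("spark.shuffle.service.port", "7337")      // must match yarn-site.xml
  .config("spark.dynamicAllocation.minExecutors", "1")
  .getOrCreate()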

Problem
The same code works on Spark 2.4.4, 2.4.5 and 2.4.6 with the same set of Hadoop, YARN and Spark configurations. As soon as I upgraded to Spark 3.0.0, the code started failing with the exception below. I have tried many adjustments, such as increasing resources and reducing partitions, without success. I also checked port 7337 with telnet; it is open and listening. After a week of debugging I could not find a solution, and there seems to be no reason for the shuffle port connection to be closed.
The dataset being processed is barely 50 MB. With exactly the same configuration, the same code on Spark 2.4.x handles more than 300 MB of data. This is strange!!
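
For reference, the telnet check only proves that the port accepts TCP connections; a quick JVM-level equivalent (a sketch, with the host name as a placeholder) verifies the same thing, but says nothing about the fetch protocol spoken over that connection.

import java.net.{InetSocketAddress, Socket}

// Sketch: TCP-level reachability check for the external shuffle service,
// equivalent to `telnet server1 7337`. The host name is a placeholder.
def shufflePortReachable(host: String, port: Int = 7337): Boolean = {
  val socket = new Socket()
  try {
    socket.connect(new InetSocketAddress(host, port), 5000) // 5 s timeout
    true
  } catch {
    case _: java.io.IOException => false
  } finally {
    socket.close()
  }
}

println(shufflePortReachable("server1"))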

Exception

org.apache.spark.shuffle.FetchFailedException: Connection from server1/xxx.xxx.x.xxx:7337 closed
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:748)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:663)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:70)
    at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
    at scala.collection.Iterator$$anon.nextCur(Iterator.scala:484)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:490)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:458)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:155)
    at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:78)
    at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute(ObjectHashAggregateExec.scala:129)
    at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$adapted(ObjectHashAggregateExec.scala:107)
    at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$Lambda7/1323895653.apply(Unknown Source)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal(RDD.scala:859)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$adapted(RDD.scala:859)
    at org.apache.spark.rdd.RDD$$Lambda4/1207730390.apply(Unknown Source)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:55)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run(Executor.scala:444)
    at org.apache.spark.executor.Executor$TaskRunner$$Lambda1/1364680867.apply(Unknown Source)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Connection from server1/xxx.xxx.x.xxx:7337 closed
    at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:146)
    at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:117)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
    at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
    at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:225)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
    at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.run(AbstractChannel.java:818)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497)
    at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    ... 1 more

Has anyone else faced this issue? If yes, please let me know how you resolved it. I am out of ideas about what to check next. Any help would be greatly appreciated.
Thanks

Spark 3.x does not work with the old external shuffle service, because it uses a new shuffle block fetch protocol by default.

If you want to keep the old shuffle service, try the following configuration change.

spark.shuffle.useOldFetchProtocol=true

Reference: https://issues.apache.org/jira/browse/SPARK-29435
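
For example, the flag can simply be appended to the job's existing Spark configuration list or passed via --conf at submit time; a minimal sketch, assuming it is set in code instead, looks like this:

import org.apache.spark.sql.SparkSession

// Sketch: keep using the Spark 2.x-era YARN shuffle service by forcing the
// old fetch protocol (see SPARK-29435). Normally passed as
//   --conf spark.shuffle.useOldFetchProtocol=true
val spark = SparkSession.builder()
  .appName("Ingestion")
  .config("spark.shuffle.useOldFetchProtocol", "true")
  .getOrCreate()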