External Shuffle service connection idle for more than 120 seconds while there are outstanding requests

I am running a Spark job on YARN. The job runs fine on Amazon EMR (1 master and 1 slave, both m4.xlarge).

I set up similar infrastructure on AWS EC2 machines using the HDP 2.6 distribution, but the Spark job gets stuck at a particular stage, and after a while I get the error below in the container logs. The main error seems to be that the connection to the shuffle service has gone idle.

18/06/25 07:15:31 INFO spark.MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://MapOutputTracker@10.210.150.150:44343)
18/06/25 07:15:31 INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 9, fetching them
18/06/25 07:15:31 INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 9, fetching them
18/06/25 07:15:31 INFO spark.MapOutputTrackerWorker: Got the output locations
18/06/25 07:15:31 INFO storage.ShuffleBlockFetcherIterator: Getting 5 non-empty blocks out of 1000 blocks
18/06/25 07:15:31 INFO storage.ShuffleBlockFetcherIterator: Started 1 remote fetches in 0 ms
18/06/25 07:15:31 INFO storage.ShuffleBlockFetcherIterator: Getting 5 non-empty blocks out of 1000 blocks
18/06/25 07:15:31 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
18/06/25 07:15:31 INFO storage.ShuffleBlockFetcherIterator: Getting 5 non-empty blocks out of 1000 blocks
18/06/25 07:15:31 INFO storage.ShuffleBlockFetcherIterator: Started 1 remote fetches in 0 ms
18/06/25 07:15:31 INFO storage.ShuffleBlockFetcherIterator: Getting 5 non-empty blocks out of 1000 blocks
18/06/25 07:15:31 INFO storage.ShuffleBlockFetcherIterator: Started 1 remote fetches in 1 ms
18/06/25 07:15:31 INFO codegen.CodeGenerator: Code generated in 4.822611 ms
18/06/25 07:15:31 INFO codegen.CodeGenerator: Code generated in 8.430244 ms
18/06/25 07:17:31 ERROR server.TransportChannelHandler: Connection to ip-10-210-150-180.********/10.210.150.180:7447 has been quiet for 120000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong.
18/06/25 07:17:31 ERROR client.TransportResponseHandler: Still have 307 requests outstanding when connection from ip-10-210-150-180.********/10.210.150.180:7447 is closed
18/06/25 07:17:31 INFO shuffle.RetryingBlockFetcher: Retrying fetch (1/3) for 197 outstanding blocks after 5000 ms
18/06/25 07:17:31 ERROR shuffle.OneForOneBlockFetcher: Failed while starting block fetches
java.io.IOException: Connection from ip-10-210-150-180.********/10.210.150.180:7447 closed
    at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:146)
    at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:108)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
    at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:278)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
    at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:182)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1289)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
    at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:893)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.run(AbstractChannel.java:691)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:399)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:446)
    at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:131)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    at java.lang.Thread.run(Thread.java:748)
18/06/25 07:17:31 INFO shuffle.RetryingBlockFetcher: Retrying fetch (1/3) for 166 outstanding blocks after 5000 ms
18/06/25 07:17:31 ERROR shuffle.OneForOneBlockFetcher: Failed while starting block fetches
java.io.IOException: Connection from ip-10-210-150-180.********/10.210.150.180:7447 closed
    at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:146)
    at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:108)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
    at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:278)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
    at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:182)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1289)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
    at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:893)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.run(AbstractChannel.java:691)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:399)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:446)
    at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:131)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    at java.lang.Thread.run(Thread.java:748)

I am currently running Spark on a YARN cluster with the following spark-defaults configuration:

spark.eventLog.dir=hdfs:///user/spark/applicationHistory
spark.eventLog.enabled=true
spark.yarn.historyServer.address=ppv-qa12-tenant8-spark-cluster-master.periscope-solutions.local:18080
spark.shuffle.service.enabled=true
spark.dynamicAllocation.enabled=true
spark.driver.extraLibraryPath=/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64
spark.executor.extraLibraryPath=/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64
spark.driver.maxResultSize=0
spark.driver.extraJavaOptions=-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'
spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'
spark.executor.memory=5g
spark.driver.memory=1g
spark.executor.cores=4
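
For reference, the error message itself suggests adjusting spark.network.timeout. A minimal sketch of that mitigation in spark-defaults.conf is shown below with purely illustrative values; as the edit at the bottom shows, the real cause in our case was packet loss, so raising the timeout would only have masked the symptom.

# illustrative values only; the defaults are 120s, 3 and 5s respectively
spark.network.timeout=600s
spark.shuffle.io.maxRetries=10
spark.shuffle.io.retryWait=30s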

And I have set the following in yarn-site.xml on the NodeManagers of the slave machines:

<configuration>
  <property>
    <name>yarn.application.classpath</name>
    <value>/usr/hdp/current/spark2-client/aux/*,/etc/hadoop/conf,/usr/hdp/current/hadoop-client/*,/usr/hdp/current/hadoop-client/lib/*,/usr/hdp/current/hadoop-hdfs-client/*,/usr/hdp/current/hadoop-hdfs-client/lib/*,/usr/hdp/current/hadoop-yarn-client/*,/usr/hdp/current/hadoop-yarn-client/lib/*</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>spark2_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
    <value>org.apache.hadoop.mapred.ShuffleHandler</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services.spark2_shuffle.class</name>
    <value>org.apache.spark.network.yarn.YarnShuffleService</value>
  </property>
  <property>
    <name>yarn.nodemanager.container-manager.thread-count</name>
    <value>64</value>
  </property>
  <property>
    <name>yarn.nodemanager.localizer.client.thread-count</name>
    <value>20</value>
  </property>
  <property>
    <name>yarn.nodemanager.vmem-pmem-ratio</name>
    <value>5</value>
  </property>
  <property>
    <name>yarn.resourcemanager.hostname</name>
    <value>************</value>
  </property>
  <property>
    <name>yarn.resourcemanager.resource-tracker.client.thread-count</name>
    <value>64</value>
  </property>
  <property>
    <name>yarn.resourcemanager.scheduler.client.thread-count</name>
    <value>64</value>
  </property>
  <property>
    <name>yarn.scheduler.increment-allocation-mb</name>
    <value>32</value>
  </property>
  <property>
    <name>yarn.scheduler.increment-allocation-vcores</name>
    <value>1</value>
  </property>
  <property>
    <name>yarn.scheduler.maximum-allocation-vcores</name>
    <value>128</value>
  </property>
  <property>
    <name>yarn.scheduler.minimum-allocation-mb</name>
    <value>32</value>
  </property>
  <property>
    <name>yarn.timeline-service.enabled</name>
    <value>true</value>
  </property>
  <property>
  <name>yarn.nodemanager.resource.cpu-vcores</name>
    <value>8</value>
  </property>
  <property>
  <name>yarn.nodemanager.resource.memory-mb</name>
    <value>11520</value>
  </property>
  <property>
  <name>yarn.scheduler.maximum-allocation-mb</name>
    <value>11520</value>
  </property>
  <property>
  <name>yarn.nodemanager.hostname</name>
    <value>*************</value>
  </property>
</configuration>
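
As a sanity check that the external shuffle service registered with the NodeManager is actually up, something along these lines can be run on a slave node; 7447 is the port seen in the container logs above, while the real value depends on spark.shuffle.service.port.

# confirm the NodeManager JVM is running and the shuffle service port is bound
sudo jps -l | grep -i nodemanager
sudo netstat -tlnp | grep 7447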

Edit: Through some network debugging, I found that the ephemeral port opened by the container to connect to the shuffle service was actively refusing connections (telnet threw an error immediately).
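
A sketch of the kind of connectivity check used; the host and port here are placeholders taken from the container logs, not the actual ephemeral socket:

# substitute the peer node's IP and the port under suspicion
telnet 10.210.150.180 7447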

While going through the kernel and system activity logs, we found the following issue in /var/log/messages:

xen_netfront: xennet: skb rides the rocket: 19 slots

This means that our AWS EC2 machines were dropping network packets.
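
The message is easy to spot in the kernel log; a minimal check, assuming a /var/log/messages layout like ours:

grep "rides the rocket" /var/log/messages
# or straight from the kernel ring buffer
dmesg | grep "rides the rocket"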

Data transfer between the containers and the shuffle service happens through RPC calls (ChunkFetchRequest, ChunkFetchSuccess and ChunkFetchFailure), and those RPC calls were being dropped by the network.

More information about this log message can be found in the following post:

http://www.brendangregg.com/blog/2014-09-11/perf-kernel-line-tracing.html

The log message indicates that we exceeded the maximum number of buffer slots (16) that a packet may occupy in the driver's ring buffer queue, and those SKBs were lost.

Scatter-gather collects multiple responses and sends them as a single response, which in turn increases the SKB size.

So we turned off scatter-gather with the following command:

sudo ethtool -K eth0 sg off
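
To confirm the change took effect, ethtool can report the current offload settings. Note that ethtool settings do not survive a reboot, so the command has to be re-applied at boot time; how to do that is distro-specific, for example via a boot script such as /etc/rc.local.

# verify that scatter-gather is now disabled; the summary line should read "scatter-gather: off"
ethtool -k eth0 | grep scatter-gather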

After that there were no more packet drops.

Performance also became similar to what we used to get on EMR.