Spark Connection refused for BlockManager process

I have a 7-node cluster set up locally on CentOS VMs. Earlier the VMs were co-located and everything worked fine, but since yesterday, with the VMs now spread across the data center (though still in the same rack), I am hitting Connection refused / IOException: Failed to connect to errors.

Logs:

2019-05-24 03:33:37 INFO  TorrentBroadcast:54 - Started reading broadcast variable 6
2019-05-24 03:33:37 ERROR RetryingBlockFetcher:143 - Exception while beginning fetch of 1 outstanding blocks 
java.io.IOException: Failed to connect to <HOST/IP>:38000
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
    at org.apache.spark.network.netty.NettyBlockTransferService$$anon.createAndStart(NettyBlockTransferService.scala:113)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:141)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:121)
    at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:123)
    at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:98)
    at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:693)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks.apply$mcVI$sp(TorrentBroadcast.scala:162)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks.apply(TorrentBroadcast.scala:151)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks.apply(TorrentBroadcast.scala:151)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:151)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$$anonfun$apply.apply(TorrentBroadcast.scala:231)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock.apply(TorrentBroadcast.scala:211)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1347)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:207)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
    at org.apache.spark.sql.execution.datasources.text.TextFileFormat$$anonfun$readToUnsafeMem.apply(TextFileFormat.scala:125)
    at org.apache.spark.sql.execution.datasources.text.TextFileFormat$$anonfun$readToUnsafeMem.apply(TextFileFormat.scala:124)
    at org.apache.spark.sql.execution.datasources.FileFormat$$anon.apply(FileFormat.scala:148)
    at org.apache.spark.sql.execution.datasources.FileFormat$$anon.apply(FileFormat.scala:132)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:128)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.nextIterator(FileScanRDD.scala:182)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.hasNext(FileScanRDD.scala:109)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:614)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:461)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:461)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:214)
    at scala.collection.AbstractIterator.aggregate(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$aggregate$$anonfun.apply(RDD.scala:1122)
    at org.apache.spark.rdd.RDD$$anonfun$aggregate$$anonfun.apply(RDD.scala:1122)
    at org.apache.spark.SparkContext$$anonfun.apply(SparkContext.scala:2130)
    at org.apache.spark.SparkContext$$anonfun.apply(SparkContext.scala:2130)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: <HOST/IP>:38000
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:323)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:633)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    ... 1 more
Caused by: java.net.ConnectException: Connection refused
    ... 11 more
2019-05-24 03:33:37 INFO  RetryingBlockFetcher:164 - Retrying fetch (1/3) for 1 outstanding blocks after 5000 ms
2019-05-24 03:33:42 ERROR RetryingBlockFetcher:143 - Exception while beginning fetch of 1 outstanding blocks (after 1 retries)
java.io.IOException: Failed to connect to <HOST/IP>:38000
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
    at org.apache.spark.network.netty.NettyBlockTransferService$$anon.createAndStart(NettyBlockTransferService.scala:113)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:141)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.lambda$initiateRetry$0(RetryingBlockFetcher.java:169)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    at java.lang.Thread.run(Thread.java:748)
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: <HOST/IP>:38000
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:323)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:633)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:858)
    ... 2 more
Caused by: java.net.ConnectException: Connection refused
    ... 11 more
2019-05-24 03:33:42 INFO  RetryingBlockFetcher:164 - Retrying fetch (2/3) for 1 outstanding blocks after 5000 ms
2019-05-24 03:33:47 ERROR RetryingBlockFetcher:143 - Exception while beginning fetch of 1 outstanding blocks (after 2 retries)
java.io.IOException: Failed to connect to <HOST/IP>:38000
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
    at org.apache.spark.network.netty.NettyBlockTransferService$$anon.createAndStart(NettyBlockTransferService.scala:113)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:141)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.lambda$initiateRetry$0(RetryingBlockFetcher.java:169)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    at java.lang.Thread.run(Thread.java:748)
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: <HOST/IP>:38000
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:323)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:633)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:858)
    ... 2 more
Caused by: java.net.ConnectException: Connection refused
    ... 11 more

My theory is that since the VMs are now spread out, there may be latency in their communication, and that latency might exceed some timeout, which then surfaces as the connection being refused or timing out. Is that the right way to think about it?
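
To sanity-check this, I can probe the failing BlockManager port directly. The sketch below (runnable in spark-shell or the Scala REPL; host and port mirror the placeholders in the log above) separates the two failure modes: an immediate ConnectException ("Connection refused") means nothing is listening on that address/port, while a SocketTimeoutException would point at latency or packets being dropped.

    import java.net.{InetSocketAddress, Socket}
    import scala.util.{Failure, Success, Try}

    // Probe the address/port the block fetch is failing against.
    val host      = "<HOST/IP>"   // placeholder, as in the log above
    val port      = 38000
    val timeoutMs = 5000          // connect timeout for the probe

    Try {
      val s = new Socket()
      try s.connect(new InetSocketAddress(host, port), timeoutMs)
      finally s.close()
    } match {
      case Success(_)  => println(s"$host:$port is reachable")
      case Failure(ex) => println(s"$host:$port failed: ${ex.getClass.getSimpleName}: ${ex.getMessage}")
    }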

If not, what else could the problem be?

Thanks for your help!

Figured it out. Due to a recent change to the VMs, the BlockManager service was running on 127.0.0.1. Fixed that, and the Spark cluster is back to its old self!
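
In case it helps anyone hitting the same symptom: the relevant knobs are spark.driver.host, spark.driver.bindAddress, spark.blockManager.port, and the SPARK_LOCAL_IP variable in conf/spark-env.sh. Below is a minimal sketch (app name and host value are placeholders) of pinning the advertised/bind addresses explicitly instead of letting Spark resolve the hostname, which can land on 127.0.0.1 when /etc/hosts maps the hostname there:

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    // Sketch: advertise a routable address instead of whatever the hostname
    // resolves to (possibly 127.0.0.1), and optionally pin the BlockManager port.
    val conf = new SparkConf()
      .setAppName("block-manager-bind-check")
      .set("spark.driver.host", "10.0.0.12")       // address the other nodes can actually reach (placeholder)
      .set("spark.driver.bindAddress", "0.0.0.0")  // local interface to bind the driver's services on
      .set("spark.blockManager.port", "38000")     // optional: fix the BlockManager port for firewalling

    val spark = SparkSession.builder().config(conf).getOrCreate()

Equivalently, setting SPARK_LOCAL_IP to each VM's routable address in conf/spark-env.sh on every node keeps the executors' BlockManagers from registering themselves as 127.0.0.1 in the first place.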