Spark:线程异常 "main" akka.actor.ActorNotFound:
Spark: Exception in thread "main" akka.actor.ActorNotFound:
我正在将我的 Spark 作业从本地笔记本电脑提交到远程独立 Spark 集群 (spark://IP:7077)。提交成功。但是,我没有得到任何输出,一段时间后它失败了。当我检查集群上的工作人员时,我发现以下异常:
Exception in thread "main" akka.actor.ActorNotFound: Actor not found for: ActorSelection[Actor[akka.tcp://sparkDriver@localhost:54561/]/user/CoarseGrainedScheduler]
当我在我的本地系统 (local[*]) 上 运行 相同的代码时,它 运行 成功并给出了输出。
请注意,我在 spark notebook 中 运行 它。当我使用 spark-submit
通过终端提交时,相同的应用程序 运行s 在远程独立集群上成功
我是不是在笔记本的配置中遗漏了什么?还有其他可能的原因吗?
代码很简单
详细异常:
Exception in thread "main" akka.actor.ActorNotFound: Actor not found for: ActorSelection[Actor[akka.tcp://sparkDriver@localhost:54561/]/user/CoarseGrainedScheduler]
at akka.actor.ActorSelection$$anonfun$resolveOne.apply(ActorSelection.scala:66)
at akka.actor.ActorSelection$$anonfun$resolveOne.apply(ActorSelection.scala:64)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run.processBatch(BatchingExecutor.scala:67)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run.apply$mcV$sp(BatchingExecutor.scala:82)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run.apply(BatchingExecutor.scala:59)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run.apply(BatchingExecutor.scala:59)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:269)
at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:512)
at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:545)
at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:535)
at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:91)
at akka.actor.ActorRef.tell(ActorRef.scala:125)
at akka.dispatch.Mailboxes$$anon$$anon.enqueue(Mailboxes.scala:44)
at akka.dispatch.QueueBasedMessageQueue$class.cleanUp(Mailbox.scala:438)
at akka.dispatch.UnboundedDequeBasedMailbox$MessageQueue.cleanUp(Mailbox.scala:650)
at akka.dispatch.Mailbox.cleanUp(Mailbox.scala:309)
at akka.dispatch.MessageDispatcher.unregister(AbstractDispatcher.scala:204)
at akka.dispatch.MessageDispatcher.detach(AbstractDispatcher.scala:140)
at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:203)
at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
at akka.actor.ActorCell.terminate(ActorCell.scala:338)
at akka.actor.ActorCell.invokeAll(ActorCell.scala:431)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
at akka.dispatch.Mailbox.run(Mailbox.scala:218)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
示例代码
val logFile = "hdfs://hostname/path/to/file"
val conf = new SparkConf()
.setMaster("spark://hostname:7077") // as appears on hostname:8080
.setAppName("myapp")
.set("spark.executor.memory", "20G")
.set("spark.cores.max", "40")
.set("spark.executor.cores","20")
.set("spark.driver.allowMultipleContexts","true")
val sc2 = new SparkContext(conf)
val logData = sc2.textFile(logFile)
val numAs = logData.filter(line => line.contains("hello")).count()
val numBs = logData.filter(line => line.contains("hi")).count()
println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
当你说 "spark notebook" 我假设你指的是 github 项目 https://github.com/andypetrella/spark-notebook?
我必须查看笔记本的细节,但我注意到您的工作人员正在尝试连接到 "localhost" 上的主机。
对于正常的 Spark 配置,在 $SPARK_HOME/conf/spark-env.sh 中设置 SPARK_MASTER_IP worker 并查看是否有帮助,即使您 运行 在单台机器上独立运行模式,设置这个。根据我的经验,Spark 并不总能正确解析主机名,因此从所有 IP 的基线开始是个好主意。
其余为一般信息,看看对您的具体问题是否有帮助:
如果您从笔记本电脑提交到集群,您可以使用 --deploy-mode 集群告诉您的驱动程序在其中一个工作节点上 运行。这会额外考虑您如何设置类路径,因为您不知道驱动程序将 运行 放在哪个 worker 上。
为了完整起见,这里有一些一般信息,有一个关于主机名解析为 IP 地址的已知 Spark 错误。我并不是在所有情况下都将此作为完整答案,但我建议尝试使用仅使用所有 IP 的基线,并且仅使用单个配置 SPARK_MASTER_IP。仅通过这两种做法,我的集群就可以正常工作,而所有其他配置或使用主机名似乎都把事情搞砸了。
因此在您的 spark-env.sh 中删除 SPARK_LOCAL_IP 并将 SPARK_MASTER_IP 更改为 IP 地址,而不是主机名。
I have treated this more at length in this answer.
为了更完整,这里是该答案的一部分:
你能 ping Spark master 所在的盒子 运行ning 吗?你能从 master ping worker 吗?更重要的是,你能 password-less 从 master box ssh 到 worker 吗?根据 1.5.2 文档,您需要能够使用私钥执行此操作并将工作人员输入 conf/slaves 文件。我在最后复制了相关段落。
您可能会遇到这样一种情况,即 worker 可以联系 master,但 master 无法回复 worker,所以看起来好像没有建立连接。检查两个方向。
我认为主节点上的从属文件和 password-less ssh 可能会导致与您所看到的类似的错误。
根据我交联的答案,还有 an old bug 但不清楚该错误是如何解决的。
更新:
可以通过在应用程序代码中包含驱动程序的 IP 地址(即本地笔记本电脑的 public IP)来避免上述问题。这可以通过在 spark 上下文中添加以下行来完成:
.set("spark.driver.host",YourSystemIPAddress)
但是,如果驱动程序的 IP 地址位于 NAT 后面,则可能会出现问题。在这种情况下,工作人员将无法找到 IP。
我正在将我的 Spark 作业从本地笔记本电脑提交到远程独立 Spark 集群 (spark://IP:7077)。提交成功。但是,我没有得到任何输出,一段时间后它失败了。当我检查集群上的工作人员时,我发现以下异常:
Exception in thread "main" akka.actor.ActorNotFound: Actor not found for: ActorSelection[Actor[akka.tcp://sparkDriver@localhost:54561/]/user/CoarseGrainedScheduler]
当我在我的本地系统 (local[*]) 上 运行 相同的代码时,它 运行 成功并给出了输出。
请注意,我在 spark notebook 中 运行 它。当我使用 spark-submit
我是不是在笔记本的配置中遗漏了什么?还有其他可能的原因吗?
代码很简单
详细异常:
Exception in thread "main" akka.actor.ActorNotFound: Actor not found for: ActorSelection[Actor[akka.tcp://sparkDriver@localhost:54561/]/user/CoarseGrainedScheduler]
at akka.actor.ActorSelection$$anonfun$resolveOne.apply(ActorSelection.scala:66)
at akka.actor.ActorSelection$$anonfun$resolveOne.apply(ActorSelection.scala:64)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run.processBatch(BatchingExecutor.scala:67)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run.apply$mcV$sp(BatchingExecutor.scala:82)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run.apply(BatchingExecutor.scala:59)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run.apply(BatchingExecutor.scala:59)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:269)
at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:512)
at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:545)
at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:535)
at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:91)
at akka.actor.ActorRef.tell(ActorRef.scala:125)
at akka.dispatch.Mailboxes$$anon$$anon.enqueue(Mailboxes.scala:44)
at akka.dispatch.QueueBasedMessageQueue$class.cleanUp(Mailbox.scala:438)
at akka.dispatch.UnboundedDequeBasedMailbox$MessageQueue.cleanUp(Mailbox.scala:650)
at akka.dispatch.Mailbox.cleanUp(Mailbox.scala:309)
at akka.dispatch.MessageDispatcher.unregister(AbstractDispatcher.scala:204)
at akka.dispatch.MessageDispatcher.detach(AbstractDispatcher.scala:140)
at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:203)
at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
at akka.actor.ActorCell.terminate(ActorCell.scala:338)
at akka.actor.ActorCell.invokeAll(ActorCell.scala:431)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
at akka.dispatch.Mailbox.run(Mailbox.scala:218)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
示例代码
val logFile = "hdfs://hostname/path/to/file"
val conf = new SparkConf()
.setMaster("spark://hostname:7077") // as appears on hostname:8080
.setAppName("myapp")
.set("spark.executor.memory", "20G")
.set("spark.cores.max", "40")
.set("spark.executor.cores","20")
.set("spark.driver.allowMultipleContexts","true")
val sc2 = new SparkContext(conf)
val logData = sc2.textFile(logFile)
val numAs = logData.filter(line => line.contains("hello")).count()
val numBs = logData.filter(line => line.contains("hi")).count()
println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
当你说 "spark notebook" 我假设你指的是 github 项目 https://github.com/andypetrella/spark-notebook?
我必须查看笔记本的细节,但我注意到您的工作人员正在尝试连接到 "localhost" 上的主机。
对于正常的 Spark 配置,在 $SPARK_HOME/conf/spark-env.sh 中设置 SPARK_MASTER_IP worker 并查看是否有帮助,即使您 运行 在单台机器上独立运行模式,设置这个。根据我的经验,Spark 并不总能正确解析主机名,因此从所有 IP 的基线开始是个好主意。
其余为一般信息,看看对您的具体问题是否有帮助:
如果您从笔记本电脑提交到集群,您可以使用 --deploy-mode 集群告诉您的驱动程序在其中一个工作节点上 运行。这会额外考虑您如何设置类路径,因为您不知道驱动程序将 运行 放在哪个 worker 上。
为了完整起见,这里有一些一般信息,有一个关于主机名解析为 IP 地址的已知 Spark 错误。我并不是在所有情况下都将此作为完整答案,但我建议尝试使用仅使用所有 IP 的基线,并且仅使用单个配置 SPARK_MASTER_IP。仅通过这两种做法,我的集群就可以正常工作,而所有其他配置或使用主机名似乎都把事情搞砸了。
因此在您的 spark-env.sh 中删除 SPARK_LOCAL_IP 并将 SPARK_MASTER_IP 更改为 IP 地址,而不是主机名。
I have treated this more at length in this answer.
为了更完整,这里是该答案的一部分:
你能 ping Spark master 所在的盒子 运行ning 吗?你能从 master ping worker 吗?更重要的是,你能 password-less 从 master box ssh 到 worker 吗?根据 1.5.2 文档,您需要能够使用私钥执行此操作并将工作人员输入 conf/slaves 文件。我在最后复制了相关段落。
您可能会遇到这样一种情况,即 worker 可以联系 master,但 master 无法回复 worker,所以看起来好像没有建立连接。检查两个方向。 我认为主节点上的从属文件和 password-less ssh 可能会导致与您所看到的类似的错误。
根据我交联的答案,还有 an old bug 但不清楚该错误是如何解决的。
更新:
可以通过在应用程序代码中包含驱动程序的 IP 地址(即本地笔记本电脑的 public IP)来避免上述问题。这可以通过在 spark 上下文中添加以下行来完成:
.set("spark.driver.host",YourSystemIPAddress)
但是,如果驱动程序的 IP 地址位于 NAT 后面,则可能会出现问题。在这种情况下,工作人员将无法找到 IP。