SparkR on EC2: ensure that workers are registered and have sufficient memory

I set up a Spark (spark-1.4.0) cluster on EC2 using the spark-ec2 script that ships with the distribution. The master and one slave start up fine, and I can view their status at http://:8080
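
For context, a spark-ec2 launch for a one-slave cluster like this typically looks something like the following (the key pair, identity file, and cluster name are placeholders, not the exact values used here):

./spark-ec2 -k my-keypair -i my-keypair.pem -s 1 launch my-spark-cluster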

Now I want to run SparkR on my cluster. It runs fine in local mode on both the master and the slave:

Rscript Myscript.R local[2]

In Myscript.R I have the following lines:

library(SparkR)
# Read the master URL passed on the command line (e.g. "local[2]" or a spark:// URL)
args <- commandArgs(trailingOnly = TRUE)
# Initialize the Spark context against that master
sc <- sparkR.init(args[[1]], "bartcv")

But when I try to run it against the cluster:

[ec2-user@ip-10-234-176-66 ~]$ Rscript Myscript.R spark://ec2-ww-xx-yy-zz.something.amazonaws.com:7077

Loading required package: methods
[SparkR] Initializing with classpath /usr/lib64/R/library/SparkR/sparkr-assembly-0.1.jar
Launching java with command  /usr/lib/jvm/java/bin/java   -Xmx512m -cp '/usr/lib64/R/library/SparkR/sparkr-assembly-0.1.jar:' edu.berkeley.cs.amplab.sparkr.SparkRBackend /tmp/Rtmp2Kylxz/backend_portb3c54b28b03 
15/07/16 13:47:16 INFO Slf4jLogger: Slf4jLogger started
15/07/16 13:47:37 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
15/07/16 13:47:52 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
15/07/16 13:48:07 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
15/07/16 13:48:19 ERROR SparkDeploySchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up.
collect on 5 failed with java.lang.reflect.InvocationTargetException
java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.handleMethodCall(SparkRBackendHandler.scala:111)
        at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.channelRead0(SparkRBackendHandler.scala:58)
        at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.channelRead0(SparkRBackendHandler.scala:19)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:116)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up.
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1174)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1173)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:688)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:688)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive.applyOrElse(DAGScheduler.scala:1391)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Error: returnStatus == 0 is not TRUE
Execution halted

I then added the following lines to /root/spark/conf/spark-env.sh:

export SPARK_WORKER_MEMORY=2g
export SPARK_EXECUTOR_MEMORY=2g
export SPARK_DRIVER_MEMORY=2g

and copied the configuration to the slaves with:

~/spark-ec2/copy-dir /root/spark/conf/

But I still get the same error.

I found that Rscript will not load the necessary EC2 Spark configuration without some extra work, whereas sparkR will. sparkR (in /root/spark/bin/sparkR) is a bash script that loads the configuration needed for the EC2 cluster: it sources the Spark environment file (/root/spark/bin/load-spark-env.sh, which in turn sources /root/spark/conf/spark-env.sh) and then executes spark-submit sparkr-shell-main.
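
Based on that, a minimal sketch of a workaround (assuming the /root/spark paths from the spark-ec2 AMI above) is to source the EC2 Spark configuration yourself before invoking Rscript:

#!/bin/bash
# Sketch only: load the exports from spark-env.sh (SPARK_WORKER_MEMORY etc.)
# that the sparkR wrapper would normally pick up, then run the script
# against the cluster master.
source /root/spark/conf/spark-env.sh
Rscript Myscript.R spark://ec2-ww-xx-yy-zz.something.amazonaws.com:7077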