How to run a simple Spark app with ZeroMQ on a YARN cluster?

I'm trying to run a Spark application that uses ZeroMQ on a YARN cluster.

The executor logs contain the following messages:

16/05/24 11:42:09 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 67.2 KB, free 530.2 MB)
16/05/24 11:42:09 INFO util.RecurringTimer: Started timer for BlockGenerator at time 1464090129200
16/05/24 11:42:09 INFO receiver.BlockGenerator: Started BlockGenerator
16/05/24 11:42:09 INFO receiver.BlockGenerator: Started block pushing thread
16/05/24 11:42:09 INFO receiver.ReceiverSupervisorImpl: Starting receiver
16/05/24 11:42:09 INFO receiver.ActorReceiver: Supervision tree for receivers initialized at:akka://sparkExecutor/user/Supervisor0
16/05/24 11:42:09 INFO receiver.ReceiverSupervisorImpl: Called receiver onStart
16/05/24 11:42:09 INFO receiver.ActorReceiver: Started receiver worker at:akka://sparkExecutor/user/Supervisor0/ZeroMQReceiver
16/05/24 11:42:09 INFO receiver.ReceiverSupervisorImpl: Waiting for receiver to be stopped
16/05/24 11:42:09 ERROR actor.OneForOneStrategy: No configuration setting found for key 'akka.zeromq'
akka.actor.ActorInitializationException: exception during creation
    at akka.actor.ActorInitializationException$.apply(Actor.scala:166)
    at akka.actor.ActorCell.create(ActorCell.scala:596)
    at akka.actor.ActorCell.invokeAll(ActorCell.scala:456)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'akka.zeromq'
    at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:124)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:147)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:151)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:159)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:164)
    at com.typesafe.config.impl.SimpleConfig.getDuration(SimpleConfig.java:260)
    at com.typesafe.config.impl.SimpleConfig.getMilliseconds(SimpleConfig.java:249)
    at akka.zeromq.ZeroMQExtension.<init>(ZeroMQExtension.scala:48)
    at akka.zeromq.ZeroMQExtension$.createExtension(ZeroMQExtension.scala:35)
    at akka.zeromq.ZeroMQExtension$.createExtension(ZeroMQExtension.scala:32)
    at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:713)
    at akka.actor.ExtensionId$class.apply(Extension.scala:79)
    at akka.zeromq.ZeroMQExtension$.apply(ZeroMQExtension.scala:32)
    at org.apache.spark.streaming.zeromq.ZeroMQReceiver.preStart(ZeroMQReceiver.scala:39)
    at akka.actor.Actor$class.aroundPreStart(Actor.scala:472)
    at org.apache.spark.streaming.zeromq.ZeroMQReceiver.aroundPreStart(ZeroMQReceiver.scala:32)
    at akka.actor.ActorCell.create(ActorCell.scala:580)
    ... 9 more
16/05/24 11:42:09 ERROR actor.ActorCell: changing Recreate into Create after akka.actor.ActorInitializationException: exception during creation
16/05/24 11:42:09 ERROR actor.OneForOneStrategy: No configuration setting found for key 'akka.zeromq'
akka.actor.ActorInitializationException: exception during creation
    at akka.actor.ActorInitializationException$.apply(Actor.scala:166)
    at akka.actor.ActorCell.create(ActorCell.scala:596)
    at akka.actor.dungeon.FaultHandling$class.finishCreate(FaultHandling.scala:136)
    at akka.actor.dungeon.FaultHandling$class.faultCreate(FaultHandling.scala:130)
    at akka.actor.ActorCell.faultCreate(ActorCell.scala:369)
    at akka.actor.dungeon.FaultHandling$class.faultRecreate(FaultHandling.scala:59)
    at akka.actor.ActorCell.faultRecreate(ActorCell.scala:369)
    at akka.actor.ActorCell.invokeAll(ActorCell.scala:459)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'akka.zeromq'
    at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:124)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:147)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:151)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:159)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:164)
    at com.typesafe.config.impl.SimpleConfig.getDuration(SimpleConfig.java:260)
    at com.typesafe.config.impl.SimpleConfig.getMilliseconds(SimpleConfig.java:249)
    at akka.zeromq.ZeroMQExtension.<init>(ZeroMQExtension.scala:48)
    at akka.zeromq.ZeroMQExtension$.createExtension(ZeroMQExtension.scala:35)
    at akka.zeromq.ZeroMQExtension$.createExtension(ZeroMQExtension.scala:32)
    at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:713)
    at akka.actor.ExtensionId$class.apply(Extension.scala:79)
    at akka.zeromq.ZeroMQExtension$.apply(ZeroMQExtension.scala:32)
    at org.apache.spark.streaming.zeromq.ZeroMQReceiver.preStart(ZeroMQReceiver.scala:39)
    at akka.actor.Actor$class.aroundPreStart(Actor.scala:472)
    at org.apache.spark.streaming.zeromq.ZeroMQReceiver.aroundPreStart(ZeroMQReceiver.scala:32)
    at akka.actor.ActorCell.create(ActorCell.scala:580)
    ... 14 more

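My application.conf file contains an akka.zeromq section, but the executors don't seem to see these settings.

The driver application can access the application.conf file.

The problem can be reproduced with the 'Word Count' example app.

I tried running the application with the following commands: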
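To check whether application.conf is actually visible to a particular JVM (driver or executor), one option is to look it up as a classloader resource from code running in that JVM. The following is a minimal sketch using only the JDK classloader API; `ClasspathCheck` and `findOnClasspath` are hypothetical names, not part of Spark or Akka.

```scala
import java.net.URL

object ClasspathCheck {
  // Returns every URL under which `name` is visible to the current thread's
  // context class loader (falling back to this object's loader if none is set).
  def findOnClasspath(name: String): List[URL] = {
    val loader = Option(Thread.currentThread().getContextClassLoader)
      .getOrElse(getClass.getClassLoader)
    val urls = loader.getResources(name)
    var found = List.empty[URL]
    while (urls.hasMoreElements) found = urls.nextElement() :: found
    found.reverse
  }

  def main(args: Array[String]): Unit = {
    val hits = findOnClasspath("application.conf")
    if (hits.isEmpty) println("application.conf is NOT on the classpath")
    else hits.foreach(u => println(s"application.conf found at $u"))
  }
}
```

On an executor, calling it from a task (for example inside `rdd.foreachPartition { _ => println(ClasspathCheck.findOnClasspath("application.conf")) }`) should write the result to that executor's stdout log, which shows whether `--files` plus `spark.executor.extraClassPath` put the file where the class loader can find it.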

spark-submit \
  --verbose \
  --class app.ZeroMQWordCount \
  --master yarn-cluster \
  app-allinone.jar "tcp://127.0.1.1:1234" "foo"

spark-submit \
  --class app.ZeroMQWordCount \
  --master yarn-cluster \
  --files hdfs://namenode:8020/app/application.conf \
  --conf "spark.executor.extraClassPath=application.conf" \
  app-allinone.jar "tcp://127.0.1.1:1234" "foo"

spark-submit \
  --class app.ZeroMQWordCount \
  --master yarn-cluster \
  --files hdfs://namenode:8020/app/application.conf \
  --conf "spark.executor.extraClassPath=./" \
  app-allinone.jar "tcp://127.0.1.1:1234" "foo"

I see two ways to solve the problem:

1) Set the options through the SparkConf object (e.g. sparkConf.set("akka.zeromq.new-socket-timeout", "5s")).

2) Set the options through command-line arguments (e.g. --conf "spark.executor.extraJavaOptions=-Dakka.zeromq.poll-timeout=100ms -Dakka.zeromq.new-socket-timeout=5s").
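With the second approach, every setting ends up packed into one `-D` string, which becomes error-prone as the list grows. A minimal sketch, assuming the settings are kept as key/value pairs in Scala code, that renders them into the value for `spark.executor.extraJavaOptions`; `ZeroMQJavaOpts` and its methods are hypothetical names.

```scala
object ZeroMQJavaOpts {
  // Renders each (key, value) pair as a JVM system-property flag: -Dkey=value.
  // Assumption: keys and values contain no spaces (no quoting is done).
  def toJavaOptions(settings: Seq[(String, String)]): String =
    settings.map { case (k, v) => s"-D$k=$v" }.mkString(" ")

  // Builds the complete value for spark-submit's --conf flag.
  def executorJavaOptionsConf(settings: Seq[(String, String)]): String =
    s"spark.executor.extraJavaOptions=${toJavaOptions(settings)}"

  def main(args: Array[String]): Unit = {
    val settings = Seq(
      "akka.zeromq.poll-timeout" -> "100ms",
      "akka.zeromq.new-socket-timeout" -> "5s"
    )
    // Prints the same string as in the command-line example above.
    println(executorJavaOptionsConf(settings))
  }
}
```

The output can be pasted after `--conf` (in double quotes, since it contains a space).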

You should also initialize all of the akka.zeromq.* options:

sparkConf.set("akka.zeromq.socket-dispatcher.executor", "thread-pool-executor")
sparkConf.set("akka.zeromq.new-socket-timeout", "5s")
sparkConf.set("akka.zeromq.poll-timeout", "100ms")
sparkConf.set("akka.zeromq.socket-dispatcher.thread-pool-executor.allow-core-timeout", "off")
sparkConf.set("akka.zeromq.socket-dispatcher.type", "PinnedDispatcher")
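Since all of the akka.zeromq.* keys have to be set, it may be simpler to keep them in one collection and apply them in a loop. A minimal sketch in plain Scala with no Spark dependency, so the setter is passed in as a function; `ZeroMQSettings` and `applyTo` are hypothetical names. Durations are written with explicit units (`5s`, `100ms`) as in the `-D` example, because Typesafe Config reads a bare number such as `"5"` as milliseconds.

```scala
object ZeroMQSettings {
  // The akka.zeromq.* keys listed above, kept in one place.
  // Durations carry explicit units, matching the -D example.
  val all: Seq[(String, String)] = Seq(
    "akka.zeromq.socket-dispatcher.executor" -> "thread-pool-executor",
    "akka.zeromq.new-socket-timeout" -> "5s",
    "akka.zeromq.poll-timeout" -> "100ms",
    "akka.zeromq.socket-dispatcher.thread-pool-executor.allow-core-timeout" -> "off",
    "akka.zeromq.socket-dispatcher.type" -> "PinnedDispatcher"
  )

  // Passes every pair to `set`; with Spark this would be (k, v) => sparkConf.set(k, v).
  def applyTo(set: (String, String) => Any): Unit =
    all.foreach { case (k, v) => set(k, v) }

  def main(args: Array[String]): Unit =
    applyTo((k, v) => println(s"$k = $v"))
}
```

In the Spark app this would be `ZeroMQSettings.applyTo((k, v) => sparkConf.set(k, v))`. SparkConf also has a `setAll` method that takes key/value pairs, so `sparkConf.setAll(ZeroMQSettings.all)` should have the same effect.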