How to run a simple Spark app with ZeroMQ on a YARN cluster?
I am trying to run a Spark application that uses ZeroMQ on a YARN cluster.
The executor logs contain the following messages:
16/05/24 11:42:09 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 67.2 KB, free 530.2 MB)
16/05/24 11:42:09 INFO util.RecurringTimer: Started timer for BlockGenerator at time 1464090129200
16/05/24 11:42:09 INFO receiver.BlockGenerator: Started BlockGenerator
16/05/24 11:42:09 INFO receiver.BlockGenerator: Started block pushing thread
16/05/24 11:42:09 INFO receiver.ReceiverSupervisorImpl: Starting receiver
16/05/24 11:42:09 INFO receiver.ActorReceiver: Supervision tree for receivers initialized at:akka://sparkExecutor/user/Supervisor0
16/05/24 11:42:09 INFO receiver.ReceiverSupervisorImpl: Called receiver onStart
16/05/24 11:42:09 INFO receiver.ActorReceiver: Started receiver worker at:akka://sparkExecutor/user/Supervisor0/ZeroMQReceiver
16/05/24 11:42:09 INFO receiver.ReceiverSupervisorImpl: Waiting for receiver to be stopped
16/05/24 11:42:09 ERROR actor.OneForOneStrategy: No configuration setting found for key 'akka.zeromq'
akka.actor.ActorInitializationException: exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:166)
at akka.actor.ActorCell.create(ActorCell.scala:596)
at akka.actor.ActorCell.invokeAll(ActorCell.scala:456)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'akka.zeromq'
at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:124)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:147)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:151)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:159)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:164)
at com.typesafe.config.impl.SimpleConfig.getDuration(SimpleConfig.java:260)
at com.typesafe.config.impl.SimpleConfig.getMilliseconds(SimpleConfig.java:249)
at akka.zeromq.ZeroMQExtension.<init>(ZeroMQExtension.scala:48)
at akka.zeromq.ZeroMQExtension$.createExtension(ZeroMQExtension.scala:35)
at akka.zeromq.ZeroMQExtension$.createExtension(ZeroMQExtension.scala:32)
at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:713)
at akka.actor.ExtensionId$class.apply(Extension.scala:79)
at akka.zeromq.ZeroMQExtension$.apply(ZeroMQExtension.scala:32)
at org.apache.spark.streaming.zeromq.ZeroMQReceiver.preStart(ZeroMQReceiver.scala:39)
at akka.actor.Actor$class.aroundPreStart(Actor.scala:472)
at org.apache.spark.streaming.zeromq.ZeroMQReceiver.aroundPreStart(ZeroMQReceiver.scala:32)
at akka.actor.ActorCell.create(ActorCell.scala:580)
... 9 more
16/05/24 11:42:09 ERROR actor.ActorCell: changing Recreate into Create after akka.actor.ActorInitializationException: exception during creation
16/05/24 11:42:09 ERROR actor.OneForOneStrategy: No configuration setting found for key 'akka.zeromq'
akka.actor.ActorInitializationException: exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:166)
at akka.actor.ActorCell.create(ActorCell.scala:596)
at akka.actor.dungeon.FaultHandling$class.finishCreate(FaultHandling.scala:136)
at akka.actor.dungeon.FaultHandling$class.faultCreate(FaultHandling.scala:130)
at akka.actor.ActorCell.faultCreate(ActorCell.scala:369)
at akka.actor.dungeon.FaultHandling$class.faultRecreate(FaultHandling.scala:59)
at akka.actor.ActorCell.faultRecreate(ActorCell.scala:369)
at akka.actor.ActorCell.invokeAll(ActorCell.scala:459)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'akka.zeromq'
at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:124)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:147)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:151)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:159)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:164)
at com.typesafe.config.impl.SimpleConfig.getDuration(SimpleConfig.java:260)
at com.typesafe.config.impl.SimpleConfig.getMilliseconds(SimpleConfig.java:249)
at akka.zeromq.ZeroMQExtension.<init>(ZeroMQExtension.scala:48)
at akka.zeromq.ZeroMQExtension$.createExtension(ZeroMQExtension.scala:35)
at akka.zeromq.ZeroMQExtension$.createExtension(ZeroMQExtension.scala:32)
at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:713)
at akka.actor.ExtensionId$class.apply(Extension.scala:79)
at akka.zeromq.ZeroMQExtension$.apply(ZeroMQExtension.scala:32)
at org.apache.spark.streaming.zeromq.ZeroMQReceiver.preStart(ZeroMQReceiver.scala:39)
at akka.actor.Actor$class.aroundPreStart(Actor.scala:472)
at org.apache.spark.streaming.zeromq.ZeroMQReceiver.aroundPreStart(ZeroMQReceiver.scala:32)
at akka.actor.ActorCell.create(ActorCell.scala:580)
... 14 more
My application.conf file contains an akka.zeromq section, but the executors do not seem to see these parameters (the corresponding configuration settings).
The driver-side application can access the application.conf file.
The problem also reproduces with the 'Word Count' example application.
I tried to run this application with the following commands:
spark-submit \
  --verbose \
  --class app.ZeroMQWordCount \
  --master yarn-cluster \
  app-allinone.jar "tcp://127.0.1.1:1234" "foo"
spark-submit \
  --class app.ZeroMQWordCount \
  --master yarn-cluster \
  --files hdfs://namenode:8020/app/application.conf \
  --conf "spark.executor.extraClassPath=application.conf" \
  app-allinone.jar "tcp://127.0.1.1:1234" "foo"
spark-submit \
  --class app.ZeroMQWordCount \
  --master yarn-cluster \
  --files hdfs://namenode:8020/app/application.conf \
  --conf "spark.executor.extraClassPath=./" \
  app-allinone.jar "tcp://127.0.1.1:1234" "foo"
I see two ways to solve my problem:
1) Set the options on the SparkConf object (e.g. sparkConf.set("akka.zeromq.new-socket-timeout", "5s"))
2) Set the options through CLI arguments (e.g. --conf "spark.executor.extraJavaOptions=-Dakka.zeromq.poll-timeout=100ms -Dakka.zeromq.new-socket-timeout=5s")
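Both options carry the same key/value pairs, so one way to keep them in sync is to define the settings once and render them for whichever route is used. The following is a minimal plain-Scala sketch with no Spark dependency. The object `ZeroMQAkkaOptions` and its method are hypothetical helpers, not part of Spark or Akka, and the two timeout values are the ones from option 2 (with explicit units).

```scala
// Hypothetical helper: one source of truth for akka.zeromq.* settings,
// usable as SparkConf pairs (option 1) or rendered as -D flags (option 2).
object ZeroMQAkkaOptions {
  // Timeout settings from option 2; durations carry explicit units.
  val settings: Seq[(String, String)] = Seq(
    "akka.zeromq.poll-timeout" -> "100ms",
    "akka.zeromq.new-socket-timeout" -> "5s"
  )

  // Option 2: builds the value for spark.executor.extraJavaOptions,
  // i.e. space-separated "-Dkey=value" flags.
  def toExtraJavaOptions(opts: Seq[(String, String)]): String =
    opts.map { case (k, v) => s"-D$k=$v" }.mkString(" ")

  def main(args: Array[String]): Unit = {
    // Prints the string to put after --conf "spark.executor.extraJavaOptions=..."
    println(toExtraJavaOptions(settings))
  }
}
```

For option 1, the same pairs could be applied in one call, e.g. `sparkConf.setAll(ZeroMQAkkaOptions.settings)`, since `SparkConf.setAll` accepts a collection of `(String, String)` pairs.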
You should also initialize all of the akka.zeromq.* options:
sparkConf.set("akka.zeromq.socket-dispatcher.executor", "thread-pool-executor")
sparkConf.set("akka.zeromq.new-socket-timeout", "5s")
sparkConf.set("akka.zeromq.poll-timeout", "100ms")
sparkConf.set("akka.zeromq.socket-dispatcher.thread-pool-executor.allow-core-timeout", "off")
sparkConf.set("akka.zeromq.socket-dispatcher.type", "PinnedDispatcher")
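To catch a forgotten key before submitting, the driver could compare its settings against the five keys listed above. This is a minimal plain-Scala sketch; `RequiredZeroMQKeys`, `missingKeys` and `unitlessDurations` are hypothetical names, and the key list is copied from this answer rather than read from akka-zeromq itself. It also flags timeouts given as a bare number, because Typesafe Config reads a duration string without a unit as milliseconds (so "5" means 5 ms, not 5 s).

```scala
// Hypothetical pre-submit check for the akka.zeromq.* settings listed above.
object RequiredZeroMQKeys {
  // The five keys this answer says must be initialized.
  val required: Seq[String] = Seq(
    "akka.zeromq.socket-dispatcher.executor",
    "akka.zeromq.new-socket-timeout",
    "akka.zeromq.poll-timeout",
    "akka.zeromq.socket-dispatcher.thread-pool-executor.allow-core-timeout",
    "akka.zeromq.socket-dispatcher.type"
  )

  // The duration-valued keys among them.
  val durations: Set[String] =
    Set("akka.zeromq.new-socket-timeout", "akka.zeromq.poll-timeout")

  // Keys from `required` that are absent from the given settings.
  def missingKeys(conf: Map[String, String]): Seq[String] =
    required.filterNot(conf.contains)

  // Duration keys whose value is only digits (no unit such as "ms" or "s").
  def unitlessDurations(conf: Map[String, String]): Seq[String] =
    durations.toSeq.sorted.filter { k =>
      conf.get(k).exists(v => v.nonEmpty && v.forall(_.isDigit))
    }

  def main(args: Array[String]): Unit = {
    val conf = Map(
      "akka.zeromq.new-socket-timeout" -> "5",
      "akka.zeromq.poll-timeout" -> "100ms"
    )
    println(s"missing: ${missingKeys(conf).mkString(", ")}")
    println(s"unitless durations: ${unitlessDurations(conf).mkString(", ")}")
  }
}
```

On the driver, a call such as `RequiredZeroMQKeys.missingKeys(sparkConf.getAll.toMap)` would list the keys that still need a `sparkConf.set(...)` call.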