Spark 资源未在 Amazon EMR 上完全分配

Spark resources not fully allocated on Amazon EMR

我正在尝试为一项简单任务最大限度地利用集群。

集群为 1+2 x m3.xlarge、运行nning Spark 1.3.1、Hadoop 2.4、Amazon AMI 3.7

任务读取文本文件的所有行并将它们解析为 csv。

当我以 yarn-cluster 模式 spark-submit 任务时,我得到以下结果之一:

我的预期:

有时,当我使用 1 个执行程序执行 "successful" 时,克隆并重新启动该步骤以 0 个执行程序结束。

我使用这个命令创建了我的集群:

aws emr --region us-east-1 create-cluster --name "Spark Test"
--ec2-attributes KeyName=mykey 
--ami-version 3.7.0 
--use-default-roles 
--instance-type m3.xlarge 
--instance-count 3 
--log-uri s3://mybucket/logs/ 
--bootstrap-actions Path=s3://support.elasticmapreduce/spark/install-spark,Args=["-x"] 
--steps Name=Sample,Jar=s3://elasticmapreduce/libs/script-runner/script-runner.jar,Args=[/home/hadoop/spark/bin/spark-submit,--master,yarn,--deploy-mode,cluster,--class,my.sample.spark.Sample,s3://mybucket/test/sample_2.10-1.0.0-SNAPSHOT-shaded.jar,s3://mybucket/data/],ActionOnFailure=CONTINUE

有一些步骤变化,包括:

--驱动程序内存 8G --驱动程序核心数 4 --num-executors 2


带 -x 的安装 spark 脚本生成以下 spark-defaults.conf:

$ cat spark-defaults.conf
spark.eventLog.enabled  false
spark.executor.extraJavaOptions         -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70
spark.driver.extraJavaOptions         -Dspark.driver.log.level=INFO
spark.executor.instances        2
spark.executor.cores    4
spark.executor.memory   9404M
spark.default.parallelism       8

更新 1

我在通用 JavaWordCount 示例中得到了相同的行为:

/home/hadoop/spark/bin/spark-submit --verbose --master yarn --deploy-mode cluster --driver-memory 8G --class org.apache.spark.examples.JavaWordCount /home/hadoop/spark/lib/spark-examples-1.3.1-hadoop2.4.0.jar s3://mybucket/data/

但是,如果我删除“--driver-memory 8G”,任务将分配 2 个执行程序并正确完成。

那么,驱动程序内存阻止我获取执行程序的任务是怎么回事?

驱动程序是否应该像解释的那样在集群的主节点上与 Yarn 主容器一起执行 here

如何为我的 Spark 作业驱动程序提供更多内存? (收集和其他一些有用的操作出现的地方)

问题是关于 Spark 如何在 YARN 上工作的预期。当 Spark 运行 集群或主部署模式设置为 yarn-cluster 时,driver 不是 在主节点上执行,而是在应用程序中执行其中一个从属节点上的主容器。有关详细信息,请参阅 https://spark.apache.org/docs/latest/running-on-yarn.html

我预计正在发生的事情是集群无法满足 driver 的内存要求(请记住,集群实际请求的内存是您要求的加上开销),因此永远等待分配Application Master driver 将 运行 或执行者。

要为 driver 提供您请求的内存量,您需要使用额外的从站以便同时为 cluster-based driver 和执行程序提供资源.由于 driver 的开销,我怀疑您可能需要使用具有更多内存的实例类型。当您为 driver 请求 8G 时,请查看资源管理器日志并验证请求的实际数量。

至 运行 主节点上的 driver 部署模式需要为客户端。如果您利用一个步骤调用脚本将 driver jar 本地化到主节点上,然后下一步可以调用 spark-submit set for deployment mode client 并引用本地主文件系统上的 JAR。

最大化集群使用率的解决方案是在 EMR 上安装 spark 时忘记“-x”参数,并手动调整执行程序内存和内核。

这个 post 很好地解释了当 运行 Spark on YARN 时如何进行资源分配。

要记住的一件重要事情是所有执行者必须分配相同的资源!正如我们所说,Spark 不支持异构执行器。 (目前正在进行一些支持 GPU 的工作,但这是另一个话题)

因此,为了在为执行程序分配最大内存的同时获得分配给驱动程序的最大内存,我应该像这样拆分我的节点(这个 slideshare 在第 25 页提供了很好的屏幕截图):

  • 节点 0 - 主节点(纱线资源管理器)
  • 节点 1 - NodeManager(Container(Driver) + Container(Executor))
  • 节点 2 - NodeManager(Container(Executor) + Container(Executor))

注意:另一种选择是 spark-submit 与来自主节点 0 的 --master yarn --deploy-mode client。是否有任何反例,这是一个坏主意?

在我的示例中,我最多可以有 3 个执行程序,每个执行程序有 2 个 vcores,每个 4736 MB + 一个具有相同规格的驱动程序。

4736 内存是从/home/hadoop/conf/yarn-site.xml 中定义的yarn.nodemanager.resource.memory-mb 的值导出的。在 m3.xlarge 上,它设置为 11520 mb(请参阅 here 以了解与每个实例类型关联的所有值)

然后,我们得到:

(11520 - 1024) / 2(每个节点的执行者数)= 5248 => 5120(四舍五入为 256 mb 增量,如 yarn.scheduler.minimum-allocation-mb 中所定义)

7% * 5120 = 367 向上舍入为 384(内存开销)在 spark 1.4[=20= 中将变为 10% ]

5120 - 384 = 4736

其他有趣的链接:

Michel Lemay 的 post 是很好的背景读物,他给出了 1 个特定集群配置的答案。我已将该逻辑嵌入到一个 spreadsheet 中,它将显示任何集群的最佳选项。要使用,请填写集群中的节点数、虚拟数 cores/node 和可分配量 memory/node。执行此操作后,sheet 将为您提供启动命令的选项,这些选项将充分利用您的集群的客户端和集群模式,每个节点有 1、2、4 和 8 个执行程序。我突出显示了对应于每个节点 2 个执行程序的行,因为这一直是我测试中的最佳选择。随意复制此 sheet 或根据需要为不同的集群类型添加选项卡。

https://docs.google.com/spreadsheets/d/1VH7Qly308hoRPu5VoLIg0ceolrzen-nBktRFkXHRrY4/edit?usp=sharing

我是这样解决这个问题的:

通过设置 spark.executor.memory + driver-memory 低于任何给定 MASTER 节点的总和,然后 YARN 能够将 Master 和 executor 都放在给定节点上。你牺牲了一些丢失的内存其他节点,但更重要的是我有 CPU 运行。这是一个示例(在 r3.8xlarge 上):

aws emr add-steps --cluster-id j-1234 --steps Type=Spark,Name=foob3,Args=[--conf,spark.memory.fraction=0.95,--conf,spark.memory.storageFraction=0.1,--conf,spark.yarn.executor.memoryOverhead=8000,--conf,spark.executor.memory=200g,--conf,spark.executor.cores=32,--conf,spark.executor.instances=4,--conf,spark.dynamicAllocation.enabled=false,--class,myclass.Foo,--deploy-mode,cluster,--master,yarn,--driver-memory,10g,s3://myjar-1.0-SNAPSHOT.jar],ActionOnFailure=CONTINUE