火花可扩展性:我做错了什么?

spark scalability: what am I doing wrong?

我正在使用 spark 处理数据,它可以处理一天的数据 (40G),但在一周的数据中因 OOM 而失败:

import pyspark
import datetime
import operator
sc = pyspark.SparkContext()
sqc = pyspark.sql.SQLContext(sc)
sc.union([sqc.parquetFile(hour.strftime('.....'))
          .map(lambda row:(row.id, row.foo))
          for hour in myrange(beg,end,datetime.timedelta(0,3600))]) \
  .reduceByKey(operator.add).saveAsTextFile("myoutput")

不同ID的数量小于10k。 每个ID都是一个小的int。 作业失败,因为太多执行程序因 OOM 而失败。 当作业成功时(在小输入上),"myoutput" 大约是 100k。

  1. 我做错了什么?
  2. 我尝试用collect替换saveAsTextFile(因为我实际上想在保存之前在python中做一些切片和切块),行为没有区别,同样的失败。这是意料之中的事吗?
  3. 我以前用 reduce(lambda x,y: x.union(y), [sqc.parquetFile(...)...]) 而不是 sc.union - 哪个更好?这有什么区别吗?

集群有 25 个节点,其中有 825GB RAM 和 224 个核心。

调用是 spark-submit --master yarn --num-executors 50 --executor-memory 5G.

单个 RDD 大约有 140 列,涵盖一小时的数据,因此一周是 168(=7*24) 个 RDD 的并集。

原来不是spark的问题,而是yarn的问题。 解决方案是 运行 与

产生火花
spark-submit --conf spark.yarn.executor.memoryOverhead=1000

(或修改纱线配置)。

Spark 在缩放时经常遇到内存不足错误。在这些情况下,程序员应该进行微调。或者重新检查你的代码,确保你没有做太多的事情,比如收集驱动程序中的所有 ,这很可能超过 memoryOverhead限制,不管你设置多大。

要了解正在发生的事情,您应该意识到 决定终止容器以超出内存限制。当容器超出 memoryOverhead 限制时,就会发生这种情况。

在调度程序中,您可以检查事件时间轴以查看容器发生了什么。如果 Yarn 杀死了一个容器,它会显示为红色,当你 hover/click 在它上面时,你会看到这样的消息:

Container killed by YARN for exceeding memory limits. 16.9 GB of 16 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.


所以在那种情况下,您要关注的是这些配置属性(值是 my 集群上的示例):

# More executor memory overhead
spark.yarn.executor.memoryOverhead          4096

# More driver memory overhead
spark.yarn.driver.memoryOverhead            8192

# Max on my nodes
#spark.executor.cores                        8
#spark.executor.memory                       12G

# For the executors
spark.executor.cores                        6
spark.executor.memory                       8G

# For the driver
spark.driver.cores                          6
spark.driver.memory                         8G

首先要做的是增加memoryOverhead.

在驱动程序中还是在执行程序中?

当您从 UI 概览您的集群时,您可以点击 Attempt ID 并检查 Diagnostics Info,其中应该提到容器的 ID被杀害。如果它与您的 AM 容器 相同,则它是驱动程序,否则是执行程序。


这并没有解决问题,现在怎么办?

您必须微调您提供的内核数量和堆内存。您会看到 将在堆外内存中完成大部分工作,因此您不想为堆提供太多 space,因为那会被浪费。你不想给的太少,因为那时垃圾收集器会有问题。回想一下,这些是 JVM。

所述,一个 worker 可以托管多个执行程序,因此使用的内核数量会影响每个执行程序拥有的内存量,因此减少#cores 可能会有所帮助。

我把它写在 memoryOverhead issue in Spark and Spark – Container exited with a non-zero exit code 143 in more detail, mostly that I won't forget! Another option, that I haven't tried would be spark.default.parallelism or/and spark.storage.memoryFraction 中,根据我的经验,它没有帮助。


您可以像 sds 提到的那样传递配置标志,或者像这样:

spark-submit --properties-file my_properties

其中 "my_properties" 类似于我在上面列出的属性。

对于非数值,你可以这样做:

spark-submit --conf spark.executor.memory='4G'