火花可扩展性:我做错了什么?
spark scalability: what am I doing wrong?
我正在使用 spark 处理数据,它可以处理一天的数据 (40G),但在一周的数据中因 OOM 而失败:
import pyspark
import datetime
import operator
sc = pyspark.SparkContext()
sqc = pyspark.sql.SQLContext(sc)
sc.union([sqc.parquetFile(hour.strftime('.....'))
.map(lambda row:(row.id, row.foo))
for hour in myrange(beg,end,datetime.timedelta(0,3600))]) \
.reduceByKey(operator.add).saveAsTextFile("myoutput")
不同ID的数量小于10k。
每个ID都是一个小的int
。
作业失败,因为太多执行程序因 OOM 而失败。
当作业成功时(在小输入上),"myoutput"
大约是 100k。
- 我做错了什么?
- 我尝试用
collect
替换saveAsTextFile
(因为我实际上想在保存之前在python中做一些切片和切块),行为没有区别,同样的失败。这是意料之中的事吗?
- 我以前用
reduce(lambda x,y: x.union(y), [sqc.parquetFile(...)...])
而不是 sc.union
- 哪个更好?这有什么区别吗?
集群有 25 个节点,其中有 825GB RAM 和 224 个核心。
调用是 spark-submit --master yarn --num-executors 50 --executor-memory 5G
.
单个 RDD 大约有 140 列,涵盖一小时的数据,因此一周是 168(=7*24) 个 RDD 的并集。
原来不是spark的问题,而是yarn的问题。
解决方案是 运行 与
产生火花
spark-submit --conf spark.yarn.executor.memoryOverhead=1000
(或修改纱线配置)。
Spark 在缩放时经常遇到内存不足错误。在这些情况下,程序员应该进行微调。或者重新检查你的代码,确保你没有做太多的事情,比如收集驱动程序中的所有 bigdata,这很可能超过 memoryOverhead限制,不管你设置多大。
要了解正在发生的事情,您应该意识到 yarn 决定终止容器以超出内存限制。当容器超出 memoryOverhead 限制时,就会发生这种情况。
在调度程序中,您可以检查事件时间轴以查看容器发生了什么。如果 Yarn 杀死了一个容器,它会显示为红色,当你 hover/click 在它上面时,你会看到这样的消息:
Container killed by YARN for exceeding memory limits. 16.9 GB of 16 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
所以在那种情况下,您要关注的是这些配置属性(值是 my 集群上的示例):
# More executor memory overhead
spark.yarn.executor.memoryOverhead 4096
# More driver memory overhead
spark.yarn.driver.memoryOverhead 8192
# Max on my nodes
#spark.executor.cores 8
#spark.executor.memory 12G
# For the executors
spark.executor.cores 6
spark.executor.memory 8G
# For the driver
spark.driver.cores 6
spark.driver.memory 8G
首先要做的是增加memoryOverhead
.
在驱动程序中还是在执行程序中?
当您从 UI 概览您的集群时,您可以点击 Attempt ID 并检查 Diagnostics Info,其中应该提到容器的 ID被杀害。如果它与您的 AM 容器 相同,则它是驱动程序,否则是执行程序。
这并没有解决问题,现在怎么办?
您必须微调您提供的内核数量和堆内存。您会看到 pyspark 将在堆外内存中完成大部分工作,因此您不想为堆提供太多 space,因为那会被浪费。你不想给的太少,因为那时垃圾收集器会有问题。回想一下,这些是 JVM。
如 所述,一个 worker 可以托管多个执行程序,因此使用的内核数量会影响每个执行程序拥有的内存量,因此减少#cores 可能会有所帮助。
我把它写在 memoryOverhead issue in Spark and Spark – Container exited with a non-zero exit code 143 in more detail, mostly that I won't forget! Another option, that I haven't tried would be spark.default.parallelism or/and spark.storage.memoryFraction
中,根据我的经验,它没有帮助。
您可以像 sds 提到的那样传递配置标志,或者像这样:
spark-submit --properties-file my_properties
其中 "my_properties" 类似于我在上面列出的属性。
对于非数值,你可以这样做:
spark-submit --conf spark.executor.memory='4G'
我正在使用 spark 处理数据,它可以处理一天的数据 (40G),但在一周的数据中因 OOM 而失败:
import pyspark
import datetime
import operator
sc = pyspark.SparkContext()
sqc = pyspark.sql.SQLContext(sc)
sc.union([sqc.parquetFile(hour.strftime('.....'))
.map(lambda row:(row.id, row.foo))
for hour in myrange(beg,end,datetime.timedelta(0,3600))]) \
.reduceByKey(operator.add).saveAsTextFile("myoutput")
不同ID的数量小于10k。
每个ID都是一个小的int
。
作业失败,因为太多执行程序因 OOM 而失败。
当作业成功时(在小输入上),"myoutput"
大约是 100k。
- 我做错了什么?
- 我尝试用
collect
替换saveAsTextFile
(因为我实际上想在保存之前在python中做一些切片和切块),行为没有区别,同样的失败。这是意料之中的事吗? - 我以前用
reduce(lambda x,y: x.union(y), [sqc.parquetFile(...)...])
而不是sc.union
- 哪个更好?这有什么区别吗?
集群有 25 个节点,其中有 825GB RAM 和 224 个核心。
调用是 spark-submit --master yarn --num-executors 50 --executor-memory 5G
.
单个 RDD 大约有 140 列,涵盖一小时的数据,因此一周是 168(=7*24) 个 RDD 的并集。
原来不是spark的问题,而是yarn的问题。 解决方案是 运行 与
产生火花spark-submit --conf spark.yarn.executor.memoryOverhead=1000
(或修改纱线配置)。
Spark 在缩放时经常遇到内存不足错误。在这些情况下,程序员应该进行微调。或者重新检查你的代码,确保你没有做太多的事情,比如收集驱动程序中的所有 bigdata,这很可能超过 memoryOverhead限制,不管你设置多大。
要了解正在发生的事情,您应该意识到 yarn 决定终止容器以超出内存限制。当容器超出 memoryOverhead 限制时,就会发生这种情况。
在调度程序中,您可以检查事件时间轴以查看容器发生了什么。如果 Yarn 杀死了一个容器,它会显示为红色,当你 hover/click 在它上面时,你会看到这样的消息:
Container killed by YARN for exceeding memory limits. 16.9 GB of 16 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
所以在那种情况下,您要关注的是这些配置属性(值是 my 集群上的示例):
# More executor memory overhead
spark.yarn.executor.memoryOverhead 4096
# More driver memory overhead
spark.yarn.driver.memoryOverhead 8192
# Max on my nodes
#spark.executor.cores 8
#spark.executor.memory 12G
# For the executors
spark.executor.cores 6
spark.executor.memory 8G
# For the driver
spark.driver.cores 6
spark.driver.memory 8G
首先要做的是增加memoryOverhead
.
在驱动程序中还是在执行程序中?
当您从 UI 概览您的集群时,您可以点击 Attempt ID 并检查 Diagnostics Info,其中应该提到容器的 ID被杀害。如果它与您的 AM 容器 相同,则它是驱动程序,否则是执行程序。
这并没有解决问题,现在怎么办?
您必须微调您提供的内核数量和堆内存。您会看到 pyspark 将在堆外内存中完成大部分工作,因此您不想为堆提供太多 space,因为那会被浪费。你不想给的太少,因为那时垃圾收集器会有问题。回想一下,这些是 JVM。
如
我把它写在 memoryOverhead issue in Spark and Spark – Container exited with a non-zero exit code 143 in more detail, mostly that I won't forget! Another option, that I haven't tried would be spark.default.parallelism or/and spark.storage.memoryFraction
中,根据我的经验,它没有帮助。
您可以像 sds 提到的那样传递配置标志,或者像这样:
spark-submit --properties-file my_properties
其中 "my_properties" 类似于我在上面列出的属性。
对于非数值,你可以这样做:
spark-submit --conf spark.executor.memory='4G'