PySpark Yarn 应用程序在 groupBy 上失败
PySpark Yarn Application fails on groupBy
我正在尝试 运行 处理从 google 云存储读取的大量数据 (2TB) 的 Yarn 模式作业。
管道可以这样总结:
sc.textFile("gs://path/*.json")\
.map(lambda row: json.loads(row))\
.map(toKvPair)\
.groupByKey().take(10)
[...] later processing on collections and output to GCS.
This computation over the elements of collections is not associative,
each element is sorted in it's keyspace.
当 运行 在 10GB 的数据上时,它已完成,没有任何问题。
然而,当我 运行 它在完整数据集上时,它总是失败,并在容器中显示以下日志:
15/11/04 16:08:07 WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster has disassociated: xxxxxxxxxxx
15/11/04 16:08:07 ERROR org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend: Yarn application has already exited with state FINISHED!
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/spark/python/pyspark/rdd.py", line 1299, in take
res = self.context.runJob(self, takeUpToNumLeft, p)
File "/usr/lib/spark/python/pyspark/context.py", line 916, in runJob
15/11/04 16:08:07 WARN org.apache.spark.ExecutorAllocationManager: No stages are running, but numRunningTasks != 0
port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/usr/lib/spark/python/pyspark/sql/utils.py", line 36, in deco
return f(*a, **kw)
File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
我尝试通过连接到 master 逐个启动每个操作来进行调查,但它似乎在 groupBy 上失败了。我也尝试通过添加节点并升级它们的内存和 CPU 数量来重新扩展集群,但我仍然遇到同样的问题。
120 个节点 + 1 个具有相同规格的主节点:
8 个 vCPU - 52GB 内存
我试图找到具有类似问题的线程但没有成功,所以我真的不知道我应该提供什么信息,因为日志不是很清楚,所以请随时询问更多信息。
主键是每条记录的必需值,我们需要所有不带过滤器的键,大约代表 600k 个键。
是否真的可以在不将集群扩展到大规模的情况下执行此操作?我刚读到 databricks 对 100TB 的数据 (https://databricks.com/blog/2014/10/10/spark-petabyte-sort.html) 进行了排序,这也涉及到大规模的洗牌。他们成功地将多个内存缓冲区替换为单个缓冲区,从而导致大量磁盘 IO?我的集群规模可以执行这样的操作吗?
总结一下我们通过对原始问题的评论学到的知识,如果一个小数据集有效(尤其是一个可能适合单台机器的总内存的数据集),然后尽管向集群添加了更多节点,但大数据集失败了,结合 groupByKey
的任何用法,最常要查找的是您的数据是否存在每个键的记录数显着不平衡。
特别是,截至今天,groupByKey
仍然有一个限制,即不仅必须将单个键的所有值洗牌到同一台机器,而且它们还必须能够放入内存中:
/**
* Group the values for each key in the RDD into a single sequence. Allows controlling the
* partitioning of the resulting key-value pair RDD by passing a Partitioner.
* The ordering of elements within each group is not guaranteed, and may even differ
* each time the resulting RDD is evaluated.
*
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*
* Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any
* key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
*/
有一些 further discussion of this problem which points at a mailing list discussion 其中包括一些解决方法的讨论;也就是说,您可以明确地将 value/record 的散列字符串附加到密钥,散列到一些小的存储桶集中,以便您手动分片您的大组。
在您的情况下,您甚至可以最初进行 .map
转换,它仅有条件地调整已知热键的键以将其分成子组,同时保持非热键不变。
一般来说,"in-memory" 约束意味着您无法通过添加更多节点来真正解决严重偏斜的键,因为它需要在热节点上缩放 "in-place"。对于特定情况,您可以将 spark.executor.memory
设置为 --conf
或在 dataproc gcloud beta dataproc jobs submit spark [other flags] --properties spark.executor.memory=30g
中,只要最大键的值都可以容纳在 30g 中(有些 headroom/overhead)。但这将在任何可用的最大机器上达到顶峰,因此如果最大密钥的大小有可能随着整体数据集的增长而增长,最好更改密钥分布本身而不是尝试增加单执行程序内存.
我正在尝试 运行 处理从 google 云存储读取的大量数据 (2TB) 的 Yarn 模式作业。
管道可以这样总结:
sc.textFile("gs://path/*.json")\
.map(lambda row: json.loads(row))\
.map(toKvPair)\
.groupByKey().take(10)
[...] later processing on collections and output to GCS.
This computation over the elements of collections is not associative,
each element is sorted in it's keyspace.
当 运行 在 10GB 的数据上时,它已完成,没有任何问题。 然而,当我 运行 它在完整数据集上时,它总是失败,并在容器中显示以下日志:
15/11/04 16:08:07 WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster has disassociated: xxxxxxxxxxx
15/11/04 16:08:07 ERROR org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend: Yarn application has already exited with state FINISHED!
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/spark/python/pyspark/rdd.py", line 1299, in take
res = self.context.runJob(self, takeUpToNumLeft, p)
File "/usr/lib/spark/python/pyspark/context.py", line 916, in runJob
15/11/04 16:08:07 WARN org.apache.spark.ExecutorAllocationManager: No stages are running, but numRunningTasks != 0
port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/usr/lib/spark/python/pyspark/sql/utils.py", line 36, in deco
return f(*a, **kw)
File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
我尝试通过连接到 master 逐个启动每个操作来进行调查,但它似乎在 groupBy 上失败了。我也尝试通过添加节点并升级它们的内存和 CPU 数量来重新扩展集群,但我仍然遇到同样的问题。
120 个节点 + 1 个具有相同规格的主节点: 8 个 vCPU - 52GB 内存
我试图找到具有类似问题的线程但没有成功,所以我真的不知道我应该提供什么信息,因为日志不是很清楚,所以请随时询问更多信息。
主键是每条记录的必需值,我们需要所有不带过滤器的键,大约代表 600k 个键。 是否真的可以在不将集群扩展到大规模的情况下执行此操作?我刚读到 databricks 对 100TB 的数据 (https://databricks.com/blog/2014/10/10/spark-petabyte-sort.html) 进行了排序,这也涉及到大规模的洗牌。他们成功地将多个内存缓冲区替换为单个缓冲区,从而导致大量磁盘 IO?我的集群规模可以执行这样的操作吗?
总结一下我们通过对原始问题的评论学到的知识,如果一个小数据集有效(尤其是一个可能适合单台机器的总内存的数据集),然后尽管向集群添加了更多节点,但大数据集失败了,结合 groupByKey
的任何用法,最常要查找的是您的数据是否存在每个键的记录数显着不平衡。
特别是,截至今天,groupByKey
仍然有一个限制,即不仅必须将单个键的所有值洗牌到同一台机器,而且它们还必须能够放入内存中:
/**
* Group the values for each key in the RDD into a single sequence. Allows controlling the
* partitioning of the resulting key-value pair RDD by passing a Partitioner.
* The ordering of elements within each group is not guaranteed, and may even differ
* each time the resulting RDD is evaluated.
*
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*
* Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any
* key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
*/
有一些 further discussion of this problem which points at a mailing list discussion 其中包括一些解决方法的讨论;也就是说,您可以明确地将 value/record 的散列字符串附加到密钥,散列到一些小的存储桶集中,以便您手动分片您的大组。
在您的情况下,您甚至可以最初进行 .map
转换,它仅有条件地调整已知热键的键以将其分成子组,同时保持非热键不变。
一般来说,"in-memory" 约束意味着您无法通过添加更多节点来真正解决严重偏斜的键,因为它需要在热节点上缩放 "in-place"。对于特定情况,您可以将 spark.executor.memory
设置为 --conf
或在 dataproc gcloud beta dataproc jobs submit spark [other flags] --properties spark.executor.memory=30g
中,只要最大键的值都可以容纳在 30g 中(有些 headroom/overhead)。但这将在任何可用的最大机器上达到顶峰,因此如果最大密钥的大小有可能随着整体数据集的增长而增长,最好更改密钥分布本身而不是尝试增加单执行程序内存.