Spark 使用 python:如何解决阶段 x 包含非常大的任务 (xxx KB)。建议的最大任务大小为 100 KB
Spark using python: How to resolve Stage x contains a task of very large size (xxx KB). The maximum recommended task size is 100 KB
我刚刚创建了 python 个 range(1,100000)
列表。
使用 SparkContext 完成以下步骤:
a = sc.parallelize([i for i in range(1, 100000)])
b = sc.parallelize([i for i in range(1, 100000)])
c = a.zip(b)
>>> [(1, 1), (2, 2), -----]
sum = sc.accumulator(0)
c.foreach(lambda (x, y): life.add((y-x)))
其中警告如下:
ARN TaskSetManager:第 3 阶段包含一个非常大的任务 (4644 KB)。建议的最大任务大小为 100 KB。
如何解决这个警告?有没有办法处理尺寸?还有,会不会影响大数据的时间复杂度?
Spark 在发送任务期间本地发送每个变量的副本。对于较大的此类变量,您可能需要使用 Broadcast Variables
如果你仍然面临大小问题,那么也许这个数据本身应该是一个 RDD
扩展@leo9r 评论:考虑不使用 python range
,而是 sc.range
https://spark.apache.org/docs/1.6.0/api/python/pyspark.html#pyspark.SparkContext.range.
因此您可以避免将庞大的列表从您的驱动程序传输到执行程序。
当然,这样的RDD通常只用于测试目的,所以你不希望它们被广播。
总体思路是 PySpark 创建与执行程序一样多的 java 个进程,然后将数据传送到每个进程。如果进程太少,java 堆 space.
会出现内存瓶颈
在你的例子中,具体错误是你用sc.parallelize([...])
创建的RDD没有指定分区数(参数numSlices
,见docs)。而RDD默认的partition数量太少(可能是由单个partition构成)。
要解决这个问题,只需指定所需的分区数:
a = sc.parallelize([...], numSlices=1000) # and likewise for b
随着您指定越来越多的切片,您会看到警告消息中所述的大小减小。增加切片的数量,直到您不再收到警告消息。例如得到
Stage 0 contains a task of very large size (696 KB). The maximum recommended task size is 100 KB
意味着你需要指定更多的切片。
处理内存问题时可能有用的另一个提示(但这与警告消息无关):默认情况下,每个执行程序可用的内存为 1 GB 左右。您可以通过命令行指定更大的数量,例如 --executor-memory 64G
.
我刚刚创建了 python 个 range(1,100000)
列表。
使用 SparkContext 完成以下步骤:
a = sc.parallelize([i for i in range(1, 100000)])
b = sc.parallelize([i for i in range(1, 100000)])
c = a.zip(b)
>>> [(1, 1), (2, 2), -----]
sum = sc.accumulator(0)
c.foreach(lambda (x, y): life.add((y-x)))
其中警告如下:
ARN TaskSetManager:第 3 阶段包含一个非常大的任务 (4644 KB)。建议的最大任务大小为 100 KB。
如何解决这个警告?有没有办法处理尺寸?还有,会不会影响大数据的时间复杂度?
Spark 在发送任务期间本地发送每个变量的副本。对于较大的此类变量,您可能需要使用 Broadcast Variables
如果你仍然面临大小问题,那么也许这个数据本身应该是一个 RDD
扩展@leo9r 评论:考虑不使用 python range
,而是 sc.range
https://spark.apache.org/docs/1.6.0/api/python/pyspark.html#pyspark.SparkContext.range.
因此您可以避免将庞大的列表从您的驱动程序传输到执行程序。
当然,这样的RDD通常只用于测试目的,所以你不希望它们被广播。
总体思路是 PySpark 创建与执行程序一样多的 java 个进程,然后将数据传送到每个进程。如果进程太少,java 堆 space.
会出现内存瓶颈在你的例子中,具体错误是你用sc.parallelize([...])
创建的RDD没有指定分区数(参数numSlices
,见docs)。而RDD默认的partition数量太少(可能是由单个partition构成)。
要解决这个问题,只需指定所需的分区数:
a = sc.parallelize([...], numSlices=1000) # and likewise for b
随着您指定越来越多的切片,您会看到警告消息中所述的大小减小。增加切片的数量,直到您不再收到警告消息。例如得到
Stage 0 contains a task of very large size (696 KB). The maximum recommended task size is 100 KB
意味着你需要指定更多的切片。
处理内存问题时可能有用的另一个提示(但这与警告消息无关):默认情况下,每个执行程序可用的内存为 1 GB 左右。您可以通过命令行指定更大的数量,例如 --executor-memory 64G
.