Spark 使用 python:如何解决阶段 x 包含非常大的任务 (xxx KB)。建议的最大任务大小为 100 KB

Spark using python: How to resolve Stage x contains a task of very large size (xxx KB). The maximum recommended task size is 100 KB

我刚刚创建了 python 个 range(1,100000) 列表。

使用 SparkContext 完成以下步骤:

a = sc.parallelize([i for i in range(1, 100000)])
b = sc.parallelize([i for i in range(1, 100000)])

c = a.zip(b)

>>> [(1, 1), (2, 2), -----]

sum  = sc.accumulator(0)

c.foreach(lambda (x, y): life.add((y-x)))

其中警告如下:

ARN TaskSetManager:第 3 阶段包含一个非常大的任务 (4644 KB)。建议的最大任务大小为 100 KB。

如何解决这个警告?有没有办法处理尺寸?还有,会不会影响大数据的时间复杂度?

Spark 在发送任务期间本地发送每个变量的副本。对于较大的此类变量,您可能需要使用 Broadcast Variables

如果你仍然面临大小问题,那么也许这个数据本身应该是一个 RDD

扩展@leo9r 评论:考虑不使用 python range,而是 sc.range https://spark.apache.org/docs/1.6.0/api/python/pyspark.html#pyspark.SparkContext.range.

因此您可以避免将庞大的列表从您的驱动程序传输到执行程序。

当然,这样的RDD通常只用于测试目的,所以你不希望它们被广播。

总体思路是 PySpark 创建与执行程序一样多的 java 个进程,然后将数据传送到每个进程。如果进程太少,java 堆 space.

会出现内存瓶颈

在你的例子中,具体错误是你用sc.parallelize([...])创建的RDD没有指定分区数(参数numSlices,见docs)。而RDD默认的partition数量太少(可能是由单个partition构成)。

要解决这个问题,只需指定所需的分区数:

a = sc.parallelize([...], numSlices=1000)   # and likewise for b

随着您指定越来越多的切片,您会看到警告消息中所述的大小减小。增加切片的数量,直到您不再收到警告消息。例如得到

Stage 0 contains a task of very large size (696 KB). The maximum recommended task size is 100 KB

意味着你需要指定更多的切片。


处理内存问题时可能有用的另一个提示(但这与警告消息无关):默认情况下,每个执行程序可用的内存为 1 GB 左右。您可以通过命令行指定更大的数量,例如 --executor-memory 64G.