为什么 1 行 DataFrame 上的 collect() 使用 2000 个执行器?
Why does collect() on a DataFrame with 1 row use 2000 exectors?
这是我能想到的最简单的DataFrame。我正在使用 PySpark 1.6.1。
# one row of data
rows = [ (1, 2) ]
cols = [ "a", "b" ]
df = sqlContext.createDataFrame(rows, cols)
所以数据框完全适合内存,没有对任何文件的引用,对我来说看起来很微不足道。
然而当我收集数据时,它使用了 2000 个执行程序:
df.collect()
在收集期间,使用了 2000 个执行程序:
[Stage 2:===================================================>(1985 + 15) / 2000]
然后是预期的输出:
[Row(a=1, b=2)]
为什么会这样? DataFrame 不应该完全在驱动程序的内存中吗?
您可以配置执行器的数量。在许多情况下,spark 将尽可能多地使用可用的执行程序,并且执行时间比限制为少量执行程序时要差很多。
conf = SparkConf()
conf.set('spark.dynamicAllocation.enabled','true')
conf.set('spark.dynamicAllocation.maxExecutors','32')
所以我仔细研究了代码,试图弄清楚发生了什么。看来sqlContext.createDataFrame
确实没有尝试根据数据设置合理的参数值。
为什么要 2000 个任务?
Spark 使用 2000 个任务,因为我的数据框有 2000 个分区。 (尽管分区多于行似乎是胡说八道。)
这可以通过以下方式看到:
>>> df.rdd.getNumPartitions()
2000
为什么DataFrame有2000个分区?
发生这种情况是因为 sqlContext.createDataFrame
使用默认的分区数(在我的例子中是 2000),不管数据是如何组织的或者有多少行。
代码轨迹如下
在 sql/context.py
中,sqlContext.createDataFrame
函数调用(在本例中):
rdd, schema = self._createFromLocal(data, schema)
依次调用:
return self._sc.parallelize(data), schema
并且sqlContext.parallelize
函数定义在context.py
:
numSlices = int(numSlices) if numSlices is not None else self.defaultParallelism
不检查行数,无法指定sqlContext.createDataFrame
的切片数。
如何更改 DataFrame 的分区数?
使用DataFrame.coalesce
.
>>> smdf = df.coalesce(1)
>>> smdf.rdd.getNumPartitions()
1
>>> smdf.explain()
== Physical Plan ==
Coalesce 1
+- Scan ExistingRDD[a#0L,b#1L]
>>> smdf.collect()
[Row(a=1, b=2)]
这是我能想到的最简单的DataFrame。我正在使用 PySpark 1.6.1。
# one row of data
rows = [ (1, 2) ]
cols = [ "a", "b" ]
df = sqlContext.createDataFrame(rows, cols)
所以数据框完全适合内存,没有对任何文件的引用,对我来说看起来很微不足道。
然而当我收集数据时,它使用了 2000 个执行程序:
df.collect()
在收集期间,使用了 2000 个执行程序:
[Stage 2:===================================================>(1985 + 15) / 2000]
然后是预期的输出:
[Row(a=1, b=2)]
为什么会这样? DataFrame 不应该完全在驱动程序的内存中吗?
您可以配置执行器的数量。在许多情况下,spark 将尽可能多地使用可用的执行程序,并且执行时间比限制为少量执行程序时要差很多。
conf = SparkConf()
conf.set('spark.dynamicAllocation.enabled','true')
conf.set('spark.dynamicAllocation.maxExecutors','32')
所以我仔细研究了代码,试图弄清楚发生了什么。看来sqlContext.createDataFrame
确实没有尝试根据数据设置合理的参数值。
为什么要 2000 个任务?
Spark 使用 2000 个任务,因为我的数据框有 2000 个分区。 (尽管分区多于行似乎是胡说八道。)
这可以通过以下方式看到:
>>> df.rdd.getNumPartitions()
2000
为什么DataFrame有2000个分区?
发生这种情况是因为 sqlContext.createDataFrame
使用默认的分区数(在我的例子中是 2000),不管数据是如何组织的或者有多少行。
代码轨迹如下
在 sql/context.py
中,sqlContext.createDataFrame
函数调用(在本例中):
rdd, schema = self._createFromLocal(data, schema)
依次调用:
return self._sc.parallelize(data), schema
并且sqlContext.parallelize
函数定义在context.py
:
numSlices = int(numSlices) if numSlices is not None else self.defaultParallelism
不检查行数,无法指定sqlContext.createDataFrame
的切片数。
如何更改 DataFrame 的分区数?
使用DataFrame.coalesce
.
>>> smdf = df.coalesce(1)
>>> smdf.rdd.getNumPartitions()
1
>>> smdf.explain()
== Physical Plan ==
Coalesce 1
+- Scan ExistingRDD[a#0L,b#1L]
>>> smdf.collect()
[Row(a=1, b=2)]