限制大 RDD

Towards limiting the big RDD

我正在阅读许多图像,我想处理其中的一小部分以进行开发。因此,我试图了解 and 如何实现这一目标:

In [1]: d = sqlContext.read.parquet('foo')
In [2]: d.map(lambda x: x.photo_id).first()
Out[2]: u'28605'

In [3]: d.limit(1).map(lambda x: x.photo_id)
Out[3]: PythonRDD[31] at RDD at PythonRDD.scala:43

In [4]: d.limit(1).map(lambda x: x.photo_id).first()
// still running...

..所以发生了什么事?我希望 limit() 到 运行 比 [2] 快得多,但事实并非如此*.

下面说说我的理解,还请大家指正,显然我漏掉了什么:

  1. d 是成对的 RDD(我从模式中知道),我是说 使用地图功能:

    i) 拿走每一对(将被命名为 x 并把 photo_id 属性还给我)。

    ii) 这将产生一个新的(匿名的)RDD,我们在其中应用 first() 方法,我不确定它是如何工作的$ ,但应该给我那个匿名 RDD 的第一个元素。

  2. [3]中,我们将d RDD限制为1,这意味着尽管d有 许多元素,仅使用 1 并将 map 函数应用于该元素 仅元素。 Out [3]应该是映射创建的RDD。

  3. [4]中,我希望遵循[3]的逻辑,只打印有限RDD的唯一元素...

正如预期的那样,在查看监视器后,[4] 似乎处理了 整个数据集 ,而其他人则没有,所以我似乎没有使用 limit() 正确,或者那不是我要找的:


编辑:

tiny_d = d.limit(1).map(lambda x: x.photo_id)
tiny_d.map(lambda x: x.photo_id).first()

第一个会给出一个PipelinedRDD,正如所描述的here,它实际上不会做任何动作,只是一个转换。

然而,第二行也将处理整个数据集(事实上,现在的任务数量和以前一样多,加一个!)。


*[2] 立即执行,而 [4] 仍在 运行ning 且 >3h 已过去..

$由于名称原因,我在文档中找不到它。

根据您的代码,这里有一个更简单的 Spark 2.0 测试用例

case class my (x: Int)
val rdd = sc.parallelize(0.until(10000), 1000).map { x => my(x) }
val df1 = spark.createDataFrame(rdd)
val df2 = df1.limit(1)
df1.map { r => r.getAs[Int](0) }.first
df2.map { r => r.getAs[Int](0) }.first // Much slower than the previous line

其实Dataset.first等同于Dataset.limit(1).collect,所以查看两种情况的物理计划:

scala> df1.map { r => r.getAs[Int](0) }.limit(1).explain
== Physical Plan ==
CollectLimit 1
+- *SerializeFromObject [input[0, int, true] AS value#124]
   +- *MapElements <function1>, obj#123: int
      +- *DeserializeToObject createexternalrow(x#74, StructField(x,IntegerType,false)), obj#122: org.apache.spark.sql.Row
         +- Scan ExistingRDD[x#74]

scala> df2.map { r => r.getAs[Int](0) }.limit(1).explain
== Physical Plan ==
CollectLimit 1
+- *SerializeFromObject [input[0, int, true] AS value#131]
   +- *MapElements <function1>, obj#130: int
      +- *DeserializeToObject createexternalrow(x#74, StructField(x,IntegerType,false)), obj#129: org.apache.spark.sql.Row
         +- *GlobalLimit 1
            +- Exchange SinglePartition
               +- *LocalLimit 1
                  +- Scan ExistingRDD[x#74]

第一种情况,与CollectLimitExec物理算子的优化有关。也就是说,它将首先获取第一个分区以获得限制行数,在这种情况下为 1,如果不满足,则获取更多分区,直到达到所需的限制。所以一般情况下,如果第一个分区不为空,则只会计算并取回第一个分区。其他分区更不会被计算

然而,在第二种情况下,CollectLimitExec中的优化并没有帮助,因为之前的限制操作涉及到shuffle操作。将计算所有分区,并对每个分区执行 运行 LocalLimit(1) 以获取 1 行,然后将所有分区打乱为一个分区。 CollectLimitExec 将从结果的单个分区中获取 1 行。