如何在火花数据框中强制重新分区?

How to force repartitioning in a spark dataframe?

我有很多 spark 数据帧,我需要在这些数据帧上执行以下操作:

1) load a single spark dataframe
2) select rows from it
3) merge it with all of the previous spark dataframes

现在,上述每个操作都需要不同数量的分区。选择行需要很多分区,比如 100 个分区。合并只需要很少的分区,比如 10 个分区。

所以,我真的希望它像这样工作:

1) load a single spark dataframe
1.5) repartition into 100 partitions
2) select rows from it
2.5) repartition into 10 partitions
3) merge it with all of the previous spark dataframes

现在,我如何强制它在第 1 步和第 2 步之间以及第 2 步和第 3 步之间重新分区?

我知道当我调用 data = data.repartition(7) 时它会被延迟计算,因此它只会在实际保存时才重新分区。

所以,我一直这样做:

1) load a single spark dataframe
1.5) repartition into 100 partitions
1.75) `df.count()` *just* to force materialization
2) select rows from it
2.5) repartition into 10 partitions
2.75) `df.count()` *just* to force materialization
3) merge it with all of the previous spark dataframes

有没有更好的方法强制它在这之间重新分区?在数据帧上有比 运行 count() 更好的方法吗?

由于 spark 中数据帧的所有转换都是延迟评估的,因此您需要执行一个操作来实际执行转换。目前没有其他方法可以强制转换。

所有可用的数据帧操作都可以在 documentation 中找到(在 actions 下查看)。在您的情况下,您可以使用 first() 而不是使用 count() 强制转换,这应该便宜得多。

在步骤 2.5 中,您可以将 repartition() 替换为 coalesce(),因为这样可以避免完全随机播放。当新的分区数量比以前少时,这通常是有利的,因为它将最大限度地减少数据移动。

编辑:

回答您关于如果您不使用任何操作而只是执行以下操作会发生什么的问题:1) 重新分区,2) spark 数据帧转换,3) 重新分区。由于优化 spark 对转换执行,因此似乎并不总是遵循此顺序。我做了一个小测试程序来测试一下:

val df = spark.sparkContext.parallelize(Array((1.0,"a"),(2.0,"b"),(3.0,"c"),(1.0,"d"),(2.0,"e"),(3.0,"f"))).toDF("x", "y")
val df1 = df.repartition(10).filter($"x" =!= 1.0).repartition(5).filter($"y" =!= "b")
df1.explain(true)

此 returns 有关如何计算数据帧的信息。

== Parsed Logical Plan ==
'Filter NOT ('y = b)
+- Repartition 5, true
   +- Filter NOT (x#5 = 1.0)
      +- Repartition 10, true
         +- Project [_1#2 AS x#5, _2#3 AS y#6]
            +- LogicalRDD [_1#2, _2#3]

== Analyzed Logical Plan ==
x: double, y: string
Filter NOT (y#6 = b)
+- Repartition 5, true
   +- Filter NOT (x#5 = 1.0)
      +- Repartition 10, true
         +- Project [_1#2 AS x#5, _2#3 AS y#6]
            +- LogicalRDD [_1#2, _2#3]

== Optimized Logical Plan ==
Repartition 5, true
+- Project [_1#2 AS x#5, _2#3 AS y#6]
   +- Filter ((NOT (_1#2 = 1.0) && isnotnull(_2#3)) && NOT (_2#3 = b))
      +- LogicalRDD [_1#2, _2#3]

== Physical Plan ==
Exchange RoundRobinPartitioning(5)
+- *Project [_1#2 AS x#5, _2#3 AS y#6]
   +- *Filter ((NOT (_1#2 = 1.0) && isnotnull(_2#3)) && NOT (_2#3 = b))
      +- Scan ExistingRDD[_1#2,_2#3]

从这里可以看出,repartition(10) 这一步没有包括在内,似乎在优化过程中被删除了。