为什么在 Spark 中重新分区比 partitionBy 快?

Why is repartition faster than partitionBy in Spark?

我正在尝试将 Spark 用于一个非常简单的用例:给定大量文件 (90k),其中包含数百万台设备的设备时间序列数据,将给定设备的所有时间序列读取分组到一组文件(分区)。现在假设我们的目标是 100 个分区,并且给定的设备数据显示在同一个输出文件中并不重要,只是同一个分区。

鉴于这个问题,我们想出了两种方法来做到这一点 - repartition 然后 writewritepartitionBy 应用于 Writer.其中任何一个的代码都非常简单:

repartition(添加哈希列以确保与下面的 partitionBy 代码的比较是一对一的):


df = spark.read.format("xml") \
  .options(rowTag="DeviceData") \
  .load(file_path, schema=meter_data) \
  .withColumn("partition", hash(col("_DeviceName")).cast("Long") % num_partitions) \
  .repartition("partition") \
  .write.format("json") \
  .option("codec", "org.apache.hadoop.io.compress.GzipCodec") \
  .mode("overwrite") \
  .save(output_path)

partitionBy:


df = spark.read.format("xml") \
  .options(rowTag="DeviceData") \
  .load(file_path, schema=meter_data) \
  .withColumn("partition", hash(col("_DeviceName")).cast("Long") % num_partitions) \
  .write.format("json") \
  .partitionBy(“partition”) \
  .option("codec", "org.apache.hadoop.io.compress.GzipCodec") \
  .mode("overwrite") \
  .save(output_path)

在我们的测试中,repartitionpartitionBy 快 10 倍。这是为什么?

根据我的理解,repartition 会导致混乱,我的 Spark 学习告诉我要尽可能避免。另一方面,partitionBy(根据我的理解)只对每个节点进行本地排序操作——不需要洗牌。我是不是误会了什么让我认为 partitionBy 会更快?

TLDR:当您调用 partitionBy 时,Spark 会触发排序,而不是哈希重新分区。这就是为什么你的情况要慢得多。

我们可以用一个玩具示例来验证:

spark.range(1000).withColumn("partition", 'id % 100)
    .repartition('partition).write.csv("/tmp/test.csv")

不要关注灰色阶段,因为它是在之前的作业中计算过的。

然后,partitionBy:

spark.range(1000).withColumn("partition", 'id % 100)
    .write.partitionBy("partition").csv("/tmp/test2.csv")

你可以检查你可以在partitionBy之前添加repartition,排序仍然存在。那么发生了什么事?请注意,第二个 DAG 中的排序不会触发洗牌。它是一个映射分区。事实上,当你调用 partitionBy 时,spark 并没有像一开始所期望的那样打乱数据。 Spark 分别对每个分区进行排序,然后每个执行程序将其数据写入相应的分区中的单独文件中。因此,请注意,使用 partitionBy 您不是在编写 num_partitions 文件,而是在 num_partitionsnum_partitions * num_executors 文件之间。每个分区的每个执行程序都有一个文件,其中包含属于该分区的数据。

我认为@Oli 在他对主要答案的评论中已经完美地解释了这个问题。我只想加上我的 2 美分并尝试解释相同的内容。

假设您正在读取 XML 个文件 [90K 个文件],spark 将其读入 N 个分区。这是根据spark.sql.files.maxPartitionBytes文件格式压缩类型等因素的数量决定的]等

我们假设它是 10K 个分区。这发生在代码的以下部分。

df = spark.read.format("xml") \
  .options(rowTag="DeviceData") \
  .load(file_path, schema=meter_data) \

假设您正在使用 num_partitions = 100,您将添加一个名为 partition 的新列,其值为 0-99 . Spark 只是向现有数据框 [或 rdd] 添加一个新列,该数据框分为 10K 个分区。

.withColumn("partition", hash(col("_DeviceName")).cast("Long") % num_partitions) \

至此,两个代码是一样的。

现在,让我们比较一下重新分区发生了什么 v/s partitionBy

案例一:重新分区

.repartition("partition") \
.write.format("json") \

在这里,您将根据具有 100 个不同值的 “分区” 列对现有数据框进行重新分区。因此,现有数据帧将导致完全洗牌,将分区数量从 10K 减少到 100。由于涉及完全洗牌,因此此阶段的计算量很大。如果一个特定分区的大小确实很大 [倾斜分区],这也可能会失败。

但这里的优势在于,在下一阶段发生写入时,Spark 只需将 100 个文件 写入 output_path。每个文件将只有对应于 "partition"

列的一个值的数据

情况二:partitionBy

.write.format("json") \
.partitionBy("partition") \

在这里,您要求 spark 将 existing 数据帧写入按列的不同值分区的 output_path “分区”。您无处要求 spark 减少数据帧的现有分区数。

因此 spark 将在 output_path 中创建新文件夹 并在其中写入每个分区对应的数据。

output_path + "\partition=0\"
output_path + "\partition=1\"
output_path + "\partition=99\"

现在,由于您在现有数据帧上有 10K 个 spark 分区,并假设 最坏情况,其中每个 10K 分区都具有列的所有不同值 "partition",Spark将不得不写10K * 100 = 1M个文件。 即,所有 10K 分区中的一部分将写入列 "partition" 创建的所有 100 个文件夹中。这样spark将通过创建子目录将1M文件写入output_path里面。优点是我们使用这种方法跳过了完全洗牌。

现在与案例 1 中的内存计算密集型洗牌相比,这会慢得多,因为 Spark 必须创建 1M 文件并将它们写入持久存储。 那也是,最初到一个临时文件夹,然后到 output_path.

如果写入发生在像 AWS S3 或 GCP Blob 这样的对象存储上,这会慢得多

案例三:合并+partitionBy

.coalesce(num_partitions) \
.write.format("json") \
.partitionBy("partition") \

在这种情况下,您将使用 coalesce() 将 spark 分区的数量从 10K 减少到 100,并将其写入 output_path 按列分区 "partition".

因此,假设最坏情况,其中这 100 个分区中的每一个都具有列 “分区” 的所有不同值,spark将不得不写 100 * 100 = 10K 个文件。

这仍然比 案例 2 快,但比 案例 1 慢。 这是因为您正在使用 coalesce() 进行部分洗牌,但最终仍将 10K 文件 写入 output_path.

案例四:重新分区+partitionBy

.repartition("partition") \
.write.format("json") \
.partitionBy("partition") \

在这种情况下,您将使用 repartition() 将 spark 分区的数量从 10K 减少到 100 [列 "partition"] 的不同值 并将其写入 output_path 按列分区 "partition".

因此,这 100 个分区中的每一个都只有一个不同的列 "partition",spark 将不得不写 100 * 1 = 100 文件。 partitionBy()创建的每个子文件夹里面只有1个文件。

这将花费与案例 1 相同的时间,因为这两个案例都涉及完全洗牌,然后写入 100 个文件。这里唯一的区别是 100 个文件将位于 output_path.

下的子文件夹中

此设置对于通过 spark 或 hive 读取 output_path 时过滤器的谓词下推很有用。

结论:

尽管 partitionByrepartition 快,但取决于数据帧分区的数量和这些分区内的数据分布,只需使用单独使用 partitionBy 可能最终代价高昂。