为什么在 Spark 中重新分区比 partitionBy 快?
Why is repartition faster than partitionBy in Spark?
我正在尝试将 Spark 用于一个非常简单的用例:给定大量文件 (90k),其中包含数百万台设备的设备时间序列数据,将给定设备的所有时间序列读取分组到一组文件(分区)。现在假设我们的目标是 100 个分区,并且给定的设备数据显示在同一个输出文件中并不重要,只是同一个分区。
鉴于这个问题,我们想出了两种方法来做到这一点 - repartition
然后 write
或 write
将 partitionBy
应用于 Writer
.其中任何一个的代码都非常简单:
repartition
(添加哈希列以确保与下面的 partitionBy
代码的比较是一对一的):
df = spark.read.format("xml") \
.options(rowTag="DeviceData") \
.load(file_path, schema=meter_data) \
.withColumn("partition", hash(col("_DeviceName")).cast("Long") % num_partitions) \
.repartition("partition") \
.write.format("json") \
.option("codec", "org.apache.hadoop.io.compress.GzipCodec") \
.mode("overwrite") \
.save(output_path)
partitionBy
:
df = spark.read.format("xml") \
.options(rowTag="DeviceData") \
.load(file_path, schema=meter_data) \
.withColumn("partition", hash(col("_DeviceName")).cast("Long") % num_partitions) \
.write.format("json") \
.partitionBy(“partition”) \
.option("codec", "org.apache.hadoop.io.compress.GzipCodec") \
.mode("overwrite") \
.save(output_path)
在我们的测试中,repartition
比 partitionBy
快 10 倍。这是为什么?
根据我的理解,repartition
会导致混乱,我的 Spark 学习告诉我要尽可能避免。另一方面,partitionBy
(根据我的理解)只对每个节点进行本地排序操作——不需要洗牌。我是不是误会了什么让我认为 partitionBy
会更快?
TLDR:当您调用 partitionBy
时,Spark 会触发排序,而不是哈希重新分区。这就是为什么你的情况要慢得多。
我们可以用一个玩具示例来验证:
spark.range(1000).withColumn("partition", 'id % 100)
.repartition('partition).write.csv("/tmp/test.csv")
不要关注灰色阶段,因为它是在之前的作业中计算过的。
然后,partitionBy
:
spark.range(1000).withColumn("partition", 'id % 100)
.write.partitionBy("partition").csv("/tmp/test2.csv")
你可以检查你可以在partitionBy
之前添加repartition
,排序仍然存在。那么发生了什么事?请注意,第二个 DAG 中的排序不会触发洗牌。它是一个映射分区。事实上,当你调用 partitionBy
时,spark 并没有像一开始所期望的那样打乱数据。 Spark 分别对每个分区进行排序,然后每个执行程序将其数据写入相应的分区中的单独文件中。因此,请注意,使用 partitionBy
您不是在编写 num_partitions
文件,而是在 num_partitions
和 num_partitions * num_executors
文件之间。每个分区的每个执行程序都有一个文件,其中包含属于该分区的数据。
我认为@Oli 在他对主要答案的评论中已经完美地解释了这个问题。我只想加上我的 2 美分并尝试解释相同的内容。
假设您正在读取 XML 个文件 [90K 个文件],spark 将其读入 N 个分区。这是根据spark.sql.files.maxPartitionBytes、文件格式、压缩类型等因素的数量决定的]等
我们假设它是 10K 个分区。这发生在代码的以下部分。
df = spark.read.format("xml") \
.options(rowTag="DeviceData") \
.load(file_path, schema=meter_data) \
假设您正在使用 num_partitions = 100,您将添加一个名为 partition 的新列,其值为 0-99 . Spark 只是向现有数据框 [或 rdd] 添加一个新列,该数据框分为 10K 个分区。
.withColumn("partition", hash(col("_DeviceName")).cast("Long") % num_partitions) \
至此,两个代码是一样的。
现在,让我们比较一下重新分区发生了什么 v/s partitionBy
案例一:重新分区
.repartition("partition") \
.write.format("json") \
在这里,您将根据具有 100 个不同值的 “分区” 列对现有数据框进行重新分区。因此,现有数据帧将导致完全洗牌,将分区数量从 10K 减少到 100。由于涉及完全洗牌,因此此阶段的计算量很大。如果一个特定分区的大小确实很大 [倾斜分区],这也可能会失败。
但这里的优势在于,在下一阶段发生写入时,Spark 只需将 100 个文件 写入 output_path。每个文件将只有对应于 "partition"
列的一个值的数据
情况二:partitionBy
.write.format("json") \
.partitionBy("partition") \
在这里,您要求 spark 将 existing 数据帧写入按列的不同值分区的 output_path “分区”。您无处要求 spark 减少数据帧的现有分区数。
因此 spark 将在 output_path 中创建新文件夹
并在其中写入每个分区对应的数据。
output_path + "\partition=0\"
output_path + "\partition=1\"
output_path + "\partition=99\"
现在,由于您在现有数据帧上有 10K 个 spark 分区,并假设 最坏情况,其中每个 10K 分区都具有列的所有不同值 "partition",Spark将不得不写10K * 100 = 1M个文件。
即,所有 10K 分区中的一部分将写入列 "partition" 创建的所有 100 个文件夹中。这样spark将通过创建子目录将1M文件写入output_path里面。优点是我们使用这种方法跳过了完全洗牌。
现在与案例 1 中的内存计算密集型洗牌相比,这会慢得多,因为 Spark 必须创建 1M 文件并将它们写入持久存储。
那也是,最初到一个临时文件夹,然后到 output_path.
如果写入发生在像 AWS S3 或 GCP Blob 这样的对象存储上,这会慢得多
案例三:合并+partitionBy
.coalesce(num_partitions) \
.write.format("json") \
.partitionBy("partition") \
在这种情况下,您将使用 coalesce() 将 spark 分区的数量从 10K 减少到 100,并将其写入 output_path 按列分区 "partition".
因此,假设最坏情况,其中这 100 个分区中的每一个都具有列 “分区” 的所有不同值,spark将不得不写 100 * 100 = 10K 个文件。
这仍然比 案例 2 快,但比 案例 1 慢。
这是因为您正在使用 coalesce() 进行部分洗牌,但最终仍将 10K 文件 写入 output_path.
案例四:重新分区+partitionBy
.repartition("partition") \
.write.format("json") \
.partitionBy("partition") \
在这种情况下,您将使用 repartition() 将 spark 分区的数量从 10K 减少到 100 [列 "partition"] 的不同值 并将其写入 output_path 按列分区 "partition".
因此,这 100 个分区中的每一个都只有一个不同的列 "partition",spark 将不得不写 100 * 1 = 100 文件。 partitionBy()创建的每个子文件夹里面只有1个文件。
这将花费与案例 1 相同的时间,因为这两个案例都涉及完全洗牌,然后写入 100 个文件。这里唯一的区别是 100 个文件将位于 output_path.
下的子文件夹中
此设置对于通过 spark 或 hive 读取 output_path 时过滤器的谓词下推很有用。
结论:
尽管 partitionBy 比 repartition 快,但取决于数据帧分区的数量和这些分区内的数据分布,只需使用单独使用 partitionBy 可能最终代价高昂。
我正在尝试将 Spark 用于一个非常简单的用例:给定大量文件 (90k),其中包含数百万台设备的设备时间序列数据,将给定设备的所有时间序列读取分组到一组文件(分区)。现在假设我们的目标是 100 个分区,并且给定的设备数据显示在同一个输出文件中并不重要,只是同一个分区。
鉴于这个问题,我们想出了两种方法来做到这一点 - repartition
然后 write
或 write
将 partitionBy
应用于 Writer
.其中任何一个的代码都非常简单:
repartition
(添加哈希列以确保与下面的 partitionBy
代码的比较是一对一的):
df = spark.read.format("xml") \
.options(rowTag="DeviceData") \
.load(file_path, schema=meter_data) \
.withColumn("partition", hash(col("_DeviceName")).cast("Long") % num_partitions) \
.repartition("partition") \
.write.format("json") \
.option("codec", "org.apache.hadoop.io.compress.GzipCodec") \
.mode("overwrite") \
.save(output_path)
partitionBy
:
df = spark.read.format("xml") \
.options(rowTag="DeviceData") \
.load(file_path, schema=meter_data) \
.withColumn("partition", hash(col("_DeviceName")).cast("Long") % num_partitions) \
.write.format("json") \
.partitionBy(“partition”) \
.option("codec", "org.apache.hadoop.io.compress.GzipCodec") \
.mode("overwrite") \
.save(output_path)
在我们的测试中,repartition
比 partitionBy
快 10 倍。这是为什么?
根据我的理解,repartition
会导致混乱,我的 Spark 学习告诉我要尽可能避免。另一方面,partitionBy
(根据我的理解)只对每个节点进行本地排序操作——不需要洗牌。我是不是误会了什么让我认为 partitionBy
会更快?
TLDR:当您调用 partitionBy
时,Spark 会触发排序,而不是哈希重新分区。这就是为什么你的情况要慢得多。
我们可以用一个玩具示例来验证:
spark.range(1000).withColumn("partition", 'id % 100)
.repartition('partition).write.csv("/tmp/test.csv")
不要关注灰色阶段,因为它是在之前的作业中计算过的。
然后,partitionBy
:
spark.range(1000).withColumn("partition", 'id % 100)
.write.partitionBy("partition").csv("/tmp/test2.csv")
你可以检查你可以在partitionBy
之前添加repartition
,排序仍然存在。那么发生了什么事?请注意,第二个 DAG 中的排序不会触发洗牌。它是一个映射分区。事实上,当你调用 partitionBy
时,spark 并没有像一开始所期望的那样打乱数据。 Spark 分别对每个分区进行排序,然后每个执行程序将其数据写入相应的分区中的单独文件中。因此,请注意,使用 partitionBy
您不是在编写 num_partitions
文件,而是在 num_partitions
和 num_partitions * num_executors
文件之间。每个分区的每个执行程序都有一个文件,其中包含属于该分区的数据。
我认为@Oli 在他对主要答案的评论中已经完美地解释了这个问题。我只想加上我的 2 美分并尝试解释相同的内容。
假设您正在读取 XML 个文件 [90K 个文件],spark 将其读入 N 个分区。这是根据spark.sql.files.maxPartitionBytes、文件格式、压缩类型等因素的数量决定的]等
我们假设它是 10K 个分区。这发生在代码的以下部分。
df = spark.read.format("xml") \
.options(rowTag="DeviceData") \
.load(file_path, schema=meter_data) \
假设您正在使用 num_partitions = 100,您将添加一个名为 partition 的新列,其值为 0-99 . Spark 只是向现有数据框 [或 rdd] 添加一个新列,该数据框分为 10K 个分区。
.withColumn("partition", hash(col("_DeviceName")).cast("Long") % num_partitions) \
至此,两个代码是一样的。
现在,让我们比较一下重新分区发生了什么 v/s partitionBy
案例一:重新分区
.repartition("partition") \
.write.format("json") \
在这里,您将根据具有 100 个不同值的 “分区” 列对现有数据框进行重新分区。因此,现有数据帧将导致完全洗牌,将分区数量从 10K 减少到 100。由于涉及完全洗牌,因此此阶段的计算量很大。如果一个特定分区的大小确实很大 [倾斜分区],这也可能会失败。
但这里的优势在于,在下一阶段发生写入时,Spark 只需将 100 个文件 写入 output_path。每个文件将只有对应于 "partition"
列的一个值的数据情况二:partitionBy
.write.format("json") \
.partitionBy("partition") \
在这里,您要求 spark 将 existing 数据帧写入按列的不同值分区的 output_path “分区”。您无处要求 spark 减少数据帧的现有分区数。
因此 spark 将在 output_path 中创建新文件夹 并在其中写入每个分区对应的数据。
output_path + "\partition=0\"
output_path + "\partition=1\"
output_path + "\partition=99\"
现在,由于您在现有数据帧上有 10K 个 spark 分区,并假设 最坏情况,其中每个 10K 分区都具有列的所有不同值 "partition",Spark将不得不写10K * 100 = 1M个文件。 即,所有 10K 分区中的一部分将写入列 "partition" 创建的所有 100 个文件夹中。这样spark将通过创建子目录将1M文件写入output_path里面。优点是我们使用这种方法跳过了完全洗牌。
现在与案例 1 中的内存计算密集型洗牌相比,这会慢得多,因为 Spark 必须创建 1M 文件并将它们写入持久存储。 那也是,最初到一个临时文件夹,然后到 output_path.
如果写入发生在像 AWS S3 或 GCP Blob 这样的对象存储上,这会慢得多
案例三:合并+partitionBy
.coalesce(num_partitions) \
.write.format("json") \
.partitionBy("partition") \
在这种情况下,您将使用 coalesce() 将 spark 分区的数量从 10K 减少到 100,并将其写入 output_path 按列分区 "partition".
因此,假设最坏情况,其中这 100 个分区中的每一个都具有列 “分区” 的所有不同值,spark将不得不写 100 * 100 = 10K 个文件。
这仍然比 案例 2 快,但比 案例 1 慢。 这是因为您正在使用 coalesce() 进行部分洗牌,但最终仍将 10K 文件 写入 output_path.
案例四:重新分区+partitionBy
.repartition("partition") \
.write.format("json") \
.partitionBy("partition") \
在这种情况下,您将使用 repartition() 将 spark 分区的数量从 10K 减少到 100 [列 "partition"] 的不同值 并将其写入 output_path 按列分区 "partition".
因此,这 100 个分区中的每一个都只有一个不同的列 "partition",spark 将不得不写 100 * 1 = 100 文件。 partitionBy()创建的每个子文件夹里面只有1个文件。
这将花费与案例 1 相同的时间,因为这两个案例都涉及完全洗牌,然后写入 100 个文件。这里唯一的区别是 100 个文件将位于 output_path.
下的子文件夹中此设置对于通过 spark 或 hive 读取 output_path 时过滤器的谓词下推很有用。
结论:
尽管 partitionBy 比 repartition 快,但取决于数据帧分区的数量和这些分区内的数据分布,只需使用单独使用 partitionBy 可能最终代价高昂。