Spark - repartition() 与 coalesce()

Spark - repartition() vs coalesce()

根据 Learning Spark

Keep in mind that repartitioning your data is a fairly expensive operation. Spark also has an optimized version of repartition() called coalesce() that allows avoiding data movement, but only if you are decreasing the number of RDD partitions.

我得到的一个区别是 repartition() 的分区数可以是 increased/decreased,但是 coalesce() 的分区数只能减少。

如果分区分布在多台机器上,coalesce()是运行,如何避免数据移动?

它避免了完整 随机播放。如果已知数量正在减少,那么执行程序可以安全地将数据保留在最少数量的分区上,只将数据从额外的节点移到我们保留的节点上。

所以,它会是这样的:

Node 1 = 1,2,3
Node 2 = 4,5,6
Node 3 = 7,8,9
Node 4 = 10,11,12

然后 coalesce 减少到 2 个分区:

Node 1 = 1,2,3 + (10,11,12)
Node 3 = 7,8,9 + (4,5,6)

请注意,节点 1 和节点 3 不需要移动其原始数据。

这里还要注意一点,Spark RDD的基本原则是不变性。重新分区或合并将创建新的 RDD。基本 RDD 将继续存在,并保留其原始分区数。如果用例需要将 RDD 持久化在缓存中,则必须对新创建的 RDD 执行相同的操作。

scala> pairMrkt.repartition(10)
res16: org.apache.spark.rdd.RDD[(String, Array[String])] =MapPartitionsRDD[11] at repartition at <console>:26

scala> res16.partitions.length
res17: Int = 10

scala>  pairMrkt.partitions.length
res20: Int = 2

Justin 的回答很棒,而且这个回复更深入。

repartition 算法执行完全随机播放并使用均匀分布的数据创建新分区。让我们用 1 到 12 的数字创建一个 DataFrame。

val x = (1 to 12).toList
val numbersDf = x.toDF("number")

numbersDf 在我的机器上包含 4 个分区。

numbersDf.rdd.partitions.size // => 4

以下是数据在分区上的划分方式:

Partition 00000: 1, 2, 3
Partition 00001: 4, 5, 6
Partition 00002: 7, 8, 9
Partition 00003: 10, 11, 12

让我们使用 repartition 方法进行全洗牌,并在两个节点上获取此数据。

val numbersDfR = numbersDf.repartition(2)

以下是 numbersDfR 数据在我的机器上的分区方式:

Partition A: 1, 3, 4, 6, 7, 9, 10, 12
Partition B: 2, 5, 8, 11

repartition方法进行新分区,并将数据均匀分布在新分区中(数据集越大,数据分布越均匀)。

coalescerepartition

的区别

coalesce 使用现有分区来最大程度地减少混洗的数据量。 repartition 创建新分区并进行完全随机播放。 coalesce 导致具有不同数据量的分区(有时分区大小相差很大),repartition 导致大小大致相等的分区。

coalescerepartition 更快吗?

coalesce 可能 运行 比 repartition 快,但不等大小的分区通常比等大小的分区更慢。在筛选大型数据集后,您通常需要重新分区数据集。我发现 repartition 总体上更快,因为 Spark 是为使用相同大小的分区而构建的。

N.B。我好奇地观察到 repartition can increase the size of data on disk。当您在大型数据集上使用重新分区/合并时,确保 运行 测试。

Read this blog post 如果您想了解更多详细信息。

何时在实践中使用合并和重新分区

  • 请参阅this question了解如何使用合并和重新分区将 DataFrame 写入单个文件
  • 重新分区很关键after running filtering queries. The number of partitions does not change after filtering, so if you don't repartition, you'll have way too many memory partitions (the more the filter reduces the dataset size, the bigger the problem). Watch out for the empty partition problem
  • partitionBy用于写出磁盘分区中的数据。在使用 partitionBy 之前,您需要 use repartition / coalesce to partition your data in memory properly

所有的答案都为这个经常被问到的问题增加了一些重要的知识。

按照这个问题时间线的传统,这是我的 2 美分。

我发现 重新分区比合并 更快,在非常特殊的情况下。

在我的应用程序中,当我们估计的文件数量低于特定阈值时,重新分区工作得更快。

这就是我的意思

if(numFiles > 20)
    df.coalesce(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)
else
    df.repartition(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)

在上面的代码片段中,如果我的文件少于 20 个,合并将永远完成,而重新分区要快得多,所以上面的代码。

当然,这个数字(20)将取决于工人的数量和数据量。

希望对您有所帮助。

简单来说 COALESCE :- 仅用于减少分区数,不对数据进行改组,它只是压缩分区

REPARTITION:- 用于增加和减少分区数,但会发生改组

示例:-

val rdd = sc.textFile("path",7)
rdd.repartition(10)
rdd.repartition(2)

两者都很好

但是当我们需要在一个集群中查看输出时,我们通常会做这两件事,我们会这样做。

但是你也应该确保,如果你正在处理大量数据,来自合并节点的数据应该是高度配置的。因为所有的数据都会加载到那些节点上,可能会导致内存异常。 虽然修复很昂贵,但我更愿意使用它。因为它平等地洗牌和分配数据。

明智地 select 在合并和重新分区之间。

repartition - 建议在增加分区数的同时使用它,因为它涉及到所有数据的混洗。

coalesce - 建议在减少分区数的情况下使用。例如,如果你有 3 个分区并且你想将其减少到 2 个,coalesce 会将第 3 个分区数据移动到分区 1 和 2。分区 1 和 2 将保留在同一个容器中。 另一方面,repartition 会在所有分区中混洗数据,因此执行器之间的网络使用率会很高,并且会影响性​​能。

coalesce 在减少分区数量的同时比 repartition 表现更好。

我想补充 Justin 和 Power 的回答 -

repartition 将忽略现有分区并创建新分区。所以你可以用它来修复数据倾斜。您可以提及分区键来定义分布。数据倾斜是 'big data' 问题中最大的问题之一 space.

coalesce 将使用现有分区并随机播放其中的一个子集。它无法像 repartition 那样修复数据偏差。因此,即使它更便宜,也可能不是您需要的东西。

对于所有很好的答案,我想补充一点,repartition 是利用数据并行化的最佳选择之一。虽然 coalesce 提供了一个减少分区的廉价选项,并且在将数据写入 HDFS 或其他一些接收器以利用大写入时非常有用。

我发现这在以 parquet 格式写入数据以充分利用时很有用。

对于从 PySpark (AWS EMR) 生成单个 csv 文件作为输出并将其保存在 s3 上时遇到问题的人,使用重新分区很有帮助。原因是,合并不能完全洗牌,但重新分区可以。本质上,您可以使用 repartition 增加或减少分区数,但只能使用 coalesce 减少分区数(但不是 1 个)。以下代码适用于尝试将 csv 从 AWS EMR 写入 s3 的任何人:

df.repartition(1).write.format('csv')\
.option("path", "s3a://my.bucket.name/location")\
.save(header = 'true')

code 和代码文档的内容是 coalesce(n)coalesce(n, shuffle = false) 相同,repartition(n)coalesce(n, shuffle = true)

因此,coalescerepartition都可以用来增加分区数

With shuffle = true, you can actually coalesce to a larger number of partitions. This is useful if you have a small number of partitions, say 100, potentially with a few partitions being abnormally large.

另一个需要强调的重要注意事项是,如果您 大幅减少 分区数量,您应该考虑使用 shuffled 版本的 coalesce(在这种情况下与 repartition 相同)。这将允许您的计算在父分区上并行执行(多任务)。

However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can pass shuffle = true. This will add a shuffle step, but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is).

请参考相关回答

重新分区:将数据随机分配到新数量的分区中。

例如。初始数据帧分为 200 个分区。

df.repartition(500): 数据将从 200 个分区洗牌到新的 500 个分区。

Coalesce:将数据洗牌到现有数量的分区中。

df.coalesce(5): 数据将从剩余的 195 个分区洗牌到 5 个现有分区。

repartition 算法对数据进行全面洗牌并创建大小相等的数据分区。 coalesce 合并现有分区以避免完全随机播放。

Coalesce 非常适合采用具有大量分区的 RDD 并在单个工作节点上组合分区以生成具有较少分区的最终 RDD。

Repartition 将重新排列 RDD 中的数据,以生成您请求的最终分区数。 DataFrames 的分区似乎是一个应该由框架管理的低级实现细节,但事实并非如此。当将大型 DataFrame 过滤成较小的 DataFrame 时,您几乎应该总是对数据进行重新分区。 您可能会经常将大型 DataFrame 过滤成较小的 DataFrame,因此请习惯重新分区。

Read this blog post 如果您想了解更多详细信息。

另外一个区别是考虑到存在倾斜连接并且您必须在其之上合并的情况。在大多数情况下,重新分区将解决倾斜连接,然后您可以进行合并。

另一种情况是,假设你在一个DataFrame中保存了medium/large量的数据,你要批量生产到Kafka。在某些情况下,重新分区有助于在生成到 Kafka 之前收集列表。但是,当音量非常大时,重新分区可能会对性能造成严重影响。在这种情况下,直接从数据帧生成 Kafka 会有所帮助。

旁注:Coalesce 不会像工作人员之间的完整数据移动那样避免数据移动。它确实减少了洗牌的次数。我想这就是这本书的意思。

即使在@Rob 的回答中提到的分区数量减少的情况下,也有重新分区>>合并的用例,即将数据写入单个文件。

@Rob 的回答暗示了好的方向,但我认为需要进一步解释以了解幕后发生的事情。

如果您需要在写入前过滤数据,那么 repartitioncoalesce 更 suitable,因为coalesce 将在加载操作之前被下推。

例如: load().map(…).filter(…).coalesce(1).save()

转换为: load().coalesce(1).map(…).filter(…).save()

这意味着您的所有数据都将合并到一个分区中,并在其中进行过滤,从而失去所有并行性。 即使对于像 column='value'.

这样非常简单的过滤器,也会发生这种情况

重新分区不会发生这种情况:load().map(…).filter(…).repartition(1).save()

在这种情况下,过滤会在原始分区上并行进行。

只是给出一个数量级,在我的例子中,当从 Hive table 加载后过滤 109M 行(~105G)和 ~1000 个分区时,运行时间从 ~6h 下降到 coalesce(1 ) 到 ~2m 重新分区(1).

具体的例子取自this article from AirBnB,很好,涵盖了Spark重新分区技术的更多方面。

基本上,重新分区允许您增加或减少分区数。重新分区重新分配所有分区的数据,这会导致完全洗牌,这是非常昂贵的操作。

Coalesce 是 Repartition 的优化版本,您只能减少分区的数量。因为我们只能减少分区的数量,所以它所做的是将一些分区合并为一个分区。通过合并分区,与 Repartition 相比,跨分区的数据移动更少。所以在 Coalesce 中是最小数据移动但是说 coalesce 不做数据移动是完全错误的说法。

另一件事是通过提供分区数进行重新分区,它试图在所有分区上均匀地重新分配数据,而在 Coalesce 的情况下,在某些情况下我们仍然可能有倾斜数据。

  • Coalesce 使用现有分区来最小化数据量 被洗牌。重新分区创建新分区并执行完整操作 随机播放。

  • 在具有不同数据量的分区中合并结果 (有时分区有许多不同的大小)和 重新分区会导致大小大致相等的分区。

  • 合并我们可以减少分区,但是修复我们可以同时增加和减少分区。

合并比重新分区执行得更好。合并总是减少分区。假设如果你在 yarn 中启用动态分配,你有四个分区和执行器。如果对其应用过滤器,则一个或多个执行器可能是空的,没有数据。这个问题可以通过合并而不是重新分区来解决。