Apache Spark 中的混洗与非混洗合并
Shuffled vs non-shuffled coalesce in Apache Spark
在将 RDD 写入文件之前执行以下转换有什么区别?
- 合并(1,洗牌 = 真)
- 合并(1,洗牌 = 假)
代码示例:
val input = sc.textFile(inputFile)
val filtered = input.filter(doSomeFiltering)
val mapped = filtered.map(doSomeMapping)
mapped.coalesce(1, shuffle = true).saveAsTextFile(outputFile)
vs
mapped.coalesce(1, shuffle = false).saveAsTextFile(outputFile)
它与 collect() 相比如何?我完全知道 Spark 保存方法将使用 HDFS 样式的结构存储它,但是我对 collect() 和 shuffled/non-shuffled coalesce().
的数据分区方面更感兴趣
shuffle=true 和 shuffle=false 在结果输出中不会有任何实际差异,因为它们都下降到一个分区。但是,当您将其设置为 true 时,您将进行无用的随机播放。使用 shuffle=true 时,输出在分区之间均匀分布(如果需要,您还可以增加分区的数量),但由于您的目标是 1 个分区,因此无论如何一切都将在一个分区中结束。
与collect()相比,不同之处在于所有数据都存储在单个执行器上,而不是驱动程序上。
通过查看 Spark 2.3.1 的合并文档,
https://spark.apache.org/docs/2.3.1/api/java/org/apache/spark/rdd/RDD.html#coalesce-int-boolean-scala.Option-scala.math.Ordering-
当您将分区数减少到 1 时添加 shuffle=true 看起来更方便,以避免计算发生在比您想要的更少的节点上。这将添加一个混洗步骤,但意味着当前上游分区将并行执行。
coalesce(n, shuffle = true)
也等同于 repartition(n)
可能有,这取决于 mapping 或您在父 RDD 中拥有的任何其他处理登录,相当大影响你的工作表现。
一般来说,当你的父分区中的数据分布均匀并且你没有大幅减少分区数量时,你应该避免在使用coalesce
时使用shuffle。
但是,在您的情况下,这大大减少了分区数量,并且根据 documentation
However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
this may result in your computation taking place on fewer nodes than
you like (e.g. one node in the case of numPartitions = 1). To avoid this,
you can pass shuffle = true. This will add a shuffle step, but means the
current upstream partitions will be executed in parallel (per whatever
the current partitioning is)
鉴于此,现在您需要正确评估并在
之间做出选择
- 洗牌 潜在的大量数据但是 在父分区中进行计算并行
- 将所有分区收集到一个而不完全重新洗牌(当然仍然会有数据移动)但是在单个任务中进行计算
例如,考虑以下片段,它们与您可能拥有的实际逻辑相去甚远,但可以让您了解正在发生的事情
// fast
sc.parallelize(0 to 1000000, 10)
.mapPartitions(it => {Thread.sleep(5000); it.map(_.toString)})
.coalesce(1, shuffle = true)
.toDF.write.text("shuffleTrue")
// slow
sc.parallelize(0 to 1000000, 10)
.mapPartitions(it => {Thread.sleep(5000); it.map(_.toString)})
.coalesce(1, shuffle = false)
.toDF.write.text("shuffleFalse")
在我的集群上,shuffle = true
显示总时间约为 5 秒,有 10 个任务,在每个父分区上并行执行计算逻辑。
另一个 shuffle = false
在一个执行器上的单个任务 中完成了大约 50 秒的所有计算。
在将 RDD 写入文件之前执行以下转换有什么区别?
- 合并(1,洗牌 = 真)
- 合并(1,洗牌 = 假)
代码示例:
val input = sc.textFile(inputFile)
val filtered = input.filter(doSomeFiltering)
val mapped = filtered.map(doSomeMapping)
mapped.coalesce(1, shuffle = true).saveAsTextFile(outputFile)
vs
mapped.coalesce(1, shuffle = false).saveAsTextFile(outputFile)
它与 collect() 相比如何?我完全知道 Spark 保存方法将使用 HDFS 样式的结构存储它,但是我对 collect() 和 shuffled/non-shuffled coalesce().
的数据分区方面更感兴趣shuffle=true 和 shuffle=false 在结果输出中不会有任何实际差异,因为它们都下降到一个分区。但是,当您将其设置为 true 时,您将进行无用的随机播放。使用 shuffle=true 时,输出在分区之间均匀分布(如果需要,您还可以增加分区的数量),但由于您的目标是 1 个分区,因此无论如何一切都将在一个分区中结束。
与collect()相比,不同之处在于所有数据都存储在单个执行器上,而不是驱动程序上。
通过查看 Spark 2.3.1 的合并文档, https://spark.apache.org/docs/2.3.1/api/java/org/apache/spark/rdd/RDD.html#coalesce-int-boolean-scala.Option-scala.math.Ordering-
当您将分区数减少到 1 时添加 shuffle=true 看起来更方便,以避免计算发生在比您想要的更少的节点上。这将添加一个混洗步骤,但意味着当前上游分区将并行执行。
coalesce(n, shuffle = true)
也等同于 repartition(n)
可能有,这取决于 mapping 或您在父 RDD 中拥有的任何其他处理登录,相当大影响你的工作表现。
一般来说,当你的父分区中的数据分布均匀并且你没有大幅减少分区数量时,你应该避免在使用coalesce
时使用shuffle。
但是,在您的情况下,这大大减少了分区数量,并且根据 documentation
However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can pass shuffle = true. This will add a shuffle step, but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is)
鉴于此,现在您需要正确评估并在
之间做出选择- 洗牌 潜在的大量数据但是 在父分区中进行计算并行
- 将所有分区收集到一个而不完全重新洗牌(当然仍然会有数据移动)但是在单个任务中进行计算
例如,考虑以下片段,它们与您可能拥有的实际逻辑相去甚远,但可以让您了解正在发生的事情
// fast
sc.parallelize(0 to 1000000, 10)
.mapPartitions(it => {Thread.sleep(5000); it.map(_.toString)})
.coalesce(1, shuffle = true)
.toDF.write.text("shuffleTrue")
// slow
sc.parallelize(0 to 1000000, 10)
.mapPartitions(it => {Thread.sleep(5000); it.map(_.toString)})
.coalesce(1, shuffle = false)
.toDF.write.text("shuffleFalse")
在我的集群上,shuffle = true
显示总时间约为 5 秒,有 10 个任务,在每个父分区上并行执行计算逻辑。
另一个 shuffle = false
在一个执行器上的单个任务 中完成了大约 50 秒的所有计算。