Spark 分区和分桶是否与 DataFrame 重新分区方法相似?

Is Spark partitioning and bucketing similar with DataFrame repartition method?

我知道分区和分桶是用来避免数据混洗的。
分桶也解决了在分区上创建许多目录的问题。

DataFrame的repartition方法可以在(in)内存中进行分区。

除了partitioning和bucketing是物理存储的,DataFrame的repartition方法可以partitioning an(in) memory,

分区和分桶和DataFrame的重新分区方法分区方式一样吗?

例如:

dataFrame.repartition(col("colName"))

dataFrame.write...partitionBy("colName")...

一样吗?

dataFrame.repartition(10, col("colName"))

dataFrame.write...bucketBy(10, "colName")...

一样吗?

为了理解这一点,您只需要知道 partitionBy 方法 不会触发任何随机播放 。如果任务正在处理 X 天的事件,partitionBy 方法将导致在 HDFS

中写入 X 个文件

让我们做一个简单的简单场景检查。 df 是具有列 eventTimestamp 的数据帧,我们通过使用 partitonBy:

添加三列将数据帧写回 HDFS
   df.withColumn("year", year(col("eventTimestamp")))
      .withColumn("month", month(col("eventTimestamp")))
      .withColumn("day", dayofmonth(col("eventTimestamp")))
      .repartition(col("year"), col("month"), col("day"))
      .write
      .partitionBy("year", "month", "day")
      .save(output)
  • 场景一:输入数据:事件存储在HDFS的200个块中。每个块是128M。这些活动只是过去一周的活动。

    输出:每个任务将在 HDFS 中生成 7 个文件(每天 1 个),从而导致
    作业共产生7×200=1400个文件

  • 场景二:输入数据:事件存储在HDFS的200个块中。每个 块是128M。这些事件是过去 365 天的(所以相同 数据量,但事件来自去年,而不仅仅是一年 周)

    Output 每个任务将产生 365 个文件(再次 - 每天 1 个 - 这就是 partitionBy 的工作方式)。

This leads to 365×200=73000 files. 73 thousand!

这会伤害到你!原因是:

HDFS is optimized for large files and batch processing rather than handling many tiny files Metadata for these files will take a lot of space in NameNode memory. With slightly bigger numbers you could even kill your cluster!

你会如何解决文件过多的问题?在某些情况下,这非常简单——您需要做的就是 重新分区 您的 DataSet:

The repartition call will cause Spark to shuffle the data

Shuffle 机制使用哈希来决定特定记录将转到哪个存储桶。规则是一天的数据总是属于同一个桶。因此,第 2 阶段的任务会将编号为 X 的所有桶拉到同一个地方并将它们合并在一起。这意味着全天数据将在同一个任务中处理。最终结果是你最终得到的文件数量等于数据集中的天数(记住——你使用 partitionBy 方法并将一天作为参数传递)。

详情请参考下方link:

http://tantusdata.com/spark-shuffle-case-1-partition-by-and-repartition/