Spark 分区和分桶是否与 DataFrame 重新分区方法相似?
Is Spark partitioning and bucketing similar with DataFrame repartition method?
我知道分区和分桶是用来避免数据混洗的。
分桶也解决了在分区上创建许多目录的问题。
和
DataFrame的repartition方法可以在(in)内存中进行分区。
除了partitioning和bucketing是物理存储的,DataFrame的repartition方法可以partitioning an(in) memory,
分区和分桶和DataFrame的重新分区方法分区方式一样吗?
例如:
- 是
dataFrame.repartition(col("colName"))
和
dataFrame.write...partitionBy("colName")...
一样吗?
- 是
dataFrame.repartition(10, col("colName"))
和
dataFrame.write...bucketBy(10, "colName")...
一样吗?
为了理解这一点,您只需要知道 partitionBy
方法 不会触发任何随机播放 。如果任务正在处理 X 天的事件,partitionBy
方法将导致在 HDFS
中写入 X 个文件
让我们做一个简单的简单场景检查。 df
是具有列 eventTimestamp
的数据帧,我们通过使用 partitonBy
:
添加三列将数据帧写回 HDFS
df.withColumn("year", year(col("eventTimestamp")))
.withColumn("month", month(col("eventTimestamp")))
.withColumn("day", dayofmonth(col("eventTimestamp")))
.repartition(col("year"), col("month"), col("day"))
.write
.partitionBy("year", "month", "day")
.save(output)
场景一:输入数据:事件存储在HDFS的200个块中。每个块是128M。这些活动只是过去一周的活动。
输出:每个任务将在 HDFS 中生成 7 个文件(每天 1 个),从而导致
作业共产生7×200=1400个文件
场景二:输入数据:事件存储在HDFS的200个块中。每个
块是128M。这些事件是过去 365 天的(所以相同
数据量,但事件来自去年,而不仅仅是一年
周)
Output 每个任务将产生 365 个文件(再次 - 每天 1 个 - 这就是 partitionBy 的工作方式)。
This leads to 365×200=73000 files. 73 thousand!
这会伤害到你!原因是:
HDFS is optimized for large files and batch processing rather than
handling many tiny files Metadata for these files will take a lot of
space in NameNode memory. With slightly bigger numbers you could even
kill your cluster!
你会如何解决文件过多的问题?在某些情况下,这非常简单——您需要做的就是 重新分区 您的 DataSet:
The repartition call will cause Spark to shuffle the data
Shuffle 机制使用哈希来决定特定记录将转到哪个存储桶。规则是一天的数据总是属于同一个桶。因此,第 2 阶段的任务会将编号为 X 的所有桶拉到同一个地方并将它们合并在一起。这意味着全天数据将在同一个任务中处理。最终结果是你最终得到的文件数量等于数据集中的天数(记住——你使用 partitionBy
方法并将一天作为参数传递)。
详情请参考下方link:
http://tantusdata.com/spark-shuffle-case-1-partition-by-and-repartition/
我知道分区和分桶是用来避免数据混洗的。
分桶也解决了在分区上创建许多目录的问题。
和
DataFrame的repartition方法可以在(in)内存中进行分区。
除了partitioning和bucketing是物理存储的,DataFrame的repartition方法可以partitioning an(in) memory,
分区和分桶和DataFrame的重新分区方法分区方式一样吗?
例如:
- 是
dataFrame.repartition(col("colName"))
和
dataFrame.write...partitionBy("colName")...
一样吗?
- 是
dataFrame.repartition(10, col("colName"))
和
dataFrame.write...bucketBy(10, "colName")...
一样吗?
为了理解这一点,您只需要知道 partitionBy
方法 不会触发任何随机播放 。如果任务正在处理 X 天的事件,partitionBy
方法将导致在 HDFS
让我们做一个简单的简单场景检查。 df
是具有列 eventTimestamp
的数据帧,我们通过使用 partitonBy
:
df.withColumn("year", year(col("eventTimestamp")))
.withColumn("month", month(col("eventTimestamp")))
.withColumn("day", dayofmonth(col("eventTimestamp")))
.repartition(col("year"), col("month"), col("day"))
.write
.partitionBy("year", "month", "day")
.save(output)
场景一:输入数据:事件存储在HDFS的200个块中。每个块是128M。这些活动只是过去一周的活动。
输出:每个任务将在 HDFS 中生成 7 个文件(每天 1 个),从而导致
作业共产生7×200=1400个文件场景二:输入数据:事件存储在HDFS的200个块中。每个 块是128M。这些事件是过去 365 天的(所以相同 数据量,但事件来自去年,而不仅仅是一年 周)
Output 每个任务将产生 365 个文件(再次 - 每天 1 个 - 这就是 partitionBy 的工作方式)。
This leads to 365×200=73000 files. 73 thousand!
这会伤害到你!原因是:
HDFS is optimized for large files and batch processing rather than handling many tiny files Metadata for these files will take a lot of space in NameNode memory. With slightly bigger numbers you could even kill your cluster!
你会如何解决文件过多的问题?在某些情况下,这非常简单——您需要做的就是 重新分区 您的 DataSet:
The repartition call will cause Spark to shuffle the data
Shuffle 机制使用哈希来决定特定记录将转到哪个存储桶。规则是一天的数据总是属于同一个桶。因此,第 2 阶段的任务会将编号为 X 的所有桶拉到同一个地方并将它们合并在一起。这意味着全天数据将在同一个任务中处理。最终结果是你最终得到的文件数量等于数据集中的天数(记住——你使用 partitionBy
方法并将一天作为参数传递)。
详情请参考下方link:
http://tantusdata.com/spark-shuffle-case-1-partition-by-and-repartition/