df.repartition 和 DataFrameWriter partitionBy 的区别?

Difference between df.repartition and DataFrameWriter partitionBy?

DataFrame repartition() 和 DataFrameWriter partitionBy() 方法有什么区别?

希望都习惯"partition data based on dataframe column"?或者有什么区别吗?

如果您 运行 repartition(COL) 在计算期间更改分区 - 您将得到 spark.sql.shuffle.partitions(默认值:200)个分区。如果您随后调用 .write,您将获得一个包含许多文件的目录。

如果您 运行 .write.partitionBy(COL) 那么您将获得与 COL 中唯一值一样多的目录。这会加快进一步的数据读取速度(如果您按分区列过滤)并节省一些 space 存储空间(分区列已从数据文件中删除)。

更新:请参阅@conradlee 的回答。他不仅详细解释了应用不同方法后目录结构的外观,还详细解释了两种情况下的文件数量。

注意:我认为接受的答案不太正确!很高兴你问这个问题,因为这些名称相似的函数的行为在重要的和意想不到的方面有所不同,官方 spark 文档中没有详细记录。

已接受答案的第一部分是正确的:调用 df.repartition(COL, numPartitions=k) 将使用基于散列的分区程序创建具有 k 分区的数据帧。 COL 此处定义分区键--它可以是单个列或列列表。基于散列的分区程序获取每个输入行的分区键,通过类似 partition = hash(partitionKey) % k 的方式将其散列到 space 的 k 分区中。这保证了具有相同分区键的所有行最终都在同一个分区中。但是,来自多个分区键的行也可能最终出现在同一个分区中(当分区键之间发生散列冲突时)并且一些分区可能是空的.

总而言之,df.repartition(COL, numPartitions=k) 不直观的方面是

  • 分区不会严格隔离分区键
  • 您的某些 k 分区可能是空的,而其他分区可能包含来自多个分区键的行

df.write.partitionBy 的行为完全不同,这是许多用户意想不到的。假设您希望输出文件按日期分区,并且您的数据跨越 7 天。我们还假设 df 有 10 个分区。当您 运行 df.write.partitionBy('day') 时,您应该期望有多少个输出文件?答案是'it depends'。如果 df 中起始分区的每个分区都包含每一天的数据,则答案为 70。如果 df 中每个起始分区仅包含一天的数据,则答案为 10。

我们如何解释这种行为?当你 运行 df.write 时, df 中的每个原始分区都是独立编写的。也就是你原来的10个分区,每一个都在'day'列上分别进行子分区,每个子分区写一个单独的文件。

我觉得这种行为很烦人,希望有一种方法可以在编写数据帧时进行全局重新分区。

repartition()用于对内存中的数据进行分区,partitionBy用于对磁盘中的数据进行分区。它们经常结合使用。

repartition()partitionBy都可以用来“根据dataframe列对数据进行分区”,但是repartition()对内存中的数据进行分区,partitionBy对数据进行分区在磁盘上。

重新分区()

让我们尝试一些代码以更好地理解分区。假设您有以下 CSV 数据。

first_name,last_name,country
Ernesto,Guevara,Argentina
Vladimir,Putin,Russia
Maria,Sharapova,Russia
Bruce,Lee,China
Jack,Ma,China

df.repartition(col("country")) 将在内存中按国家/地区重新分区数据。

让我们把数据写出来,这样我们就可以检查每个内存分区的内容了。

val outputPath = new java.io.File("./tmp/partitioned_by_country/").getCanonicalPath
df.repartition(col("country"))
  .write
  .csv(outputPath)

以下是数据在磁盘上的写法:

partitioned_by_country/
  part-00002-95acd280-42dc-457e-ad4f-c6c73be6226f-c000.csv
  part-00044-95acd280-42dc-457e-ad4f-c6c73be6226f-c000.csv
  part-00059-95acd280-42dc-457e-ad4f-c6c73be6226f-c000.csv

每个文件包含单个国家/地区的数据 - part-00059-95acd280-42dc-457e-ad4f-c6c73be6226f-c000.csv 文件包含此中国数据,例如:

Bruce,Lee,China
Jack,Ma,China

partitionBy()

让我们使用 partitionBy 将数据写入磁盘,看看文件系统输出有何不同。

这是将数据写出到磁盘分区的代码。

val outputPath = new java.io.File("./tmp/partitionedBy_disk/").getCanonicalPath
df
  .write
  .partitionBy("country")
  .csv(outputPath)

磁盘上的数据如下所示:

partitionedBy_disk/
  country=Argentina/
    part-00000-906f845c-ecdc-4b37-a13d-099c211527b4.c000.csv
  country=China/
    part-00000-906f845c-ecdc-4b37-a13d-099c211527b4.c000
  country=Russia/
    part-00000-906f845c-ecdc-4b37-a13d-099c211527b4.c000

为什么要在磁盘上分区数据?

在磁盘上对数据进行分区可以使某些查询 运行 更快。