让 PySpark 为每个列值输出一个文件(repartition / partitionBy 不起作用)

Get PySpark to output one file per column value (repartition / partitionBy not working)

我看到很多答案和 blob 帖子暗示:

df.repartition('category').write().partitionBy('category')

将为每个类别输出一个文件,但如果 df 中唯一 'category' 值的数量小于默认分区的数量(通常为 200),这似乎不是真的.

当我在包含 100 个类别的文件上使用上述代码时,我最终得到 100 个文件夹,每个文件夹包含 1 到 3 个“部分”文件,而不是将具有给定“类别”值的所有行都放在同一个目录中“部分”。 的答案似乎解释了这一点。

为每个分区值准确获取一个文件的最快方法是什么?


我试过的东西

我看到很多这样的说法

df.repartition(1, 'category').write().partitionBy('category')
df.repartition(2, 'category').write().partitionBy('category')

将分别创建“每个类别恰好一个文件”和“每个类别恰好两个文件”,但这似乎不是此参数的工作原理。 documentation 清楚地表明 numPartitions 参数是要创建的 分区总数 ,而不是每列值的分区数。根据该文档,将此参数指定为 1 应该(意外地)在写入文件时为每个分区输出一个文件,但大概只是因为它消除了所有并行性并强制在单个节点上对整个 RDD 进行洗牌/重新计算。

required_partitions = df.select('category').distinct().count()
df.repartition(required_partitions, 'category').write().partitionBy('category')

以上似乎是一种基于已记录行为的解决方法,但由于多种原因,该方法的成本很高。其一,如果 df 很昂贵且未缓存(and/or 大到为此目的缓存会很浪费),则单独计数,而且数据帧的任何重新分区都可能导致多阶段中不必要的洗牌一路上有各种数据框输出的工作流。

你可以试试coalesce(n)coalesce用于减少分区数量,是重新分区的优化版本。

n = 您要输出的分区数。

“最快”的方式可能取决于实际的硬件设置和实际数据(以防出现偏差)。据我所知,我也同意 df.repartition('category').write().partitionBy('category') 无助于解决您的问题。

我们在应用程序中遇到了类似的问题,但我们没有先进行计数然后重新分区,而是将数据写入和每个分区只有一个文件的要求分离到两个不同的 Spark 作业中。第一个作业经过优化以写入数据。第二个作业只是遍历分区文件夹结构并简单地读取每个 folder/partition 的数据,将其数据合并到一个分区并将它们覆盖回去。同样,我无法确定这是否也是对您的环境最快的方法,但对我们来说它确实有效。

对该主题进行了一些研究后,Databricks 上的 Auto Optimize Writes 功能用于写入 Delta Table。在这里,他们使用了类似的方法:首先写入数据,然后 运行 一个单独的 OPTIMIZE 作业将文件聚合到一个文件中。在提到的link中你会找到这样的解释:

"After an individual write, Azure Databricks checks if files can further be compacted, and runs an OPTIMIZE job [...] to further compact files for partitions that have the most number of small files."

附带说明:确保将配置 spark.sql.files.maxRecordsPerFile 保持为 0(默认值)或负数。否则,仅此配置可能会导致多个文件的数据在“类别”列中具有相同的值。