在 Apache Spark 的“bucketBy”中,如何为每个存储桶生成 1 个文件而不是每个分区每个存储桶生成 1 个文件?

In Apache Spark's `bucketBy`, how do you generate 1 file per bucket instead of 1 file per bucket per partition?

我正在尝试在相当大的数据集上使用 Spark 的 bucketBy 功能。

dataframe.write()
    .format("parquet")
    .bucketBy(500, bucketColumn1, bucketColumn2)
    .mode(SaveMode.Overwrite)
    .option("path", "s3://my-bucket")
    .saveAsTable("my_table");

问题是我的 Spark 集群有大约 500 个 partitions/tasks/executors(不确定术语),所以我最终得到的文件如下所示:

part-00001-{UUID}_00001.c000.snappy.parquet
part-00001-{UUID}_00002.c000.snappy.parquet
...
part-00001-{UUID}_00500.c000.snappy.parquet

part-00002-{UUID}_00001.c000.snappy.parquet
part-00002-{UUID}_00002.c000.snappy.parquet
...
part-00002-{UUID}_00500.c000.snappy.parquet

part-00500-{UUID}_00001.c000.snappy.parquet
part-00500-{UUID}_00002.c000.snappy.parquet
...
part-00500-{UUID}_00500.c000.snappy.parquet

那是 500x500=250000 个分桶镶木地板文件! FileOutputCommitter 需要很长时间才能将其提交给 S3。

有没有办法像在 Hive 中那样为每个存储桶生成一个文件?或者有更好的方法来处理这个问题吗?截至目前,我似乎必须在降低集群的并行性(减少编写器数量)或降低镶木地板文件的并行性(减少桶数)之间做出选择。

谢谢

这应该可以解决。

dataframe.write()
.format("parquet")
.bucketBy(1, bucketColumn1, bucketColumn2)
.mode(SaveMode.Overwrite)
.option("path", "s3://my-bucket")
.saveAsTable("my_table");

修改BucketBy函数的输入参数为1。 您可以从 spark 的 git 存储库中查看 bucketBy 的代码 - https://github.com/apache/spark/blob/f8d59572b014e5254b0c574b26e101c2e4157bdd/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

第一次拆分part-00001、part-00002是根据你保存分桶table时的并行任务数运行。在您的例子中,您有 500 个并行任务 运行。每个零件文件中的文件数量取决于您为 bucketBy 函数提供的输入。

要了解有关 Spark 任务、分区、执行程序的更多信息,请查看我的 Medium 文章 - https://medium.com/@tharun026

为了每个最终桶获得 1 个文件,请执行以下操作。在将数据帧写入 table 之前,使用与用于分桶的列完全相同的列对其进行重新分区,并将新分区的数量设置为等于您将在 bucketBy 中使用的桶数(或更小的数字,即桶数的除数,但我看不出有理由在这里使用较小的数字)。

在你的情况下可能看起来像这样:

dataframe.repartition(500, bucketColumn1, bucketColumn2)
    .write()
    .format("parquet")
    .bucketBy(500, bucketColumn1, bucketColumn2)
    .mode(SaveMode.Overwrite)
    .option("path", "s3://my-bucket")
    .saveAsTable("my_table");

在保存到现有 table 的情况下,您需要确保列的类型完全匹配(例如,如果您的列 X 在数据框中是 INT,但在 table 你正在按 X 将你的重新分区插入到 500 个桶中,这将不匹配按 X 被视为 BIGINT 的重新分区,你将最终得到 500 个执行程序中的每个执行程序再次写入 500 个文件)。

只是为了 100% 清楚 - 这种重新分区将在您的执行中添加另一个步骤,即收集 1 个执行程序上每个存储桶的数据(如果数据之前没有以相同的方式分区,则进行一次完整的数据重新洗牌)。我假设这正是您想要的。

另一个答案的评论中也提到,如果您的分桶键倾斜,您需要为可能出现的问题做好准备。这是真的,但是如果你在加载 table 之后做的第一件事是在你存储的相同列上 aggregate/join (这似乎是一个很有可能选择按这些列存储的人的场景)。相反,您会遇到延迟问题,并且只会在写入后尝试加载数据时看到偏斜。

在我看来,如果 Spark 提供一个设置,以便在写入分桶 table 之前始终对数据进行重新分区(尤其是在插入现有 table 时),那就太好了。