使用 Spark 为每个分区创建一个 CSV

Create a single CSV per partition with Spark

我有一个大约 10GB 的数据帧,应该写成一堆 CSV 文件,每个分区一个。

CSV 应按 3 个字段进行分区:“系统”、“date_month”和“客户”。

每个文件夹内只应写入一个CSV文件,CSV文件内的数据应按另外两个字段排序:“date_day”和“date_hour”。

文件系统(S3 存储桶)应如下所示:

/system=foo/date_month=2022-04/customer=CU000001/part-00000-x.c000.csv
/system=foo/date_month=2022-04/customer=CU000002/part-00000-x.c000.csv
/system=foo/date_month=2022-04/customer=CU000003/part-00000-x.c000.csv
/system=foo/date_month=2022-04/customer=CU000004/part-00000-x.c000.csv
/system=foo/date_month=2022-05/customer=CU000001/part-00000-x.c000.csv
/system=foo/date_month=2022-05/customer=CU000002/part-00000-x.c000.csv
/system=foo/date_month=2022-05/customer=CU000003/part-00000-x.c000.csv
/system=foo/date_month=2022-05/customer=CU000004/part-00000-x.c000.csv

我知道我可以使用 coalesce(1) 轻松实现这一目标,但这只会使用一名工人,我想避免这种情况。

我试过这个策略

  mydataframe.
      repartition($"system", $"date_month", $"customer").
      sort("date_day", "date_hour").
      write.
      partitionBy("system", "date_month", "customer").
      option("header", "false").
      option("sep", "\t").
      format("csv").
      save(s"s3://bucket/spool/")

我的想法是每个工作人员都会得到一个不同的分区,这样它就可以轻松地对数据进行排序并在分区路径中写入一个文件。在 运行 代码之后,我注意到每个分区都有很多 CSV,如下所示:

/system=foo/date_month=2022-05/customer=CU000001/part-00000-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv                                                                                                                                                                                           
/system=foo/date_month=2022-05/customer=CU000001/part-00001-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv                                                                                                                                                                                           
/system=foo/date_month=2022-05/customer=CU000001/part-00002-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv                                                                                                                                                                                           
/system=foo/date_month=2022-05/customer=CU000001/part-00003-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv                                                                                                                                                                                           
/system=foo/date_month=2022-05/customer=CU000001/part-00004-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv                                                                                                                                                                                           
/system=foo/date_month=2022-05/customer=CU000001/part-00005-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv                                                                                                                                                                                           
/system=foo/date_month=2022-05/customer=CU000001/part-00006-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv                                                                                                                                                                                           
/system=foo/date_month=2022-05/customer=CU000001/part-00007-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv      
[...]                                                                                                                                                                                     

每个文件中的数据都按预期排序,所有文件的串联将创建正确的文件,但这需要太多时间,我更愿意依赖 Spark。

有没有一种方法可以为每个分区创建一个有序的 CSV 文件,而无需使用 coalesce(1) 将所有数据移动到单个工作人员?

如果重要的话,我正在使用 scala。

sort()(以及 orderBy())触发随机播放,因为它对整个数据帧进行排序,要在分区内排序,您应该使用恰当命名的 sortWithinPartitions.

  mydataframe.
      repartition($"system", $"date_month", $"customer").
      sortWithinPartitions("date_day", "date_hour").
      write.
      partitionBy("system", "date_month", "customer").
      option("header", "false").
      option("sep", "\t").
      format("csv").
      save(s"s3://bucket/spool/")