使用 Spark 为每个分区创建一个 CSV
Create a single CSV per partition with Spark
我有一个大约 10GB 的数据帧,应该写成一堆 CSV 文件,每个分区一个。
CSV 应按 3 个字段进行分区:“系统”、“date_month”和“客户”。
每个文件夹内只应写入一个CSV文件,CSV文件内的数据应按另外两个字段排序:“date_day”和“date_hour”。
文件系统(S3 存储桶)应如下所示:
/system=foo/date_month=2022-04/customer=CU000001/part-00000-x.c000.csv
/system=foo/date_month=2022-04/customer=CU000002/part-00000-x.c000.csv
/system=foo/date_month=2022-04/customer=CU000003/part-00000-x.c000.csv
/system=foo/date_month=2022-04/customer=CU000004/part-00000-x.c000.csv
/system=foo/date_month=2022-05/customer=CU000001/part-00000-x.c000.csv
/system=foo/date_month=2022-05/customer=CU000002/part-00000-x.c000.csv
/system=foo/date_month=2022-05/customer=CU000003/part-00000-x.c000.csv
/system=foo/date_month=2022-05/customer=CU000004/part-00000-x.c000.csv
我知道我可以使用 coalesce(1)
轻松实现这一目标,但这只会使用一名工人,我想避免这种情况。
我试过这个策略
mydataframe.
repartition($"system", $"date_month", $"customer").
sort("date_day", "date_hour").
write.
partitionBy("system", "date_month", "customer").
option("header", "false").
option("sep", "\t").
format("csv").
save(s"s3://bucket/spool/")
我的想法是每个工作人员都会得到一个不同的分区,这样它就可以轻松地对数据进行排序并在分区路径中写入一个文件。在 运行 代码之后,我注意到每个分区都有很多 CSV,如下所示:
/system=foo/date_month=2022-05/customer=CU000001/part-00000-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv
/system=foo/date_month=2022-05/customer=CU000001/part-00001-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv
/system=foo/date_month=2022-05/customer=CU000001/part-00002-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv
/system=foo/date_month=2022-05/customer=CU000001/part-00003-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv
/system=foo/date_month=2022-05/customer=CU000001/part-00004-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv
/system=foo/date_month=2022-05/customer=CU000001/part-00005-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv
/system=foo/date_month=2022-05/customer=CU000001/part-00006-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv
/system=foo/date_month=2022-05/customer=CU000001/part-00007-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv
[...]
每个文件中的数据都按预期排序,所有文件的串联将创建正确的文件,但这需要太多时间,我更愿意依赖 Spark。
有没有一种方法可以为每个分区创建一个有序的 CSV 文件,而无需使用 coalesce(1)
将所有数据移动到单个工作人员?
如果重要的话,我正在使用 scala。
sort()
(以及 orderBy()
)触发随机播放,因为它对整个数据帧进行排序,要在分区内排序,您应该使用恰当命名的 sortWithinPartitions
.
mydataframe.
repartition($"system", $"date_month", $"customer").
sortWithinPartitions("date_day", "date_hour").
write.
partitionBy("system", "date_month", "customer").
option("header", "false").
option("sep", "\t").
format("csv").
save(s"s3://bucket/spool/")
我有一个大约 10GB 的数据帧,应该写成一堆 CSV 文件,每个分区一个。
CSV 应按 3 个字段进行分区:“系统”、“date_month”和“客户”。
每个文件夹内只应写入一个CSV文件,CSV文件内的数据应按另外两个字段排序:“date_day”和“date_hour”。
文件系统(S3 存储桶)应如下所示:
/system=foo/date_month=2022-04/customer=CU000001/part-00000-x.c000.csv
/system=foo/date_month=2022-04/customer=CU000002/part-00000-x.c000.csv
/system=foo/date_month=2022-04/customer=CU000003/part-00000-x.c000.csv
/system=foo/date_month=2022-04/customer=CU000004/part-00000-x.c000.csv
/system=foo/date_month=2022-05/customer=CU000001/part-00000-x.c000.csv
/system=foo/date_month=2022-05/customer=CU000002/part-00000-x.c000.csv
/system=foo/date_month=2022-05/customer=CU000003/part-00000-x.c000.csv
/system=foo/date_month=2022-05/customer=CU000004/part-00000-x.c000.csv
我知道我可以使用 coalesce(1)
轻松实现这一目标,但这只会使用一名工人,我想避免这种情况。
我试过这个策略
mydataframe.
repartition($"system", $"date_month", $"customer").
sort("date_day", "date_hour").
write.
partitionBy("system", "date_month", "customer").
option("header", "false").
option("sep", "\t").
format("csv").
save(s"s3://bucket/spool/")
我的想法是每个工作人员都会得到一个不同的分区,这样它就可以轻松地对数据进行排序并在分区路径中写入一个文件。在 运行 代码之后,我注意到每个分区都有很多 CSV,如下所示:
/system=foo/date_month=2022-05/customer=CU000001/part-00000-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv
/system=foo/date_month=2022-05/customer=CU000001/part-00001-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv
/system=foo/date_month=2022-05/customer=CU000001/part-00002-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv
/system=foo/date_month=2022-05/customer=CU000001/part-00003-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv
/system=foo/date_month=2022-05/customer=CU000001/part-00004-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv
/system=foo/date_month=2022-05/customer=CU000001/part-00005-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv
/system=foo/date_month=2022-05/customer=CU000001/part-00006-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv
/system=foo/date_month=2022-05/customer=CU000001/part-00007-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv
[...]
每个文件中的数据都按预期排序,所有文件的串联将创建正确的文件,但这需要太多时间,我更愿意依赖 Spark。
有没有一种方法可以为每个分区创建一个有序的 CSV 文件,而无需使用 coalesce(1)
将所有数据移动到单个工作人员?
如果重要的话,我正在使用 scala。
sort()
(以及 orderBy()
)触发随机播放,因为它对整个数据帧进行排序,要在分区内排序,您应该使用恰当命名的 sortWithinPartitions
.
mydataframe.
repartition($"system", $"date_month", $"customer").
sortWithinPartitions("date_day", "date_hour").
write.
partitionBy("system", "date_month", "customer").
option("header", "false").
option("sep", "\t").
format("csv").
save(s"s3://bucket/spool/")