将多个小文件合并到 Spark 中的几个大文件中

merge multiple small files in to few larger files in Spark

我通过 Spark 使用配置单元。我的 spark 代码中有一个 Insert into partitioned table 查询。输入数据为 200+gb。当 Spark 写入分区 table 时,它会生成非常小的文件(kb 的文件)。所以现在输出分区 table 文件夹有 5000 多个小 kb 文件。我想将这些合并到几个大 MB 文件中,可能是几个 200mb 文件。我厌倦了使用配置单元合并设置,但它们似乎不起作用。

'val result7A = hiveContext.sql("set hive.exec.dynamic.partition=true")

 val result7B = hiveContext.sql("set hive.exec.dynamic.partition.mode=nonstrict")

val result7C = hiveContext.sql("SET hive.merge.size.per.task=256000000")

val result7D = hiveContext.sql("SET hive.merge.mapfiles=true")

val result7E = hiveContext.sql("SET hive.merge.mapredfiles=true")

val result7F = hiveContext.sql("SET hive.merge.sparkfiles = true")

val result7G = hiveContext.sql("set hive.aux.jars.path=c:\Applications\json-serde-1.1.9.3-SNAPSHOT-jar-with-dependencies.jar")

val result8 = hiveContext.sql("INSERT INTO TABLE partition_table PARTITION (date) select a,b,c from partition_json_table")'

以上配置单元设置在 mapreduce 配置单元执行中起作用,并吐出指定大小的文件。是否有任何选项可以执行此 Spark 或 Scala?

您可能想尝试使用 DataFrame.coalesce 方法;它 returns 具有指定分区数的 DataFrame(每个分区在插入时成为一个文件)。因此,使用您要插入的记录数和每条记录的典型大小,如果您想要约 200MB 的文件,您可以估计要合并到多少个分区。

我遇到了同样的问题。解决方案是为分区列添加 DISTRIBUTE BY 子句。这确保一个分区的数据进入单个减速器。您的示例:

INSERT INTO TABLE partition_table PARTITION (date) select a,b,c from partition_json_table DISTRIBUTE BY date

dataframe repartition(1) 方法适用于这种情况。