如何在 EMR 上的 spark 中加载大量的小文件

How to load huge no of small files in spark on EMR

我正在加载 50 GB 的文本文件,这些文件最初分为 190 个文本文件。 我 运行 我在这方面的火花工作,效果很好。完成这项工作花了 12 分钟。 此作业的输出再次为 50 GB,并且使用默认分区 spark 创建了大量小文件。

现在我想 运行 我的 spark 作业再次输出文件。 运行宁慢得要命。两个小时后,我不得不停止集群。

我调试后发现 spark 本身正在忙于加载文件,这就是我如何确定问题出在大量小文件上。

这很烦人,因为 spark 想要加载大文件,但又不想输出大文件。

如何处理这种情况?

我试过了

val rdd =sc.textFile(mainFileURL, 10).repartition(10)

但是我遇到了从文件名获取信息的问题;我得到这个错误:

Caused by: java.lang.ArrayIndexOutOfBoundsException: 3

val get_cus_val = sqlContext.udf.register("get_cus_val", (filePath: String) => filePath.split("\.")(3))

改变 WholeTextFiles 行得通吗?

val rdd = sc.wholeTextFiles(mainFileURL)

当我这样做时,我在下面的行中得到了错误 说

value contains is not a member of (String, String)

val header = rdd.filter(_.contains("FundamentalSeriesId")).map(line => line.split("\|\^\|")).first()

有人可以建议如何处理这个小文件问题吗?

终于 我也有 partitionBy 列,我指示 spark 将某些记录放在特定的分区中。但是有些分区的大小非常大。 50 GB.If 我进一步分区文件的数量会增加。

dfMainOutputFinalWithoutNull.write.partitionBy("DataPartition", "PartitionYear")

所以,我不太确定你是哪个版本的 Spark 运行,但你使用 sqlContextsc.wholeTextFiles,所以我猜你是 [=23] =] 一些 2.x 之前的版本。一般来说,Spark 不能很好地处理许多小文件,正如评论中所建议的那样,我也强烈建议您首先减少输出文件的数量。为了做到这一点而不它会永远,你需要在你调用.write.partitionBy之前分割你的数据帧,所以请尝试修改你这样编码:

dfMainOutputFinalWithoutNull
  .repartition("DataPartition", "PartitionYear")
  .write
  .partitionBy("DataPartition", "PartitionYear")
  ...

这应该会显着加快工作速度:)