如何在 EMR 上的 spark 中加载大量的小文件
How to load huge no of small files in spark on EMR
我正在加载 50 GB 的文本文件,这些文件最初分为 190 个文本文件。
我 运行 我在这方面的火花工作,效果很好。完成这项工作花了 12 分钟。
此作业的输出再次为 50 GB,并且使用默认分区 spark 创建了大量小文件。
现在我想 运行 我的 spark 作业再次输出文件。 运行宁慢得要命。两个小时后,我不得不停止集群。
我调试后发现 spark 本身正在忙于加载文件,这就是我如何确定问题出在大量小文件上。
这很烦人,因为 spark 想要加载大文件,但又不想输出大文件。
如何处理这种情况?
我试过了
val rdd =sc.textFile(mainFileURL, 10).repartition(10)
但是我遇到了从文件名获取信息的问题;我得到这个错误:
Caused by: java.lang.ArrayIndexOutOfBoundsException: 3
val get_cus_val = sqlContext.udf.register("get_cus_val", (filePath: String) => filePath.split("\.")(3))
改变 WholeTextFiles
行得通吗?
val rdd = sc.wholeTextFiles(mainFileURL)
当我这样做时,我在下面的行中得到了错误
说
value contains is not a member of (String, String)
val header = rdd.filter(_.contains("FundamentalSeriesId")).map(line => line.split("\|\^\|")).first()
有人可以建议如何处理这个小文件问题吗?
终于
我也有 partitionBy 列,我指示 spark 将某些记录放在特定的分区中。但是有些分区的大小非常大。 50 GB.If 我进一步分区文件的数量会增加。
dfMainOutputFinalWithoutNull.write.partitionBy("DataPartition", "PartitionYear")
所以,我不太确定你是哪个版本的 Spark 运行,但你使用 sqlContext
和 sc.wholeTextFiles
,所以我猜你是 [=23] =] 一些 2.x 之前的版本。一般来说,Spark 不能很好地处理许多小文件,正如评论中所建议的那样,我也强烈建议您首先减少输出文件的数量。为了做到这一点而不它会永远,你需要在你调用.write.partitionBy
之前分割你的数据帧,所以请尝试修改你这样编码:
dfMainOutputFinalWithoutNull
.repartition("DataPartition", "PartitionYear")
.write
.partitionBy("DataPartition", "PartitionYear")
...
这应该会显着加快工作速度:)
我正在加载 50 GB 的文本文件,这些文件最初分为 190 个文本文件。 我 运行 我在这方面的火花工作,效果很好。完成这项工作花了 12 分钟。 此作业的输出再次为 50 GB,并且使用默认分区 spark 创建了大量小文件。
现在我想 运行 我的 spark 作业再次输出文件。 运行宁慢得要命。两个小时后,我不得不停止集群。
我调试后发现 spark 本身正在忙于加载文件,这就是我如何确定问题出在大量小文件上。
这很烦人,因为 spark 想要加载大文件,但又不想输出大文件。
如何处理这种情况?
我试过了
val rdd =sc.textFile(mainFileURL, 10).repartition(10)
但是我遇到了从文件名获取信息的问题;我得到这个错误:
Caused by: java.lang.ArrayIndexOutOfBoundsException: 3
val get_cus_val = sqlContext.udf.register("get_cus_val", (filePath: String) => filePath.split("\.")(3))
改变 WholeTextFiles
行得通吗?
val rdd = sc.wholeTextFiles(mainFileURL)
当我这样做时,我在下面的行中得到了错误 说
value contains is not a member of (String, String)
val header = rdd.filter(_.contains("FundamentalSeriesId")).map(line => line.split("\|\^\|")).first()
有人可以建议如何处理这个小文件问题吗?
终于 我也有 partitionBy 列,我指示 spark 将某些记录放在特定的分区中。但是有些分区的大小非常大。 50 GB.If 我进一步分区文件的数量会增加。
dfMainOutputFinalWithoutNull.write.partitionBy("DataPartition", "PartitionYear")
所以,我不太确定你是哪个版本的 Spark 运行,但你使用 sqlContext
和 sc.wholeTextFiles
,所以我猜你是 [=23] =] 一些 2.x 之前的版本。一般来说,Spark 不能很好地处理许多小文件,正如评论中所建议的那样,我也强烈建议您首先减少输出文件的数量。为了做到这一点而不它会永远,你需要在你调用.write.partitionBy
之前分割你的数据帧,所以请尝试修改你这样编码:
dfMainOutputFinalWithoutNull
.repartition("DataPartition", "PartitionYear")
.write
.partitionBy("DataPartition", "PartitionYear")
...
这应该会显着加快工作速度:)