如何从spark目录中的数千个文件中加载特定数量的文件

How to loading specific no of files out of thousands of files in a directory in spark

我只需要将那些与特定模式匹配的目录中的文件加载到 运行 我的 spark 作业中。 例如,我在具有以下命名模式的目录中有 5k 个文本文件

Fundamental.FinancialLineItem.FinancialLineItem.Japan.BAL.1.2018-04-12-0542.Full
Fundamental.FinancialLineItem.FinancialLineItem.Japan.BUS.1.2018-04-12-0542.Full
Fundamental.FinancialLineItem.FinancialLineItem.SelfSourcedPrivate.SHE.1.2018-04-12-0542.Full
Fundamental.FinancialLineItem.FinancialLineItem.SelfSourcedPublic.PEN.1.2018-04-12-0542.Full

Fundamental.FinancialLineItem.Segments.Japan.1.2018-04-12-0542.Full
Fundamental.FinancialPeriod.FinancialPeriod.Japan.2018.1.2018-04-16-0348.Full
Fundamental.FinancialPeriod.Interim2Annual.Japan.1970.1.2018-04-13-0732.Full

像这样,我在一个目录中有 5k 个文本文件。 在我的 spark 作业中,如果我们用 "." 拆分文件名,我必须只加载从左到右第三个位置具有 FinancialLineItem 的文件。 所以这样只会加载前 4 个文件。

无论如何我只加载那个文件。如果我在 sc.TextFile() 中提供目录名称,那么将加载完整目录,这将占用大量 space。 目前我正在加载这样的文件

//Loading main file
val rdd = sc.textFile(mainFileURL)
//val rdd =sc.wholeTextFiles(mainFileURL).filter(x => x._1.contains("FinancialLineItem")).flatMap(_._2.split("\n"))
//val rdd =sc.wholeTextFiles(mainFileURL).filter(x => Try(x._1.split("/").last.split("\.")(2)).getOrElse("").equals("FinancialLineItem")).flatMap(_._2.split("\n"))
val header = rdd.filter(_.contains("LineItem.organizationId")).map(line => line.split("\|\^\|")).first()
val schema = StructType(header.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data = sqlContext.createDataFrame(rdd.filter(!_.contains("LineItem.organizationId")).map(line => Row.fromSeq(line.split("\|\^\|").toSeq)), schema)
val get_cus_val = sqlContext.udf.register("get_cus_val", (filePath: String) => filePath.split("\.")(3))
val df1resultFinal = data.withColumn("DataPartition", get_cus_val(input_file_name))
val dataMain = df1resultFinal.withColumn("TimeStamp", lit(null: String))

mainFileURL 是我的目录名。

您可以使用 sc.wholeTextFiles 过滤掉您的文件名。使用下面的代码以获得所需的结果。

sc.wholeTextFiles(mainFileURL).filter(x => Try(x._1.split("/").last.split("\.")(2)).getOrElse("").equals("FinancialLineItem")).flatMap(_._2.split("\n"))

使用 glob 模式定义输入。随着 Dataset

spark.read.text(
  s"${mainFileURL}/Fundamental.FinancialLineItem.FinancialLineItem*"
)

RDD:

spark.sparkContext.textFile(
  s"${mainFileURL}/Fundamental.FinancialLineItem.FinancialLineItem*"
)

这样您将只扫描文件系统,避免加载整个数据(就像 wholeTextFiles 那样)。