如何从spark目录中的数千个文件中加载特定数量的文件
How to loading specific no of files out of thousands of files in a directory in spark
我只需要将那些与特定模式匹配的目录中的文件加载到 运行 我的 spark 作业中。
例如,我在具有以下命名模式的目录中有 5k 个文本文件
Fundamental.FinancialLineItem.FinancialLineItem.Japan.BAL.1.2018-04-12-0542.Full
Fundamental.FinancialLineItem.FinancialLineItem.Japan.BUS.1.2018-04-12-0542.Full
Fundamental.FinancialLineItem.FinancialLineItem.SelfSourcedPrivate.SHE.1.2018-04-12-0542.Full
Fundamental.FinancialLineItem.FinancialLineItem.SelfSourcedPublic.PEN.1.2018-04-12-0542.Full
Fundamental.FinancialLineItem.Segments.Japan.1.2018-04-12-0542.Full
Fundamental.FinancialPeriod.FinancialPeriod.Japan.2018.1.2018-04-16-0348.Full
Fundamental.FinancialPeriod.Interim2Annual.Japan.1970.1.2018-04-13-0732.Full
像这样,我在一个目录中有 5k 个文本文件。
在我的 spark 作业中,如果我们用 "."
拆分文件名,我必须只加载从左到右第三个位置具有 FinancialLineItem
的文件。
所以这样只会加载前 4 个文件。
无论如何我只加载那个文件。如果我在 sc.TextFile()
中提供目录名称,那么将加载完整目录,这将占用大量 space。
目前我正在加载这样的文件
//Loading main file
val rdd = sc.textFile(mainFileURL)
//val rdd =sc.wholeTextFiles(mainFileURL).filter(x => x._1.contains("FinancialLineItem")).flatMap(_._2.split("\n"))
//val rdd =sc.wholeTextFiles(mainFileURL).filter(x => Try(x._1.split("/").last.split("\.")(2)).getOrElse("").equals("FinancialLineItem")).flatMap(_._2.split("\n"))
val header = rdd.filter(_.contains("LineItem.organizationId")).map(line => line.split("\|\^\|")).first()
val schema = StructType(header.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data = sqlContext.createDataFrame(rdd.filter(!_.contains("LineItem.organizationId")).map(line => Row.fromSeq(line.split("\|\^\|").toSeq)), schema)
val get_cus_val = sqlContext.udf.register("get_cus_val", (filePath: String) => filePath.split("\.")(3))
val df1resultFinal = data.withColumn("DataPartition", get_cus_val(input_file_name))
val dataMain = df1resultFinal.withColumn("TimeStamp", lit(null: String))
mainFileURL 是我的目录名。
您可以使用 sc.wholeTextFiles 过滤掉您的文件名。使用下面的代码以获得所需的结果。
sc.wholeTextFiles(mainFileURL).filter(x => Try(x._1.split("/").last.split("\.")(2)).getOrElse("").equals("FinancialLineItem")).flatMap(_._2.split("\n"))
使用 glob 模式定义输入。随着 Dataset
spark.read.text(
s"${mainFileURL}/Fundamental.FinancialLineItem.FinancialLineItem*"
)
与RDD
:
spark.sparkContext.textFile(
s"${mainFileURL}/Fundamental.FinancialLineItem.FinancialLineItem*"
)
这样您将只扫描文件系统,避免加载整个数据(就像 wholeTextFiles
那样)。
我只需要将那些与特定模式匹配的目录中的文件加载到 运行 我的 spark 作业中。 例如,我在具有以下命名模式的目录中有 5k 个文本文件
Fundamental.FinancialLineItem.FinancialLineItem.Japan.BAL.1.2018-04-12-0542.Full
Fundamental.FinancialLineItem.FinancialLineItem.Japan.BUS.1.2018-04-12-0542.Full
Fundamental.FinancialLineItem.FinancialLineItem.SelfSourcedPrivate.SHE.1.2018-04-12-0542.Full
Fundamental.FinancialLineItem.FinancialLineItem.SelfSourcedPublic.PEN.1.2018-04-12-0542.Full
Fundamental.FinancialLineItem.Segments.Japan.1.2018-04-12-0542.Full
Fundamental.FinancialPeriod.FinancialPeriod.Japan.2018.1.2018-04-16-0348.Full
Fundamental.FinancialPeriod.Interim2Annual.Japan.1970.1.2018-04-13-0732.Full
像这样,我在一个目录中有 5k 个文本文件。
在我的 spark 作业中,如果我们用 "."
拆分文件名,我必须只加载从左到右第三个位置具有 FinancialLineItem
的文件。
所以这样只会加载前 4 个文件。
无论如何我只加载那个文件。如果我在 sc.TextFile()
中提供目录名称,那么将加载完整目录,这将占用大量 space。
目前我正在加载这样的文件
//Loading main file
val rdd = sc.textFile(mainFileURL)
//val rdd =sc.wholeTextFiles(mainFileURL).filter(x => x._1.contains("FinancialLineItem")).flatMap(_._2.split("\n"))
//val rdd =sc.wholeTextFiles(mainFileURL).filter(x => Try(x._1.split("/").last.split("\.")(2)).getOrElse("").equals("FinancialLineItem")).flatMap(_._2.split("\n"))
val header = rdd.filter(_.contains("LineItem.organizationId")).map(line => line.split("\|\^\|")).first()
val schema = StructType(header.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data = sqlContext.createDataFrame(rdd.filter(!_.contains("LineItem.organizationId")).map(line => Row.fromSeq(line.split("\|\^\|").toSeq)), schema)
val get_cus_val = sqlContext.udf.register("get_cus_val", (filePath: String) => filePath.split("\.")(3))
val df1resultFinal = data.withColumn("DataPartition", get_cus_val(input_file_name))
val dataMain = df1resultFinal.withColumn("TimeStamp", lit(null: String))
mainFileURL 是我的目录名。
您可以使用 sc.wholeTextFiles 过滤掉您的文件名。使用下面的代码以获得所需的结果。
sc.wholeTextFiles(mainFileURL).filter(x => Try(x._1.split("/").last.split("\.")(2)).getOrElse("").equals("FinancialLineItem")).flatMap(_._2.split("\n"))
使用 glob 模式定义输入。随着 Dataset
spark.read.text(
s"${mainFileURL}/Fundamental.FinancialLineItem.FinancialLineItem*"
)
与RDD
:
spark.sparkContext.textFile(
s"${mainFileURL}/Fundamental.FinancialLineItem.FinancialLineItem*"
)
这样您将只扫描文件系统,避免加载整个数据(就像 wholeTextFiles
那样)。