如何拆分输入文件名并在 spark 数据框列中添加特定值
How to split input file name and add specific value in the spark data frame column
这就是我在 spark 数据框中加载 csv 文件的方式
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.{ SparkConf, SparkContext }
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
// `lit` is used below to build the placeholder column, so it has to be imported as well.
import org.apache.spark.sql.functions.lit
// Registers a UDF that returns the fifth dot-separated token of a file path
// ("CUS" for the sample file names in this question).
// String.split takes a regex, so the dot must be escaped; in a Scala string literal that is
// written "\\." -- a bare "\." is rejected by the compiler as an invalid escape sequence.
val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(4))
// NOTE: `df` is not defined in this snippet; it is assumed to be the DataFrame already loaded from the CSV input.
// Rename every column, replacing "." with "_" in its name.
val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*)
// Keep only the columns whose names contain none of "^", "!" or "_c".
val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq
val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*)
// Adds an all-null "DataPartition" column as a placeholder for the value taken from the file name.
val df1Final=df1result.withColumn("DataPartition", lit(null: String))
这是我的输入文件名之一的示例。
Fundamental.FinancialLineItem.FinancialLineItem.SelfSourcedPrivate.CUS.1.2017-09-07-1056.Full
Fundamental.FinancialLineItem.FinancialLineItem.Japan.CUS.1.2017-09-07-1056.Full.txt
现在我想读取这个文件,用“.”作为分隔符拆分文件名,然后把其中的 CUS 作为新列的值,用来代替 DataPartition 列中的空值。
我可以不使用任何 UDF 吗?
这是现有数据框的模式
root
|-- LineItem_organizationId: long (nullable = true)
|-- LineItem_lineItemId: integer (nullable = true)
|-- StatementTypeCode: string (nullable = true)
|-- LineItemName: string (nullable = true)
|-- LocalLanguageLabel: string (nullable = true)
|-- FinancialConceptLocal: string (nullable = true)
|-- FinancialConceptGlobal: string (nullable = true)
|-- IsDimensional: boolean (nullable = true)
|-- InstrumentId: string (nullable = true)
|-- LineItemSequence: string (nullable = true)
|-- PhysicalMeasureId: string (nullable = true)
|-- FinancialConceptCodeGlobalSecondary: string (nullable = true)
|-- IsRangeAllowed: boolean (nullable = true)
|-- IsSegmentedByOrigin: boolean (nullable = true)
|-- SegmentGroupDescription: string (nullable = true)
|-- SegmentChildDescription: string (nullable = true)
|-- SegmentChildLocalLanguageLabel: string (nullable = true)
|-- LocalLanguageLabel_languageId: integer (nullable = true)
|-- LineItemName_languageId: integer (nullable = true)
|-- SegmentChildDescription_languageId: integer (nullable = true)
|-- SegmentChildLocalLanguageLabel_languageId: integer (nullable = true)
|-- SegmentGroupDescription_languageId: integer (nullable = true)
|-- SegmentMultipleFundbDescription: string (nullable = true)
|-- SegmentMultipleFundbDescription_languageId: integer (nullable = true)
|-- IsCredit: boolean (nullable = true)
|-- FinancialConceptLocalId: integer (nullable = true)
|-- FinancialConceptGlobalId: integer (nullable = true)
|-- FinancialConceptCodeGlobalSecondaryId: string (nullable = true)
|-- FFAction: string (nullable = true)
根据建议的答案更新代码
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.{ SparkConf, SparkContext }
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.{input_file_name, regexp_extract}
// Keep the value returned by register(): it is the UDF object that is applied to a Column below.
// Without this binding, `get_cus_val` is not defined in this snippet.
// The dot is escaped as "\\." because split() takes a regex and "\." is not a valid Scala string escape.
val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(4))
import org.apache.spark.sql.functions.input_file_name
// Load the pipe-delimited CSV files with a header row and an inferred schema.
val df = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FinancialLineItem/MAIN")
// Rename every column, replacing "." with "_" in its name.
val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*)
// Keep only the columns whose names contain none of "^", "!" or "_c".
val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq
val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*)
// DataFrames are immutable: withColumn returns a new DataFrame, so the result must be captured.
// Printing the schema of `df1result` would not show the new "cus_val" column.
val df1Final = df1result.withColumn("cus_val", get_cus_val(input_file_name()))
df1Final.printSchema()
您可以使用内置函数 input_file_name() 获取文件名,
之后可以创建一个 UDF 来提取 CUS,或者使用 regexp_extract,
这样就不需要自定义函数。
使用 regexp_extract
(不使用 UDF,正则表达式用法如下)
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.functions.regexp_extract
// Extracts the token that sits just before the numeric segment of the file name
// (".CUS.1." -> "CUS") directly from the input file path, without a UDF.
// The pattern is a raw"..." literal so the regex backslashes reach Spark unchanged;
// in a plain string literal "\." and "\w" are invalid Scala escapes and do not compile.
df.withColumn("cus_val",
  regexp_extract(input_file_name(), raw"\.(\w+)\.[0-9]+\.", 1))
使用自定义 UDF
import org.apache.spark.sql.functions.udf
// UDF that returns the fifth dot-separated token of the file path ("CUS" for the sample file names).
// A lambda with a typed parameter needs parentheses around the parameter when passed inside (...),
// and the dot must be escaped as "\\." because split() takes a regex ("\." is an invalid Scala escape).
val get_cus_val = udf((filePath: String) => filePath.split("\\.")(4))
import org.apache.spark.sql.functions.input_file_name
// input_file_name() yields the path of the file each row was read from.
df.withColumn("cus_val", get_cus_val(input_file_name()))
这就是我在 spark 数据框中加载 csv 文件的方式
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.{ SparkConf, SparkContext }
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
// `lit` is used below to build the placeholder column, so it has to be imported as well.
import org.apache.spark.sql.functions.lit
// Registers a UDF that returns the fifth dot-separated token of a file path
// ("CUS" for the sample file names in this question).
// String.split takes a regex, so the dot must be escaped; in a Scala string literal that is
// written "\\." -- a bare "\." is rejected by the compiler as an invalid escape sequence.
val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(4))
// NOTE: `df` is not defined in this snippet; it is assumed to be the DataFrame already loaded from the CSV input.
// Rename every column, replacing "." with "_" in its name.
val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*)
// Keep only the columns whose names contain none of "^", "!" or "_c".
val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq
val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*)
// Adds an all-null "DataPartition" column as a placeholder for the value taken from the file name.
val df1Final=df1result.withColumn("DataPartition", lit(null: String))
这是我的输入文件名之一的示例。
Fundamental.FinancialLineItem.FinancialLineItem.SelfSourcedPrivate.CUS.1.2017-09-07-1056.Full
Fundamental.FinancialLineItem.FinancialLineItem.Japan.CUS.1.2017-09-07-1056.Full.txt
现在我想读取这个文件,用“.”作为分隔符拆分文件名,然后把其中的 CUS 作为新列的值,用来代替 DataPartition 列中的空值。
我可以不使用任何 UDF 吗?
这是现有数据框的模式
root
|-- LineItem_organizationId: long (nullable = true)
|-- LineItem_lineItemId: integer (nullable = true)
|-- StatementTypeCode: string (nullable = true)
|-- LineItemName: string (nullable = true)
|-- LocalLanguageLabel: string (nullable = true)
|-- FinancialConceptLocal: string (nullable = true)
|-- FinancialConceptGlobal: string (nullable = true)
|-- IsDimensional: boolean (nullable = true)
|-- InstrumentId: string (nullable = true)
|-- LineItemSequence: string (nullable = true)
|-- PhysicalMeasureId: string (nullable = true)
|-- FinancialConceptCodeGlobalSecondary: string (nullable = true)
|-- IsRangeAllowed: boolean (nullable = true)
|-- IsSegmentedByOrigin: boolean (nullable = true)
|-- SegmentGroupDescription: string (nullable = true)
|-- SegmentChildDescription: string (nullable = true)
|-- SegmentChildLocalLanguageLabel: string (nullable = true)
|-- LocalLanguageLabel_languageId: integer (nullable = true)
|-- LineItemName_languageId: integer (nullable = true)
|-- SegmentChildDescription_languageId: integer (nullable = true)
|-- SegmentChildLocalLanguageLabel_languageId: integer (nullable = true)
|-- SegmentGroupDescription_languageId: integer (nullable = true)
|-- SegmentMultipleFundbDescription: string (nullable = true)
|-- SegmentMultipleFundbDescription_languageId: integer (nullable = true)
|-- IsCredit: boolean (nullable = true)
|-- FinancialConceptLocalId: integer (nullable = true)
|-- FinancialConceptGlobalId: integer (nullable = true)
|-- FinancialConceptCodeGlobalSecondaryId: string (nullable = true)
|-- FFAction: string (nullable = true)
根据建议的答案更新代码
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.{ SparkConf, SparkContext }
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.{input_file_name, regexp_extract}
// Keep the value returned by register(): it is the UDF object that is applied to a Column below.
// Without this binding, `get_cus_val` is not defined in this snippet.
// The dot is escaped as "\\." because split() takes a regex and "\." is not a valid Scala string escape.
val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(4))
import org.apache.spark.sql.functions.input_file_name
// Load the pipe-delimited CSV files with a header row and an inferred schema.
val df = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FinancialLineItem/MAIN")
// Rename every column, replacing "." with "_" in its name.
val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*)
// Keep only the columns whose names contain none of "^", "!" or "_c".
val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq
val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*)
// DataFrames are immutable: withColumn returns a new DataFrame, so the result must be captured.
// Printing the schema of `df1result` would not show the new "cus_val" column.
val df1Final = df1result.withColumn("cus_val", get_cus_val(input_file_name()))
df1Final.printSchema()
您可以使用内置函数 input_file_name() 获取文件名,
之后可以创建一个 UDF 来提取 CUS,或者使用 regexp_extract,
这样就不需要自定义函数。
使用 regexp_extract
(不使用 UDF,正则表达式用法如下)
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.functions.regexp_extract
// Extracts the token that sits just before the numeric segment of the file name
// (".CUS.1." -> "CUS") directly from the input file path, without a UDF.
// The pattern is a raw"..." literal so the regex backslashes reach Spark unchanged;
// in a plain string literal "\." and "\w" are invalid Scala escapes and do not compile.
df.withColumn("cus_val",
  regexp_extract(input_file_name(), raw"\.(\w+)\.[0-9]+\.", 1))
使用自定义 UDF
import org.apache.spark.sql.functions.udf
// UDF that returns the fifth dot-separated token of the file path ("CUS" for the sample file names).
// A lambda with a typed parameter needs parentheses around the parameter when passed inside (...),
// and the dot must be escaped as "\\." because split() takes a regex ("\." is an invalid Scala escape).
val get_cus_val = udf((filePath: String) => filePath.split("\\.")(4))
import org.apache.spark.sql.functions.input_file_name
// input_file_name() yields the path of the file each row was read from.
df.withColumn("cus_val", get_cus_val(input_file_name()))