使用 spark dataframe 进行字段数据验证
Field data validation using spark dataframe
我有一堆列,样本如下所示。
我需要检查列是否有错误,并且必须生成两个输出文件。
我正在使用 Apache Spark 2.0,我想以高效的方式执行此操作。
Schema Details
---------------
EMPID - (NUMBER)
ENAME - (STRING,SIZE(50))
GENDER - (STRING,SIZE(1))
Data
----
EMPID,ENAME,GENDER
1001,RIO,M
1010,RICK,MM
1015,123MYA,F
我的输出文件应该如下所示:
1.
EMPID,ENAME,GENDER
1001,RIO,M
1010,RICK,NULL
1015,NULL,F
2.
EMPID,ERROR_COLUMN,ERROR_VALUE,ERROR_DESCRIPTION
1010,GENDER,"MM","OVERSIZED"
1010,GENDER,"MM","VALUE INVALID FOR GENDER"
1015,ENAME,"123MYA","NAME SHOULD BE A STRING"
谢谢
我还没有真正使用过 Spark 2.0,因此我将尝试使用 Spark 1.6 中的解决方案来回答您的问题。
// Load you base data
val input = <<you input dataframe>>
//Extract the schema of your base data
val originalSchema = input.schema
// Modify you existing schema with you additional metadata fields
val modifiedSchema= originalSchema.add("ERROR_COLUMN", StringType, true)
.add("ERROR_VALUE", StringType, true)
.add("ERROR_DESCRIPTION", StringType, true)
// write a custom validation function
def validateColumns(row: Row): Row = {
var err_col: String = null
var err_val: String = null
var err_desc: String = null
val empId = row.getAs[String]("EMPID")
val ename = row.getAs[String]("ENAME")
val gender = row.getAs[String]("GENDER")
// do checking here and populate (err_col,err_val,err_desc) with values if applicable
Row.merge(row, Row(err_col),Row(err_val),Row(err_desc))
}
// Call you custom validation function
val validateDF = input.map { row => validateColumns(row) }
// Reconstruct the DataFrame with additional columns
val checkedDf = sqlContext.createDataFrame(validateDF, newSchema)
// Filter out row having errors
val errorDf = checkedDf.filter($"ERROR_COLUMN".isNotNull && $"ERROR_VALUE".isNotNull && $"ERROR_DESCRIPTION".isNotNull)
// Filter our row having no errors
val errorFreeDf = checkedDf.filter($"ERROR_COLUMN".isNull && !$"ERROR_VALUE".isNull && !$"ERROR_DESCRIPTION".isNull)
我个人使用过这种方法,它对我很有效。我希望它能为您指明正确的方向。
我有一堆列,样本如下所示。 我需要检查列是否有错误,并且必须生成两个输出文件。 我正在使用 Apache Spark 2.0,我想以高效的方式执行此操作。
Schema Details
---------------
EMPID - (NUMBER)
ENAME - (STRING,SIZE(50))
GENDER - (STRING,SIZE(1))
Data
----
EMPID,ENAME,GENDER
1001,RIO,M
1010,RICK,MM
1015,123MYA,F
我的输出文件应该如下所示:
1.
EMPID,ENAME,GENDER
1001,RIO,M
1010,RICK,NULL
1015,NULL,F
2.
EMPID,ERROR_COLUMN,ERROR_VALUE,ERROR_DESCRIPTION
1010,GENDER,"MM","OVERSIZED"
1010,GENDER,"MM","VALUE INVALID FOR GENDER"
1015,ENAME,"123MYA","NAME SHOULD BE A STRING"
谢谢
我还没有真正使用过 Spark 2.0,因此我将尝试使用 Spark 1.6 中的解决方案来回答您的问题。
// Load you base data
val input = <<you input dataframe>>
//Extract the schema of your base data
val originalSchema = input.schema
// Modify you existing schema with you additional metadata fields
val modifiedSchema= originalSchema.add("ERROR_COLUMN", StringType, true)
.add("ERROR_VALUE", StringType, true)
.add("ERROR_DESCRIPTION", StringType, true)
// write a custom validation function
def validateColumns(row: Row): Row = {
var err_col: String = null
var err_val: String = null
var err_desc: String = null
val empId = row.getAs[String]("EMPID")
val ename = row.getAs[String]("ENAME")
val gender = row.getAs[String]("GENDER")
// do checking here and populate (err_col,err_val,err_desc) with values if applicable
Row.merge(row, Row(err_col),Row(err_val),Row(err_desc))
}
// Call you custom validation function
val validateDF = input.map { row => validateColumns(row) }
// Reconstruct the DataFrame with additional columns
val checkedDf = sqlContext.createDataFrame(validateDF, newSchema)
// Filter out row having errors
val errorDf = checkedDf.filter($"ERROR_COLUMN".isNotNull && $"ERROR_VALUE".isNotNull && $"ERROR_DESCRIPTION".isNotNull)
// Filter our row having no errors
val errorFreeDf = checkedDf.filter($"ERROR_COLUMN".isNull && !$"ERROR_VALUE".isNull && !$"ERROR_DESCRIPTION".isNull)
我个人使用过这种方法,它对我很有效。我希望它能为您指明正确的方向。