How to reduce code repetition on AWS Deequ
I have about 5 datasets (more will be added in the future, so generalising is important) that run the same code base with a common set of headers, but I am not sure how to go about making sure that I:
- load the dataset
- call the code and write to a different folder.
It would be great if you could help, as I am new to Scala. These are jobs on AWS Glue. The only things that change are the input file and the location of the results.
Here are some examples - I would like to reduce the repetition in the code:
import com.amazon.deequ.{VerificationSuite, VerificationResult}
import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
import com.amazon.deequ.checks.{Check, CheckLevel}
import com.amazon.deequ.constraints.{ConstrainableDataTypes}
import org.apache.spark.sql.functions.{length, max}
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object Deequ {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("dq")
val spark = SparkSession.builder().appName("dq").getOrCreate()
val dataset = spark.read.option("header",true).option("delimiter",",").csv("s3://ct-ire-fin-stg-data-dev-raw-gib/templates/Contract_Portfolio_Assignment/Contract_Portfolio_Assignement_Compass/contract-portfolio-assignment-compass - Sheet1.csv")
val verificationResult: VerificationResult = { VerificationSuite()
// data to run the verification on
.onData(dataset)
// define a data quality check
.addCheck(
Check(CheckLevel.Error, "Template Validations")
.hasDataType("* Contract Category", ConstrainableDataTypes.Integral)
.hasMaxLength("* Contract Category", _==1)
.isComplete("* Contract Category")
.hasDataType("* Contract ID",ConstrainableDataTypes.String )
.hasMaxLength("* Contract ID", _ <= 40)
.isComplete("* Contract ID")
.hasDataType("* Key Date",ConstrainableDataTypes.Integral )
.hasMaxLength("* Key Date", _ <= 8)
.isComplete("* Key Date")
.hasDataType("* Portfolio Category",ConstrainableDataTypes.Integral )
.hasMaxLength("* Portfolio Category", _ <= 4)
.isComplete("* Portfolio Category")
.hasDataType("* Tranche Start Date",ConstrainableDataTypes.Integral)
.hasMaxLength("* Tranche Start Date", _ <= 8)
.isComplete("* Tranche Start Date")
// .isContainedIn("Portfolio Category", Array("2100"))
.hasDataType("Portfolio",ConstrainableDataTypes.String)
.hasMaxLength("Portfolio", _ <= 40)
.isComplete("Portfolio")
.hasDataType("Source System",ConstrainableDataTypes.String )
.hasMaxLength("Source System", _ <= 10)
.isComplete("Source System")
.isContainedIn("Source System", Array("LFST", "CLPB","CLCB","CLHR","CCLU"))
.hasDataType("Delivery Package",ConstrainableDataTypes.String)
.hasMaxLength("Delivery Package", _ <= 20)
.isComplete("Delivery Package")
// .isContainedIn("Legal Entity", Array("LP01", "LLAL"))
)
// compute metrics and verify check conditions
.run()
}
//val metrics1 = successMetricsAsDataFrame(spark, analysisResult1)
val resultDataFrame = checkResultsAsDataFrame(spark, verificationResult)
resultDataFrame.write.mode("overwrite").parquet("s3://ct-ire-fin-stg-data-dev-raw-
gib/template_validations/Contract-Portfolio-Assignment-Validations/Contract-Portfolio-
Assignment-Compass/")
}}
Here is the second code base:
import com.amazon.deequ.{VerificationSuite, VerificationResult}
import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
import com.amazon.deequ.checks.{Check, CheckLevel}
import com.amazon.deequ.constraints.{ConstrainableDataTypes}
import org.apache.spark.sql.functions.{length, max}
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object Deequ {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("dq")
val spark = SparkSession.builder().appName("dq").getOrCreate()
val dataset = spark.read.option("header",true).option("delimiter",",").csv("s3://ct-ire-fin-stg-data-dev-raw-gib/templates /Contract_Portfolio_Assignment/Contract-Portfolio-Assignment-GIP/Portfolio-Assignment-GIP - Sheet1.csv")
val verificationResult: VerificationResult = { VerificationSuite()
// data to run the verification on
.onData(dataset)
// define a data quality check
.addCheck(
Check(CheckLevel.Error, "Template Validations")
.hasDataType("* Contract Category", ConstrainableDataTypes.Integral)
.hasMaxLength("* Contract Category", _==1)
.isComplete("* Contract Category")
.hasDataType("* Contract ID",ConstrainableDataTypes.String )
.hasMaxLength("* Contract ID", _ <= 40)
.isComplete("* Contract ID")
.hasDataType("* Key Date",ConstrainableDataTypes.Integral )
.hasMaxLength("* Key Date", _ <= 8)
.isComplete("* Key Date")
.hasDataType("* Portfolio Category",ConstrainableDataTypes.Integral )
.hasMaxLength("* Portfolio Category", _ <= 4)
.isComplete("* Portfolio Category")
.hasDataType("* Tranche Start Date",ConstrainableDataTypes.Integral)
.hasMaxLength("* Tranche Start Date", _ <= 8)
.isComplete("* Tranche Start Date")
// .isContainedIn("Portfolio Category", Array("2100"))
.hasDataType("Portfolio",ConstrainableDataTypes.String)
.hasMaxLength("Portfolio", _ <= 40)
.isComplete("Portfolio")
.hasDataType("Source System",ConstrainableDataTypes.String )
.hasMaxLength("Source System", _ <= 10)
.isComplete("Source System")
.isContainedIn("Source System", Array("LFST", "CLPB","CLCB","CLHR","CCLU"))
.hasDataType("Delivery Package",ConstrainableDataTypes.String)
.hasMaxLength("Delivery Package", _ <= 20)
.isComplete("Delivery Package")
// .isContainedIn("Legal Entity", Array("LP01", "LLAL"))
)
// compute metrics and verify check conditions
.run()
}
//val metrics1 = successMetricsAsDataFrame(spark, analysisResult1)
val resultDataFrame = checkResultsAsDataFrame(spark, verificationResult)
resultDataFrame.write.mode("overwrite").parquet("s3://ct-ire-fin-stg-data-dev-raw-
gib/template_validations/Contract-Portfolio-Assignment-Validations/Contract-Portfolio-
Assignement-GIP-Validations/")
}}
Here is the third:
import com.amazon.deequ.{VerificationSuite, VerificationResult}
import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
import com.amazon.deequ.checks.{Check, CheckLevel}
import com.amazon.deequ.constraints.{ConstrainableDataTypes}
import org.apache.spark.sql.functions.{length, max}
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object Deequ {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("dq")
val spark = SparkSession.builder().appName("dq").getOrCreate()
val dataset = spark.read.option("header",true).option("delimiter",",").csv("s3://ct-ire-fin-stg-data-dev-raw-gib/templates /Contract_Portfolio_Assignment/Portfolio-Assignment-Mobilife/Mobilife-Portforlio-Assessment - Sheet1.csv")
val verificationResult: VerificationResult = { VerificationSuite()
// data to run the verification on
.onData(dataset)
// define a data quality check
.addCheck(
Check(CheckLevel.Error, "Template Validations")
.hasDataType("* Contract Category", ConstrainableDataTypes.Integral)
.hasMaxLength("* Contract Category", _==1)
.isComplete("* Contract Category")
.hasDataType("* Contract ID",ConstrainableDataTypes.String )
.hasMaxLength("* Contract ID", _ <= 40)
.isComplete("* Contract ID")
.hasDataType("* Key Date",ConstrainableDataTypes.Integral )
.hasMaxLength("* Key Date", _ <= 8)
.isComplete("* Key Date")
.hasDataType("* Portfolio Category",ConstrainableDataTypes.Integral )
.hasMaxLength("* Portfolio Category", _ <= 4)
.isComplete("* Portfolio Category")
.hasDataType("* Tranche Start Date",ConstrainableDataTypes.Integral)
.hasMaxLength("* Tranche Start Date", _ <= 8)
.isComplete("* Tranche Start Date")
// .isContainedIn("Portfolio Category", Array("2100"))
.hasDataType("Portfolio",ConstrainableDataTypes.String)
.hasMaxLength("Portfolio", _ <= 40)
.isComplete("Portfolio")
.hasDataType("Source System",ConstrainableDataTypes.String )
.hasMaxLength("Source System", _ <= 10)
.isComplete("Source System")
.isContainedIn("Source System", Array("LFST", "CLPB","CLCB","CLHR","CCLU"))
.hasDataType("Delivery Package",ConstrainableDataTypes.String)
.hasMaxLength("Delivery Package", _ <= 20)
.isComplete("Delivery Package")
// .isContainedIn("Legal Entity", Array("LP01", "LLAL"))
)
// compute metrics and verify check conditions
.run()
}
//val metrics1 = successMetricsAsDataFrame(spark, analysisResult1)
val resultDataFrame = checkResultsAsDataFrame(spark, verificationResult)
resultDataFrame.write.mode("overwrite").parquet("s3://ct-ire-fin-stg-data-dev-raw-
gib/template_validations/Contract-Portfolio-Assignment-Validations/contract-portfolio-
assessment_Mobilife-Validations/")
}}
From what I understand of your question, you can create a function that performs the common logic and call that same function from different places. The function can take parameters for the values that differ between workflows.
// Common helper, just for your reference; modify it as you need.
// The checks are identical for every dataset, so only the source and target paths are parameters.
import com.amazon.deequ.{VerificationSuite, VerificationResult}
import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
import com.amazon.deequ.checks.{Check, CheckLevel}
import com.amazon.deequ.constraints.ConstrainableDataTypes
import org.apache.spark.sql.SparkSession

object CommonHelper {
def ProcessDataSet(spark: SparkSession, sourcePath: String, targetPath: String): Unit = {
val dataset = spark.read.option("header",true).option("delimiter",",").csv(sourcePath)
val verificationResult: VerificationResult = { VerificationSuite()
// data to run the verification on
.onData(dataset)
// define a data quality check
.addCheck(
Check(CheckLevel.Error, "Template Validations")
.hasDataType("* Contract Category", ConstrainableDataTypes.Integral)
.hasMaxLength("* Contract Category", _==1)
.isComplete("* Contract Category")
.hasDataType("* Contract ID",ConstrainableDataTypes.String )
.hasMaxLength("* Contract ID", _ <= 40)
.isComplete("* Contract ID")
.hasDataType("* Key Date",ConstrainableDataTypes.Integral )
.hasMaxLength("* Key Date", _ <= 8)
.isComplete("* Key Date")
.hasDataType("* Portfolio Category",ConstrainableDataTypes.Integral )
.hasMaxLength("* Portfolio Category", _ <= 4)
.isComplete("* Portfolio Category")
.hasDataType("* Tranche Start Date",ConstrainableDataTypes.Integral)
.hasMaxLength("* Tranche Start Date", _ <= 8)
.isComplete("* Tranche Start Date")
// .isContainedIn("Portfolio Category", Array("2100"))
.hasDataType("Portfolio",ConstrainableDataTypes.String)
.hasMaxLength("Portfolio", _ <= 40)
.isComplete("Portfolio")
.hasDataType("Source System",ConstrainableDataTypes.String )
.hasMaxLength("Source System", _ <= 10)
.isComplete("Source System")
.isContainedIn("Source System", Array("LFST", "CLPB","CLCB","CLHR","CCLU"))
.hasDataType("Delivery Package",ConstrainableDataTypes.String)
.hasMaxLength("Delivery Package", _ <= 20)
.isComplete("Delivery Package")
// .isContainedIn("Legal Entity", Array("LP01", "LLAL"))
)
// compute metrics and verify check conditions
.run()
}
//val metrics1 = successMetricsAsDataFrame(spark, analysisResult1)
val resultDataFrame = checkResultsAsDataFrame(spark, verificationResult)
resultDataFrame.write.mode("overwrite").parquet(targetPath)
}
}
Now you can call it from the main function of your launcher object, as shown below. I have shown it for one dataset; you can reuse it for the other datasets.
import org.apache.spark.sql.SparkSession

object Deequ {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("dq").getOrCreate()
val sourcePath1 = "s3://ct-ire-fin-stg-data-dev-raw-gib/templates/Contract_Portfolio_Assignment/Contract-Portfolio-Assignment-GIP/Portfolio-Assignment-GIP - Sheet1.csv"
val targetPath1 = "s3://ct-ire-fin-stg-data-dev-raw-gib/template_validations/Contract-Portfolio-Assignment-Validations/Contract-Portfolio-Assignement-GIP-Validations/"
CommonHelper.ProcessDataSet(spark, sourcePath1, targetPath1) // call this once per dataset, with that dataset's source and target paths
}}
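If you would rather validate all of the datasets from a single Glue job instead of deploying one job per dataset, here is a minimal sketch. It assumes the CommonHelper object above is on the classpath; the object name DeequAll and the placeholder path pairs are illustrative only, not taken from your code, so substitute your real S3 locations.

import org.apache.spark.sql.SparkSession

object DeequAll {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("dq").getOrCreate()

    // Placeholder (source CSV, target folder) pairs: replace with your real S3 paths.
    // Adding a new dataset then only means adding one more pair to this list.
    val datasets: Seq[(String, String)] = Seq(
      ("s3://<bucket>/templates/<dataset-1>.csv", "s3://<bucket>/template_validations/<dataset-1>/"),
      ("s3://<bucket>/templates/<dataset-2>.csv", "s3://<bucket>/template_validations/<dataset-2>/")
    )

    // Run the same verification for every pair and write each result to its own folder.
    datasets.foreach { case (sourcePath, targetPath) =>
      CommonHelper.ProcessDataSet(spark, sourcePath, targetPath)
    }
  }
}

Alternatively, since the only things that change per dataset are the two paths, you could keep one parametrised Glue job and pass the source and target paths in as job arguments, so the same artifact is deployed once and each job run differs only in its parameters.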