Scala Spark: how to add a list of generated methods to a function
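I am using Amazon Deequ to generate test cases. It returns the following list of methods, which I want to use in a subsequent function instead of coding each one individually.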
var rows = suggestionDataFrame.select("_3").collect().map(_.getString(0)).mkString(" ")
// var rows = suggestionDataFrame.select("_3").collect.map { row => row.toString() .mkString("")}
The returned rows are the following list of methods:
.hasCompleteness("Id", _ >= 0.95, Some("It should be above 0.95!")) .isNonNegative("Id")
.isComplete("LastModifiedDate")
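Each of those entries is plain text: the _3 column holds codeForConstraint, a String of Scala source, and mkString(" ") only concatenates those strings. The compiler never treats that text as code, so placing rows after Check(...) cannot chain the methods. A minimal pure-Scala sketch, using the three snippets from the output above as hypothetical input (no deequ classes involved):

```scala
object RowsDemo {
  // Hypothetical input: the three codeForConstraint strings shown in the question's output
  val codeForConstraint: Seq[String] = Seq(
    """.hasCompleteness("Id", _ >= 0.95, Some("It should be above 0.95!"))""",
    """.isNonNegative("Id")""",
    """.isComplete("LastModifiedDate")"""
  )

  // Same step as the question: join the snippets with spaces.
  // The result is ordinary text, not a chain of method calls on a Check.
  val rows: String = codeForConstraint.mkString(" ")

  def main(args: Array[String]): Unit = println(rows)
}
```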
In the next function I want to pass these values in, like this:
val verificationResult: VerificationResult = {
  VerificationSuite()
    .onData(datasource)
    .addCheck(
      Check(CheckLevel.Error, "Data Validation Check")
        // this is how I want to pass them
        .hasCompleteness("Id", _ >= 0.95, Some("It should be above 0.95!"))
        .isNonNegative("Id")
        .isComplete("LastModifiedDate"))
    .run()
}
When I pass rows directly, as below, it throws an error:
val verificationResult: VerificationResult = {
  VerificationSuite()
    .onData(datasource)
    .addCheck(
      Check(CheckLevel.Error, "Data Validation Check")
      rows).run() //throwing error here
}
Is there any way to do this?
Reference: https://aws.amazon.com/blogs/big-data/test-data-quality-at-scale-with-deequ/
Here is what I have tried so far:
package com.myorg.dataquality
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SparkSession
import com.amazon.deequ.suggestions.{ ConstraintSuggestionRunner, Rules }
import com.amazon.deequ.{ VerificationSuite, VerificationResult }
import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
import com.amazon.deequ.checks.{ Check, CheckLevel }
import scala.collection.mutable.ArrayBuffer
object DataVerification2 {
  def main(args: Array[String]) {
    val spark = SparkSession.builder.appName("Sample")
      .master("local")
      .getOrCreate()
    val datasource = spark.read.format("jdbc")
      .option("url", "jdbc:sqlserver://host:1433;database=mydb")
      .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
      .option("dbtable", "dbo.table")
      .option("user", "myuser")
      .option("password", "password")
      .option("useSSL", "false")
      .load()
    datasource.printSchema()
    val datadestination = spark.read.format("jdbc")
      .option("url", "jdbc:sqlserver://host:1433;database=mydb")
      .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
      .option("dbtable", "dbo.table")
      .option("user", "myuser")
      .option("password", "password")
      .option("useSSL", "false")
      .load()
    //datapond.printSchema()
    import spark.implicits._
    //Compute constraint suggestions for us on the data
    val suggestionResult = {
      ConstraintSuggestionRunner()
        // data to suggest constraints for
        .onData(datasource)
        // default set of rules for constraint suggestion
        .addConstraintRules(Rules.DEFAULT)
        // run data profiling and constraint suggestion
        .run()
    }
    // We can now investigate the constraints that Deequ suggested.
    val suggestionDataFrame = suggestionResult.constraintSuggestions.flatMap {
      case (column, suggestions) =>
        suggestions.map { constraint =>
          (column, constraint.description, constraint.codeForConstraint)
        }
    }.toSeq.toDS()
    suggestionDataFrame.toJSON.collect.foreach(println)
    var rows = suggestionDataFrame.select("_3").collect().map(_.getString(0)).mkString(" ")
    // var rows = suggestionDataFrame.select("_3").collect.map { row => row.toString() .mkString("")}
    // var rows = suggestionDataFrame.select("_3").collect().map(t => println(t))
    // var rows = suggestionDataFrame.select("_3").collect.map(_.toSeq)
    var checks = Array[Check]()
    var checkLevel = "Check(CheckLevel.Error)"
    var finalcheck = checkLevel.concat(rows)
    checks :+ finalcheck
    // I expect a validation result, but this returns an empty result
    val verificationResult: VerificationResult = {
      VerificationSuite().onData(datadestination).addChecks(checks).run()
    }
    val resultDataFrame = checkResultsAsDataFrame(spark, verificationResult)
    resultDataFrame.show()
    resultDataFrame.filter(resultDataFrame("constraint_status") === "Failure").toJSON.collect.foreach(println)
  }
}
This returns an empty result:
+-----+-----------+------------+----------+-----------------+------------------+
|check|check_level|check_status|constraint|constraint_status|constraint_message|
+-----+-----------+------------+----------+-----------------+------------------+
+-----+-----------+------------+----------+-----------------+------------------+
It seems I am either not adding the elements to the array or implementing this the wrong way. I am looking for some advice on this.
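The empty table is consistent with how :+ behaves on Scala collections: it returns a new collection and leaves the original untouched, so the statement checks :+ finalcheck throws its result away and addChecks(checks) receives an empty array. (Even if the result were kept, finalcheck is a String built by concatenation, not a Check.) The pure-Scala sketch below uses strings as hypothetical stand-ins for checks to show the difference; it involves no deequ classes.

```scala
import scala.collection.mutable.ArrayBuffer

object AppendDemo {
  // Mirrors the question: the new array built by `:+` is never stored
  def discarded(): Array[String] = {
    var checks = Array[String]()
    checks :+ "generated check" // returns a new array; `checks` is unchanged
    checks
  }

  // Fix 1: reassign the result of `:+`
  def reassigned(): Array[String] = {
    var checks = Array[String]()
    checks = checks :+ "generated check"
    checks
  }

  // Fix 2: use the ArrayBuffer the question already imports; `+=` mutates in place
  def buffered(): Seq[String] = {
    val checks = ArrayBuffer[String]()
    checks += "generated check"
    checks.toSeq
  }

  def main(args: Array[String]): Unit = {
    println(discarded().length)  // 0
    println(reassigned().length) // 1
    println(buffered().length)   // 1
  }
}
```

Either fix only solves the "nothing was added" part; the element still has to be a real Check rather than a String for addChecks to accept it.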
Update 1:
I tried the code below, but it throws an error:
val constraints = suggestionResult.constraintSuggestions.flatMap {
  case (column, suggestions) =>
    suggestions.map { constraint =>
      (constraint.codeForConstraint)
    }
}
val generatedCheck = Check(CheckLevel.Warning, "generated constraints", constraints)
val verificationResult = VerificationSuite()
  .onData(datadestination)
  .addChecks(generatedCheck)
  .run()
Error:
type mismatch; found : scala.collection.immutable.Iterable[String] required: Seq[com.amazon.deequ.constraints.Constraint]
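That message bundles two separate mismatches. The first is the collection shape: calling flatMap on a Map with a function that returns plain values (not key/value pairs) yields an Iterable rather than a Seq, so a .toSeq is needed. The second is the element type: codeForConstraint is a String of source text, while Check expects Constraint objects, and no collection conversion turns one into the other. The pure-Scala sketch below covers only the first point, with a hypothetical Map standing in for suggestionResult.constraintSuggestions.

```scala
object FlattenDemo {
  // Hypothetical stand-in for suggestionResult.constraintSuggestions:
  // column name -> generated code strings (deequ's real values are ConstraintSuggestion objects)
  val suggestions: Map[String, Seq[String]] = Map(
    "Id" -> Seq(""".isComplete("Id")""", """.isNonNegative("Id")"""),
    "LastModifiedDate" -> Seq(""".isComplete("LastModifiedDate")""")
  )

  // flatMap on a Map whose function returns plain values (not pairs) gives an Iterable
  val flattened: Iterable[String] = suggestions.flatMap { case (_, codes) => codes }

  // .toSeq turns it into the Seq shape a Check constructor expects
  val asSeq: Seq[String] = flattened.toSeq

  def main(args: Array[String]): Unit = println(asSeq.length)
}
```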
Update 2:
var rows = suggestionDataFrame.select("_3").collect.map(_.toSeq)
var checks: Seq[Check] = Seq()
checks :+ rows
val generatedCheck = Check(CheckLevel.Warning, "generated constraints", checks)
val verificationResult = VerificationSuite()
  .onData(datadestination)
  .addChecks(generatedCheck)
  .run()
Error:
type mismatch; found : Seq[com.amazon.deequ.checks.Check] required: Seq[com.amazon.deequ.constraints.Constraint]
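Update 2 runs into the same constraint from the other side. In deequ a Check is built from a Seq of Constraint values (its third constructor parameter), and addChecks in turn takes a Seq of Check values, so checks cannot stand in for constraints. (The line checks :+ rows also discards its result, as in the first attempt.) The sketch below models only that nesting, with hypothetical StubCheck and StubConstraint case classes that are far simpler than deequ's real Check and Constraint.

```scala
object NestingDemo {
  // Hypothetical, simplified stand-ins; deequ's real Check and Constraint are much richer
  final case class StubConstraint(description: String)
  final case class StubCheck(
      level: String,
      description: String,
      constraints: Seq[StubConstraint] = Seq.empty)

  val constraints: Seq[StubConstraint] =
    Seq(StubConstraint("isComplete(Id)"), StubConstraint("isNonNegative(Id)"))

  // Constraints go INTO one check...
  val generated: StubCheck = StubCheck("Error", "generated constraints", constraints)

  // ...and checks go into a Seq for an addChecks-style API.
  val checksForSuite: Seq[StubCheck] = Seq(generated)

  // StubCheck("Error", "x", checksForSuite) would not compile:
  // a Seq[StubCheck] is not a Seq[StubConstraint], the same kind of mismatch as in Update 2.

  def main(args: Array[String]): Unit =
    println(checksForSuite.map(_.constraints.size))
}
```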
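If I understand your question correctly, you want to add the suggested constraints to your verification run. Here is a link to a code snippet in deequ that does something similar: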
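I hope this can serve as a template for how to proceed. You need to collect the constraints from the constraint suggestions (not from the data frame) and create a check from them.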
Update 1:
We actually include the constraints themselves in the suggestion result, so if you replace the lines above as follows, your code should work:
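// Each ConstraintSuggestion carries the actual Constraint object in .constraint
// (codeForConstraint is only its source text), so collect those instead of strings
val allConstraints = suggestionResult.constraintSuggestions
  .flatMap { case (_, suggestions) => suggestions.map { _.constraint } }
  .toSeq // flatMap over a Map yields an Iterable; Check expects a Seq
// Bundle all suggested constraints into a single check
val generatedCheck = Check(CheckLevel.Error, "generated constraints", allConstraints)
val verificationResult = VerificationSuite()
  .onData(datasource)
  .addChecks(Seq(generatedCheck)) // addChecks takes a Seq[Check]
  .run()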