Great Expectations v3 API in AWS Glue 3.0
I am trying to use Great Expectations on AWS Glue 3.0 to run validations in a pipeline.
Here is my initial attempt at creating the data context at run time, based on their documentation:
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import DataContextConfig, DatasourceConfig


def create_context():
    logger.info("Create DataContext Config.")
    data_context_config = DataContextConfig(
        config_version=2,
        plugins_directory=None,
        config_variables_file_path=None,
        # concurrency={"enabled": "true"},
        datasources={
            "my_spark_datasource": DatasourceConfig(
                class_name="Datasource",
                execution_engine={
                    "class_name": "SparkDFExecutionEngine",
                    "module_name": "great_expectations.execution_engine",
                },
                data_connectors={
                    "my_spark_dataconnector": {
                        "module_name": "great_expectations.datasource.data_connector",
                        "class_name": "RuntimeDataConnector",
                        "batch_identifiers": [""],
                    }
                },
            )
        },
        stores={
            "expectations_S3_store": {
                "class_name": "ExpectationsStore",
                "store_backend": {
                    "class_name": "TupleS3StoreBackend",
                    "bucket": data_profile_s3_store_bucket,
                    "prefix": "expectations/",
                    "s3_put_options": {"ACL": "bucket-owner-full-control"},
                },
            },
            "validations_S3_store": {
                "class_name": "ValidationsStore",
                "store_backend": {
                    "class_name": "TupleS3StoreBackend",
                    "bucket": data_profile_s3_store_bucket,
                    "prefix": "validations/",
                    "s3_put_options": {"ACL": "bucket-owner-full-control"},
                },
            },
            "evaluation_parameter_store": {"class_name": "EvaluationParameterStore"},
            "checkpoint_S3_store": {
                "class_name": "CheckpointStore",
                "store_backend": {
                    "class_name": "TupleS3StoreBackend",
                    "suppress_store_backend_id": "true",
                    "bucket": data_profile_s3_store_bucket,
                    "prefix": "checkpoints/",
                    "s3_put_options": {"ACL": "bucket-owner-full-control"},
                },
            },
        },
        expectations_store_name="expectations_S3_store",
        validations_store_name="validations_S3_store",
        evaluation_parameter_store_name="evaluation_parameter_store",
        checkpoint_store_name="checkpoint_S3_store",
        data_docs_sites={
            "s3_site": {
                "class_name": "SiteBuilder",
                "store_backend": {
                    "class_name": "TupleS3StoreBackend",
                    "bucket": data_profile_s3_store_bucket,
                    "prefix": "data_docs/",
                    "s3_put_options": {"ACL": "bucket-owner-full-control"},
                },
                "site_index_builder": {
                    "class_name": "DefaultSiteIndexBuilder",
                    "show_cta_footer": True,
                },
            }
        },
        anonymous_usage_statistics={"enabled": True},
    )
    # Pass the DataContextConfig as a project_config to BaseDataContext
    context = BaseDataContext(project_config=data_context_config)
    logger.info("Create Checkpoint Config.")
    checkpoint_config = {
        "name": "my_checkpoint",
        "config_version": 1,
        "class_name": "Checkpoint",
        "run_name_template": "ingest_date=%YYYY-%MM-%DD",
        "expectation_suite_name": data_profile_expectation_suite_name,
        "runtime_configuration": {
            "result_format": {
                "result_format": "COMPLETE",
                "include_unexpected_rows": True,
            }
        },
        "evaluation_parameters": {},
    }
    context.add_checkpoint(**checkpoint_config)
    # logger.info(f'GE Data Context Config: "{data_context_config}"')
    return context
Using this I get an error saying that an operation was attempted on a stopped SparkContext.
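For reference, this is roughly how the context and checkpoint are invoked from the Glue job (a minimal sketch: the datasource, connector and checkpoint names come from the config above, while get_spark_df and the asset name are placeholders):

from great_expectations.core.batch import RuntimeBatchRequest

context = create_context()
df = get_spark_df()  # placeholder: the Spark DataFrame produced earlier in the Glue job

batch_request = RuntimeBatchRequest(
    datasource_name="my_spark_datasource",
    data_connector_name="my_spark_dataconnector",
    data_asset_name="my_data_asset",  # placeholder asset name
    runtime_parameters={"batch_data": df},
    batch_identifiers={"": ""},  # keys must match the identifiers declared on the data connector above
)

results = context.run_checkpoint(
    checkpoint_name="my_checkpoint",
    validations=[{"batch_request": batch_request}],
)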
Is there a better way of using the Spark datasource with Glue 3.0? I would like to stay on Glue 3.0 as much as possible, to avoid having to maintain two versions of the Glue jobs.
You can fix this by setting force_reuse_spark_context to True. Here is a quick example (YAML):
config_version: 3.0
datasources:
  my_spark_datasource:
    class_name: Datasource
    module_name: great_expectations.datasource
    data_connectors:
      my_spark_dataconnector:
        class_name: RuntimeDataConnector
        module_name: great_expectations.datasource.data_connector
        batch_identifiers: {}
    execution_engine:
      class_name: SparkDFExecutionEngine
      force_reuse_spark_context: true
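The same flag should also work in the in-code DataContextConfig from the question (a sketch, assuming a Great Expectations version whose SparkDFExecutionEngine accepts force_reuse_spark_context; compared to the question's datasource block, only that line is new):

from great_expectations.data_context.types.base import DatasourceConfig

# Sketch: the question's datasource with force_reuse_spark_context enabled, so the
# SparkDFExecutionEngine reuses the SparkSession Glue already started instead of
# creating (and later stopping) its own.
my_spark_datasource = DatasourceConfig(
    class_name="Datasource",
    execution_engine={
        "class_name": "SparkDFExecutionEngine",
        "module_name": "great_expectations.execution_engine",
        "force_reuse_spark_context": True,
    },
    data_connectors={
        "my_spark_dataconnector": {
            "module_name": "great_expectations.datasource.data_connector",
            "class_name": "RuntimeDataConnector",
            "batch_identifiers": [""],
        }
    },
)
# Pass it as: DataContextConfig(..., datasources={"my_spark_datasource": my_spark_datasource}, ...)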
One more thing I would like to add: you can define the context in a YAML file and upload it to S3. You can then parse that file in the Glue job with the following function:
import os

import boto3
import yaml
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import DataContextConfig


def parse_data_context_from_S3(bucket: str, prefix: str = ""):
    object_key = os.path.join(prefix, "great_expectations.yml")
    print(f"Parsing s3://{bucket}/{object_key}")
    s3 = boto3.session.Session().client("s3")
    s3_object = s3.get_object(Bucket=bucket, Key=object_key)["Body"]
    datacontext_config = yaml.safe_load(s3_object.read())
    project_config = DataContextConfig(**datacontext_config)
    context = BaseDataContext(project_config=project_config)
    return context
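For example, the bucket and prefix can be passed to the Glue job as job arguments (a sketch; the ge_config_bucket / ge_config_prefix argument names are placeholders):

import sys

from awsglue.utils import getResolvedOptions

# Placeholder job arguments: --ge_config_bucket and --ge_config_prefix
args = getResolvedOptions(sys.argv, ["ge_config_bucket", "ge_config_prefix"])
context = parse_data_context_from_S3(
    bucket=args["ge_config_bucket"],
    prefix=args["ge_config_prefix"],
)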
Your CI/CD pipeline can easily replace the store backends in the YAML file while deploying it to your environments (dev, hom, prod).
If you are using the RuntimeDataConnector, you should have no problem with Glue 3.0. The same does not apply if you are using an InferredAssetS3DataConnector and your datasets are encrypted with KMS; in that case I was only able to make it work with Glue 2.0.
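For reference, an InferredAssetS3DataConnector entry looks roughly like this (a sketch; the connector name, bucket, prefix and regex are placeholders):

# Sketch: an InferredAssetS3DataConnector as it would appear under data_connectors
# in the datasource config (bucket, prefix and regex are placeholders).
data_connectors = {
    "my_inferred_s3_dataconnector": {
        "class_name": "InferredAssetS3DataConnector",
        "module_name": "great_expectations.datasource.data_connector",
        "bucket": "my-data-bucket",
        "prefix": "data/",
        "default_regex": {
            "pattern": r"data/(.*)\.parquet",
            "group_names": ["data_asset_name"],
        },
    }
}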