
Great Expectations v3 API in AWS Glue 3.0

I am trying to use Great Expectations on AWS Glue 3.0 to run validations in a pipeline.

Here is my initial attempt at creating the Data Context at runtime, based on their documentation:
import logging

from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import (
    DataContextConfig,
    DatasourceConfig,
)

logger = logging.getLogger(__name__)

# data_profile_s3_store_bucket and data_profile_expectation_suite_name are
# defined elsewhere in the job (e.g. resolved from Glue job arguments).


def create_context():
    logger.info("Create DataContext Config.")
    data_context_config = DataContextConfig(
        config_version=2,
        plugins_directory=None,
        config_variables_file_path=None,
        # concurrency={"enabled": "true"},
        datasources={
            "my_spark_datasource": DatasourceConfig(
                class_name="Datasource",
                execution_engine={
                    "class_name": "SparkDFExecutionEngine",
                    "module_name": "great_expectations.execution_engine",
                },
                data_connectors={
                    "my_spark_dataconnector": {
                        "module_name": "great_expectations.datasource.data_connector",
                        "class_name": "RuntimeDataConnector",
                        "batch_identifiers": [""],
                    }
                },
            )
        },
        stores={
            "expectations_S3_store": {
                "class_name": "ExpectationsStore",
                "store_backend": {
                    "class_name": "TupleS3StoreBackend",
                    "bucket": data_profile_s3_store_bucket,
                    "prefix": "expectations/",
                    "s3_put_options": {"ACL": "bucket-owner-full-control"},
                },
            },
            "validations_S3_store": {
                "class_name": "ValidationsStore",
                "store_backend": {
                    "class_name": "TupleS3StoreBackend",
                    "bucket": data_profile_s3_store_bucket,
                    "prefix": "validations/",
                    "s3_put_options": {"ACL": "bucket-owner-full-control"},
                },
            },
            "evaluation_parameter_store": {"class_name": "EvaluationParameterStore"},
            "checkpoint_S3_store": {
                "class_name": "CheckpointStore",
                "store_backend": {
                    "class_name": "TupleS3StoreBackend",
                    "suppress_store_backend_id": "true",
                    "bucket": data_profile_s3_store_bucket,
                    "prefix": "checkpoints/",
                    "s3_put_options": {"ACL": "bucket-owner-full-control"},
                },
            },
        },
        expectations_store_name="expectations_S3_store",
        validations_store_name="validations_S3_store",
        evaluation_parameter_store_name="evaluation_parameter_store",
        checkpoint_store_name="checkpoint_S3_store",
        data_docs_sites={
            "s3_site": {
                "class_name": "SiteBuilder",
                "store_backend": {
                    "class_name": "TupleS3StoreBackend",
                    "bucket": data_profile_s3_store_bucket,
                    "prefix": "data_docs/",
                    "s3_put_options": {"ACL": "bucket-owner-full-control"},
                },
                "site_index_builder": {
                    "class_name": "DefaultSiteIndexBuilder",
                    "show_cta_footer": True,
                },
            }
        },
        anonymous_usage_statistics={"enabled": True},
    )

    # Pass the DataContextConfig as a project_config to BaseDataContext
    context = BaseDataContext(project_config=data_context_config)

    logger.info("Create Checkpoint Config.")
    checkpoint_config = {
        "name": "my_checkpoint",
        "config_version": 1,
        "class_name": "Checkpoint",
        "run_name_template": "ingest_date=%YYYY-%MM-%DD",
        "expectation_suite_name": data_profile_expectation_suite_name,
        "runtime_configuration": {
            "result_format": {
                "result_format": "COMPLETE",
                "include_unexpected_rows": True,
            }
        },
        "evaluation_parameters": {},
    }
    context.add_checkpoint(**checkpoint_config)
    # logger.info(f'GE Data Context Config: "{data_context_config}"')

    return context


Using this, I get an error saying it is trying to run an operation on a stopped SparkContext.
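For context, invoking the checkpoint against a Spark DataFrame with this setup looks roughly like the sketch below; df and the data asset name are placeholders, and the batch identifier key mirrors the empty string configured in the data connector above:

from great_expectations.core.batch import RuntimeBatchRequest

context = create_context()

# df is the Spark DataFrame produced earlier in the Glue job (placeholder).
batch_request = RuntimeBatchRequest(
    datasource_name="my_spark_datasource",
    data_connector_name="my_spark_dataconnector",
    data_asset_name="my_data_asset",  # placeholder asset name
    runtime_parameters={"batch_data": df},
    batch_identifiers={"": ""},  # keys must match the connector's batch_identifiers
)

results = context.run_checkpoint(
    checkpoint_name="my_checkpoint",
    validations=[{"batch_request": batch_request}],
)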

Is there a better way to use a Spark datasource with Glue 3.0? I would like to stay on Glue 3.0 as much as possible, to avoid having to maintain two versions of my Glue jobs.

You can work around this by setting force_reuse_spark_context to True. Here is a quick example (YAML):

config_version: 3.0
datasources:
  my_spark_datasource:
    class_name: Datasource
    module_name: great_expectations.datasource
    data_connectors:
      my_spark_dataconnector:
        class_name: RuntimeDataConnector
        module_name: great_expectations.datasource.data_connector
        batch_identifiers: {}
    execution_engine:
      class_name: SparkDFExecutionEngine
      force_reuse_spark_context: true
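If you would rather keep building the DataContextConfig in code, as in the question, the same flag can be passed inside the execution_engine dict. A minimal sketch of just the datasources argument, with everything else left as above:

datasources={
    "my_spark_datasource": DatasourceConfig(
        class_name="Datasource",
        execution_engine={
            "class_name": "SparkDFExecutionEngine",
            "module_name": "great_expectations.execution_engine",
            # Reuse the SparkSession that Glue has already started instead of
            # letting Great Expectations create (and later stop) its own.
            "force_reuse_spark_context": True,
        },
        data_connectors={
            "my_spark_dataconnector": {
                "module_name": "great_expectations.datasource.data_connector",
                "class_name": "RuntimeDataConnector",
                "batch_identifiers": [""],
            }
        },
    )
},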

Another thing I would add is that you can define the context in a YAML file and upload it to S3. You can then parse that file inside the Glue job with the following function:

import os

import boto3
import yaml
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import DataContextConfig


def parse_data_context_from_S3(bucket: str, prefix: str = ""):
    object_key = os.path.join(prefix, "great_expectations.yml")

    print(f"Parsing s3://{bucket}/{object_key}")
    s3 = boto3.session.Session().client("s3")
    s3_object = s3.get_object(Bucket=bucket, Key=object_key)["Body"]

    datacontext_config = yaml.safe_load(s3_object.read())

    project_config = DataContextConfig(**datacontext_config)

    context = BaseDataContext(project_config=project_config)
    return context
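A hypothetical Glue job would then build its context like this; the bucket and prefix are placeholders for wherever your pipeline uploads the file:

context = parse_data_context_from_S3(
    bucket="my-ge-config-bucket",  # placeholder bucket name
    prefix="config/",              # placeholder prefix
)
# From here the context behaves like one built in code, e.g.
# context.add_checkpoint(...) and context.run_checkpoint(...).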

Your CI/CD pipeline can then simply replace the store backends in the YAML file while deploying it to each of your environments (dev, hom, prod).

If you are using a RuntimeDataConnector, you should have no problem with Glue 3.0. The same does not apply if you are using an InferredAssetS3DataConnector and your datasets are encrypted with KMS; in that case I was only able to get it working on Glue 2.0.