GCP Vertex Pipeline - Why kfp.v2.dsl.Output as function arguments work without being provided?

I am following GCP's Create and run ML pipelines with Vertex Pipelines! Jupyter notebook example.

The function classif_model_eval_metrics takes metrics: Output[Metrics] and metricsc: Output[ClassificationMetrics] without default values:

@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-6:latest",
    output_component_file="tables_eval_component.yaml", # Optional: you can use this to load the component later
    packages_to_install=["google-cloud-aiplatform"],
)
def classif_model_eval_metrics(
    project: str,
    location: str,      # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Model],
    metrics: Output[Metrics],                # No default value set, hence must be mandatory
    metricsc: Output[ClassificationMetrics], # No default value set, hence must be mandatory
) -> NamedTuple("Outputs", [("dep_decision", str)]): 
    # Full code at the bottom.

So these arguments should be mandatory, yet the function is called without them:

    model_eval_task = classif_model_eval_metrics(
        project,
        gcp_region,
        api_endpoint,
        thresholds_dict_str,
        training_op.outputs["model"],
        # <--- No arguments for metrics: Output[Metrics] and metricsc: Output[ClassificationMetrics]
    )

The entire pipeline code is below.

@kfp.dsl.pipeline(name="automl-tab-beans-training-v2",
                  pipeline_root=PIPELINE_ROOT)
def pipeline(
    bq_source: str = "bq://aju-dev-demos.beans.beans1",
    display_name: str = DISPLAY_NAME,
    project: str = PROJECT_ID,
    gcp_region: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str = '{"auRoc": 0.95}',
):
    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        project=project, display_name=display_name, bq_source=bq_source
    )
    training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project,
        display_name=display_name,
        optimization_prediction_type="classification",
        budget_milli_node_hours=1000,
        column_transformations=COLUMNS,
        dataset=dataset_create_op.outputs["dataset"],
        target_column="Class",
    )
    model_eval_task = classif_model_eval_metrics(
        project,
        gcp_region,
        api_endpoint,
        thresholds_dict_str,
        training_op.outputs["model"],
        # <--- No arguments for metrics: Output[Metrics] and metricsc: Output[ClassificationMetrics]
    )

Why does this work, and what are the kfp.v2.dsl.Output-typed parameters metrics: Output[Metrics] and metricsc: Output[ClassificationMetrics]?


Code of the classif_model_eval_metrics function:

from kfp.v2.dsl import (
    Dataset, Model, Output, Input, 
    OutputPath, ClassificationMetrics, Metrics, component
)

@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-6:latest",
    output_component_file="tables_eval_component.yaml", # Optional: you can use this to load the component later
    packages_to_install=["google-cloud-aiplatform"],
)
def classif_model_eval_metrics(
    project: str,
    location: str,      # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Model],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.
    """Renders evaluation metrics for an AutoML Tabular classification model.
    Retrieves the classification model evaluation and renders the ROC curve and confusion matrix
    for the model. Determines whether the model is sufficiently accurate to deploy.
    """
    import json
    import logging
    from google.cloud import aiplatform

    # Fetch model eval info
    def get_eval_info(client, model_name):
        from google.protobuf.json_format import MessageToDict
        response = client.list_model_evaluations(parent=model_name)
        metrics_list = []
        metrics_string_list = []
        for evaluation in response:
            metrics = MessageToDict(evaluation._pb.metrics)
            metrics_str = json.dumps(metrics)
            metrics_list.append(metrics)
            metrics_string_list.append(metrics_str)
        return (
            evaluation.name,
            metrics_list,
            metrics_string_list,
        )

    def classification_thresholds_check(metrics_dict, thresholds_dict):
        for k, v in thresholds_dict.items():
            if k in ["auRoc", "auPrc"]:  # higher is better
                if metrics_dict[k] < v:  # if under threshold, don't deploy
                    return False
        return True

    def log_metrics(metrics_list, metricsc):
        test_confusion_matrix = metrics_list[0]["confusionMatrix"]
        logging.info("rows: %s", test_confusion_matrix["rows"])
        # log the ROC curve
        fpr, tpr, thresholds = [], [], []
        for item in metrics_list[0]["confidenceMetrics"]:
            fpr.append(item.get("falsePositiveRate", 0.0))
            tpr.append(item.get("recall", 0.0))
            thresholds.append(item.get("confidenceThreshold", 0.0))
        metricsc.log_roc_curve(fpr, tpr, thresholds)
        # log the confusion matrix
        annotations = []
        for item in test_confusion_matrix["annotationSpecs"]:
            annotations.append(item["displayName"])
        metricsc.log_confusion_matrix(
            annotations,
            test_confusion_matrix["rows"],
        )
        # log textual metrics info as well
        for metric in metrics_list[0].keys():
            if metric != "confidenceMetrics":
                val_string = json.dumps(metrics_list[0][metric])
                # 'metrics' here is the component's Output[Metrics] parameter, captured from the enclosing scope
                metrics.log_metric(metric, val_string)
        # metrics.metadata["model_type"] = "AutoML Tabular classification"

    aiplatform.init(project=project)

    client = aiplatform.gapic.ModelServiceClient(client_options={"api_endpoint": api_endpoint})
    eval_name, metrics_list, metrics_str_list = get_eval_info(
        client, model.uri.replace("aiplatform://v1/", "")
    )
    log_metrics(metrics_list, metricsc)
    thresholds_dict = json.loads(thresholds_dict_str)

    return ("true",) if classification_thresholds_check(metrics_list[0], thresholds_dict) else ("false", )

The custom component is defined as a Python function with the @kfp.v2.dsl.component decorator.

The @component decorator specifies three optional arguments: the base container image to use; any packages to install; and the yaml file to write the component specification to.
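As a side note on that third argument, here is a minimal sketch of how the saved yaml could be reused later (assuming the file sits in the working directory and a kfp version that provides load_component_from_file):

from kfp.components import load_component_from_file

# Re-create the component from the specification written by output_component_file,
# so it can be reused in another pipeline without re-running the decorated function.
classif_model_eval_metrics_op = load_component_from_file("tables_eval_component.yaml")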

The component function classif_model_eval_metrics has several input parameters. The model parameter is an input kfp.v2.dsl.Model artifact.

The two function arguments metrics and metricsc are component outputs, in this case of types Metrics and ClassificationMetrics. They are not explicitly passed as inputs to the component step; instead, they are automatically instantiated by the pipeline backend and can be used inside the component.

@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="tables_eval_component.yaml",
    packages_to_install=["google-cloud-aiplatform"],
)
def classif_model_eval_metrics(
    project: str,
    location: str,  # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Model],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
)
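As a minimal sketch of this behaviour (the log_accuracy component and demo pipeline below are hypothetical, assuming the kfp.v2 SDK), an Output[Metrics] parameter is simply omitted at the call site; the pipeline backend creates the artifact and injects it when the step runs:

from kfp.v2 import dsl
from kfp.v2.dsl import Metrics, Output, component

@component(base_image="python:3.9")
def log_accuracy(accuracy: float, metrics: Output[Metrics]):
    # "metrics" is not supplied by the caller; the pipeline backend creates the
    # Metrics artifact and injects it when this step executes.
    metrics.log_metric("accuracy", accuracy)

@dsl.pipeline(name="hypothetical-metrics-demo")
def demo_pipeline():
    # Only the plain input is passed; the Output[Metrics] argument is omitted,
    # just as with classif_model_eval_metrics above.
    log_accuracy(accuracy=0.97)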

For example, in the function below we call metricsc.log_roc_curve() and metricsc.log_confusion_matrix() to render these visualizations in the pipeline UI. These output parameters become component outputs when the component is compiled, and can be consumed by other pipeline steps.

def log_metrics(metrics_list, metricsc):
        ...
        metricsc.log_roc_curve(fpr, tpr, thresholds)
        ...
        metricsc.log_confusion_matrix(
            annotations,
            test_confusion_matrix["rows"],
        )
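To illustrate that last point, a hypothetical downstream component (not part of the notebook) could consume the produced metrics artifact through the task's .outputs mapping, keyed by the output parameter name:

from kfp.v2.dsl import Input, Metrics, component

@component(base_image="python:3.9")
def inspect_metrics(metrics: Input[Metrics]):
    # Downstream steps receive the artifact as an Input and can read its metadata.
    print(metrics.metadata)

# Inside the pipeline function, after model_eval_task is created:
#     inspect_metrics(metrics=model_eval_task.outputs["metrics"])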

For more information, you can refer to this document.