Is there an implemented way to use a kubeflow pipeline's output outside the pipeline?

I'm building a continuous machine learning testing project using local Kubeflow Pipelines. I have a pipeline that preprocesses data with TFX and automatically saves its output to MinIO. Outside of this pipeline, I want to train a model with TFX's Trainer, but I need the artifacts generated by the preprocessing pipeline. Is there an implemented way to import this output? I've looked through the documentation and some issues but couldn't find an answer, and since I'm trying to do this continuously, I can't rely on doing it manually.

An example of my preprocessing pipeline:


    @kfp.dsl.pipeline(
      name='TFX',
      description='TFX pipeline'
    )
    def tfx_pipeline():
    
        # DL with wget, can use gcs instead as well
        fetch = kfp.dsl.ContainerOp(
          name='download',
          image='busybox',
          command=['sh', '-c'],
          arguments=[
              'sleep 1;'
              'mkdir -p /tmp/data;'
              'wget <gcp link> -O /tmp/data/results.csv'],
          file_outputs={'downloaded': '/tmp/data'})
        records_example = tfx_csv_gen(input_base=fetch.output)
        stats = tfx_statistic_gen(input_data=records_example.output)
        schema_op = tfx_schema_gen(stats.output)
        tfx_example_validator(stats=stats.outputs['output'], schema=schema_op.outputs['output'])
        #tag::tft[]
        transformed_output = tfx_transform(
            input_data=records_example.output,
            schema=schema_op.outputs['output'],
            module_file=module_file) # Path to your TFT code on GCS/S3
        #end::tft[]

It is then compiled and run with:

    kfp.compiler.Compiler().compile(tfx_pipeline, 'tfx_pipeline.zip')


    client = kfp.Client()
    client.list_experiments()
    #exp = client.create_experiment(name='mdupdate')


    my_experiment = client.create_experiment(name='tfx_pipeline')
    my_run = client.run_pipeline(my_experiment.id, 'tfx', 
      'tfx_pipeline.zip')

I'm developing this in a .ipynb notebook in Visual Studio Code.

You can get that information like this: https://github.com/kubeflow/pipelines/issues/4327#issuecomment-687255001

component_name: this can be found in the pipeline's YAML definition, under templates.name (look for the component that produces the output you want).

artifact_name: this can also be found in the pipeline's YAML definition, under the outputs attribute of that same component.
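
If you prefer to look those two values up programmatically instead of reading the YAML by hand, here is a minimal sketch that opens the compiled package and prints every template that declares output artifacts. It assumes the compiled .zip contains a `pipeline.yaml` member (which is what the KFP v1 compiler writes for a .zip target); verify the member name against your own package.

    import zipfile
    import yaml

    # Open the compiled pipeline package and parse the Argo workflow inside it.
    # Assumption: the archive member is named "pipeline.yaml".
    with zipfile.ZipFile('tfx_pipeline.zip') as pkg:
        with pkg.open('pipeline.yaml') as f:
            workflow = yaml.safe_load(f)

    # Each template with output artifacts is a candidate component_name;
    # each artifact's "name" is a candidate artifact_name for read_artifact below.
    for template in workflow['spec']['templates']:
        artifacts = template.get('outputs', {}).get('artifacts', [])
        if artifacts:
            print(template['name'], '->', [a['name'] for a in artifacts])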

Once you have those two parameters, you can use the function described at the URL above:

    #!/usr/bin/env python3

    import json
    import tarfile
    from base64 import b64decode
    from io import BytesIO

    import kfp


    def get_node_id(*, run_id: str, component_name: str, client: kfp.Client):
        run = client.runs.get_run(run_id)
        workflow = json.loads(run.pipeline_runtime.workflow_manifest)
        nodes = workflow["status"]["nodes"]
        for node_id, node_info in nodes.items():
            if node_info["displayName"] == component_name:
                return node_id
        else:
            raise RuntimeError(f"Unable to find node_id for Component '{component_name}'")


    def get_artifact(*, run_id: str, node_id: str, artifact_name: str, client: kfp.Client):
        artifact = client.runs.read_artifact(run_id, node_id, artifact_name)
        # Artifacts are returned as base64-encoded .tar.gz strings
        data = b64decode(artifact.data)
        io_buffer = BytesIO()
        io_buffer.write(data)
        io_buffer.seek(0)
        data = None
        with tarfile.open(fileobj=io_buffer) as tar:
            member_names = tar.getnames()
            if len(member_names) == 1:
                data = tar.extractfile(member_names[0]).read().decode('utf-8')
            else:
                # Is it possible for KFP artifacts to have multiple members?
                data = {}
                for member_name in member_names:
                    data[member_name] = tar.extractfile(member_name).read().decode('utf-8')
        return data


    if __name__ == "__main__":
        run_id = "e498b0da-036e-4e81-84e9-6e9c6e64960b"
        component_name = "my-component"
        # For an output variable named "output_data"
        artifact_name = "my-component-output_data"

        client = kfp.Client()
        node_id = get_node_id(run_id=run_id, component_name=component_name, client=client)
        artifact = get_artifact(
            run_id=run_id, node_id=node_id, artifact_name=artifact_name, client=client,
        )
        # Do something with artifact ...
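
From there, one option is to persist the decoded artifact locally so that the training step running outside the pipeline can pick it up. A minimal sketch (the file name is hypothetical, and it assumes the single-member text case above, where `artifact` is a string rather than a dict):

    from pathlib import Path

    # Sketch only: write the retrieved artifact to a local file for the
    # out-of-pipeline training code to consume. "transformed_output.txt"
    # is a made-up path; adjust to whatever your Trainer expects.
    Path("transformed_output.txt").write_text(artifact)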