python 如何在 Kubeflow 容器化组件之间传递数据或文件

How to pass data or files between Kubeflow containerized components in python

我正在探索 Kubeflow 作为部署和连接典型 ML 管道的各种组件的选项。我正在使用 docker 容器作为 Kubeflow 组件,到目前为止我一直无法成功使用 ContainerOp.file_outputs 对象在组件之间传递结果。

根据我对该功能的理解,创建并保存到声明为组件 file_outputs 之一的文件应该会导致它持续存在并可供以下组件读取。

这就是我试图在我的管道中声明它的方式 python 代码:

import kfp.dsl as dsl 
import kfp.gcp as gcp

@dsl.pipeline(name='kubeflow demo')
def pipeline(project_id='kubeflow-demo-254012'):
    data_collector = dsl.ContainerOp(
        name='data collector', 
        image='eu.gcr.io/kubeflow-demo-254012/data-collector',
        arguments=[ "--project_id", project_id ],
        file_outputs={ "output": '/output.txt' }
    )   
    data_preprocessor = dsl.ContainerOp(
        name='data preprocessor',
        image='eu.gcr.io/kubeflow-demo-254012/data-preprocessor',
        arguments=[ "--project_id", project_id ]
    )
    data_preprocessor.after(data_collector)
    #TODO: add other components
if __name__ == '__main__':
    import kfp.compiler as compiler
    compiler.Compiler().compile(pipeline, __file__ + '.tar.gz')

data-collector.py 组件的 python 代码中,我获取了数据集,然后将其写入 output.txt。我可以从同一组件内的文件读取,但不能在 data-preprocessor.py 内读取 FileNotFoundError.

file_outputs 的使用对基于容器的 Kubeflow 组件无效,还是我在我的代码中错误地使用了它?如果在我的情况下这不是一个选项,是否可以在管道声明 python 代码中以编程方式创建 Kubernetes 卷并使用它们而不是 file_outputs?

在一个 Kubeflow 管道组件中创建的文件对于容器是本地的。要在后续步骤中引用它,您需要将其传递为:

data_preprocessor = dsl.ContainerOp(
        name='data preprocessor',
        image='eu.gcr.io/kubeflow-demo-254012/data-preprocessor',
        arguments=["--fetched_dataset", data_collector.outputs['output'],
                   "--project_id", project_id,
                  ]

注意: data_collector.outputs['output'] 将包含文件 /output.txt 的实际字符串内容(不是文件路径)。如果你想让它包含文件的路径,你需要将数据集写入共享存储(如 s3 或已安装的 PVC 卷)并将 path/link 写入共享存储到 /output.txt. data_preprocessor然后可以根据路径读取数据集

主要分为三个步骤:

  1. 保存一个 outputs.txt 文件,其中将包含要传递给下一个组件的 data/parameter/anything。 注意: 它应该在根级别,即 /output.txt

  2. 传递 file_outputs={'output': '/output.txt'} 作为参数,如图所示。

  3. inside a container_op which you will write inside dsl.pipeline pass argument (to respective argument of component which needs output from earlier component) as comp1.output (here comp1 是产生输出并将其存储在 /output.txt)

  4. 中的第一个组件
import kfp
from kfp import dsl

def SendMsg(
    send_msg: str = 'akash'
):
    return dsl.ContainerOp(
        name = 'Print msg', 
        image = 'docker.io/akashdesarda/comp1:latest', 
        command = ['python', 'msg.py'],
        arguments=[
            '--msg', send_msg
        ],
        file_outputs={
            'output': '/output.txt',
        }
    )

def GetMsg(
    get_msg: str
):
    return dsl.ContainerOp(
        name = 'Read msg from 1st component',
        image = 'docker.io/akashdesarda/comp2:latest',
        command = ['python', 'msg.py'],
        arguments=[
            '--msg', get_msg
        ]
    )

@dsl.pipeline(
    name = 'Pass parameter',
    description = 'Passing para')
def  passing_parameter(send_msg):
    comp1 = SendMsg(send_msg)
    comp2 = GetMsg(comp1.output)


if __name__ == '__main__':
  import kfp.compiler as compiler
  compiler.Compiler().compile(passing_parameter, __file__ + '.tar.gz')

您不必将数据写入共享存储,您可以使用 kfp.dsl.InputArgumentPath 将 python 函数的输出传递到容器操作的输入。

@kfp.dsl.pipeline(
  name='Build Model Server Pipeline',
  description='Build a kserve model server pipeline.'
)
def build_model_server_pipeline(s3_src_path):
    download_s3_files_task = download_archive_step(s3_src_path)

    tarball_path = "/tmp/artifact.tar"
    artifact_tarball = kfp.dsl.InputArgumentPath(download_s3_files_task.outputs['output_tarball'], path=tarball_path)
    
    build_container = kfp.dsl.ContainerOp(name ='build_container',
                                          image ='python:3.8',
                                          command=['sh', '-c'],
                                          arguments=[
                                              'ls -l ' + tarball_path + ';'
                                          ],
                                          artifact_argument_paths=[artifact_tarball],
                                         )