python 如何在 Kubeflow 容器化组件之间传递数据或文件
How to pass data or files between Kubeflow containerized components in python
我正在探索 Kubeflow 作为部署和连接典型 ML 管道的各种组件的选项。我正在使用 docker 容器作为 Kubeflow 组件,到目前为止我一直无法成功使用 ContainerOp.file_outputs
对象在组件之间传递结果。
根据我对该功能的理解,创建并保存到声明为组件 file_outputs
之一的文件应该会导致它持续存在并可供以下组件读取。
这就是我试图在我的管道中声明它的方式 python 代码:
import kfp.dsl as dsl
import kfp.gcp as gcp
@dsl.pipeline(name='kubeflow demo')
def pipeline(project_id='kubeflow-demo-254012'):
data_collector = dsl.ContainerOp(
name='data collector',
image='eu.gcr.io/kubeflow-demo-254012/data-collector',
arguments=[ "--project_id", project_id ],
file_outputs={ "output": '/output.txt' }
)
data_preprocessor = dsl.ContainerOp(
name='data preprocessor',
image='eu.gcr.io/kubeflow-demo-254012/data-preprocessor',
arguments=[ "--project_id", project_id ]
)
data_preprocessor.after(data_collector)
#TODO: add other components
if __name__ == '__main__':
import kfp.compiler as compiler
compiler.Compiler().compile(pipeline, __file__ + '.tar.gz')
在 data-collector.py
组件的 python 代码中,我获取了数据集,然后将其写入 output.txt
。我可以从同一组件内的文件读取,但不能在 data-preprocessor.py
内读取 FileNotFoundError
.
file_outputs
的使用对基于容器的 Kubeflow 组件无效,还是我在我的代码中错误地使用了它?如果在我的情况下这不是一个选项,是否可以在管道声明 python 代码中以编程方式创建 Kubernetes 卷并使用它们而不是 file_outputs
?
在一个 Kubeflow 管道组件中创建的文件对于容器是本地的。要在后续步骤中引用它,您需要将其传递为:
data_preprocessor = dsl.ContainerOp(
name='data preprocessor',
image='eu.gcr.io/kubeflow-demo-254012/data-preprocessor',
arguments=["--fetched_dataset", data_collector.outputs['output'],
"--project_id", project_id,
]
注意: data_collector.outputs['output']
将包含文件 /output.txt
的实际字符串内容(不是文件路径)。如果你想让它包含文件的路径,你需要将数据集写入共享存储(如 s3 或已安装的 PVC 卷)并将 path/link 写入共享存储到 /output.txt
. data_preprocessor
然后可以根据路径读取数据集
主要分为三个步骤:
保存一个 outputs.txt 文件,其中将包含要传递给下一个组件的 data/parameter/anything。
注意: 它应该在根级别,即 /output.txt
传递 file_outputs={'output': '/output.txt'} 作为参数,如图所示。
inside a container_op which you will write inside dsl.pipeline pass argument (to respective argument of component which needs output from earlier component) as comp1.output (here comp1 是产生输出并将其存储在 /output.txt)
中的第一个组件
import kfp
from kfp import dsl
def SendMsg(
send_msg: str = 'akash'
):
return dsl.ContainerOp(
name = 'Print msg',
image = 'docker.io/akashdesarda/comp1:latest',
command = ['python', 'msg.py'],
arguments=[
'--msg', send_msg
],
file_outputs={
'output': '/output.txt',
}
)
def GetMsg(
get_msg: str
):
return dsl.ContainerOp(
name = 'Read msg from 1st component',
image = 'docker.io/akashdesarda/comp2:latest',
command = ['python', 'msg.py'],
arguments=[
'--msg', get_msg
]
)
@dsl.pipeline(
name = 'Pass parameter',
description = 'Passing para')
def passing_parameter(send_msg):
comp1 = SendMsg(send_msg)
comp2 = GetMsg(comp1.output)
if __name__ == '__main__':
import kfp.compiler as compiler
compiler.Compiler().compile(passing_parameter, __file__ + '.tar.gz')
您不必将数据写入共享存储,您可以使用 kfp.dsl.InputArgumentPath
将 python 函数的输出传递到容器操作的输入。
@kfp.dsl.pipeline(
name='Build Model Server Pipeline',
description='Build a kserve model server pipeline.'
)
def build_model_server_pipeline(s3_src_path):
download_s3_files_task = download_archive_step(s3_src_path)
tarball_path = "/tmp/artifact.tar"
artifact_tarball = kfp.dsl.InputArgumentPath(download_s3_files_task.outputs['output_tarball'], path=tarball_path)
build_container = kfp.dsl.ContainerOp(name ='build_container',
image ='python:3.8',
command=['sh', '-c'],
arguments=[
'ls -l ' + tarball_path + ';'
],
artifact_argument_paths=[artifact_tarball],
)
我正在探索 Kubeflow 作为部署和连接典型 ML 管道的各种组件的选项。我正在使用 docker 容器作为 Kubeflow 组件,到目前为止我一直无法成功使用 ContainerOp.file_outputs
对象在组件之间传递结果。
根据我对该功能的理解,创建并保存到声明为组件 file_outputs
之一的文件应该会导致它持续存在并可供以下组件读取。
这就是我试图在我的管道中声明它的方式 python 代码:
import kfp.dsl as dsl
import kfp.gcp as gcp
@dsl.pipeline(name='kubeflow demo')
def pipeline(project_id='kubeflow-demo-254012'):
data_collector = dsl.ContainerOp(
name='data collector',
image='eu.gcr.io/kubeflow-demo-254012/data-collector',
arguments=[ "--project_id", project_id ],
file_outputs={ "output": '/output.txt' }
)
data_preprocessor = dsl.ContainerOp(
name='data preprocessor',
image='eu.gcr.io/kubeflow-demo-254012/data-preprocessor',
arguments=[ "--project_id", project_id ]
)
data_preprocessor.after(data_collector)
#TODO: add other components
if __name__ == '__main__':
import kfp.compiler as compiler
compiler.Compiler().compile(pipeline, __file__ + '.tar.gz')
在 data-collector.py
组件的 python 代码中,我获取了数据集,然后将其写入 output.txt
。我可以从同一组件内的文件读取,但不能在 data-preprocessor.py
内读取 FileNotFoundError
.
file_outputs
的使用对基于容器的 Kubeflow 组件无效,还是我在我的代码中错误地使用了它?如果在我的情况下这不是一个选项,是否可以在管道声明 python 代码中以编程方式创建 Kubernetes 卷并使用它们而不是 file_outputs
?
在一个 Kubeflow 管道组件中创建的文件对于容器是本地的。要在后续步骤中引用它,您需要将其传递为:
data_preprocessor = dsl.ContainerOp(
name='data preprocessor',
image='eu.gcr.io/kubeflow-demo-254012/data-preprocessor',
arguments=["--fetched_dataset", data_collector.outputs['output'],
"--project_id", project_id,
]
注意: data_collector.outputs['output']
将包含文件 /output.txt
的实际字符串内容(不是文件路径)。如果你想让它包含文件的路径,你需要将数据集写入共享存储(如 s3 或已安装的 PVC 卷)并将 path/link 写入共享存储到 /output.txt
. data_preprocessor
然后可以根据路径读取数据集
主要分为三个步骤:
保存一个 outputs.txt 文件,其中将包含要传递给下一个组件的 data/parameter/anything。 注意: 它应该在根级别,即 /output.txt
传递 file_outputs={'output': '/output.txt'} 作为参数,如图所示。
inside a container_op which you will write inside dsl.pipeline pass argument (to respective argument of component which needs output from earlier component) as comp1.output (here comp1 是产生输出并将其存储在 /output.txt)
中的第一个组件
import kfp
from kfp import dsl
def SendMsg(
send_msg: str = 'akash'
):
return dsl.ContainerOp(
name = 'Print msg',
image = 'docker.io/akashdesarda/comp1:latest',
command = ['python', 'msg.py'],
arguments=[
'--msg', send_msg
],
file_outputs={
'output': '/output.txt',
}
)
def GetMsg(
get_msg: str
):
return dsl.ContainerOp(
name = 'Read msg from 1st component',
image = 'docker.io/akashdesarda/comp2:latest',
command = ['python', 'msg.py'],
arguments=[
'--msg', get_msg
]
)
@dsl.pipeline(
name = 'Pass parameter',
description = 'Passing para')
def passing_parameter(send_msg):
comp1 = SendMsg(send_msg)
comp2 = GetMsg(comp1.output)
if __name__ == '__main__':
import kfp.compiler as compiler
compiler.Compiler().compile(passing_parameter, __file__ + '.tar.gz')
您不必将数据写入共享存储,您可以使用 kfp.dsl.InputArgumentPath
将 python 函数的输出传递到容器操作的输入。
@kfp.dsl.pipeline(
name='Build Model Server Pipeline',
description='Build a kserve model server pipeline.'
)
def build_model_server_pipeline(s3_src_path):
download_s3_files_task = download_archive_step(s3_src_path)
tarball_path = "/tmp/artifact.tar"
artifact_tarball = kfp.dsl.InputArgumentPath(download_s3_files_task.outputs['output_tarball'], path=tarball_path)
build_container = kfp.dsl.ContainerOp(name ='build_container',
image ='python:3.8',
command=['sh', '-c'],
arguments=[
'ls -l ' + tarball_path + ';'
],
artifact_argument_paths=[artifact_tarball],
)