Is there an implemented way to use a Kubeflow pipeline's output outside the pipeline?
I'm building a continuous machine-learning testing project on a local Kubeflow Pipelines installation. I have a pipeline that preprocesses data with TFX and automatically saves its output to MinIO. Outside this pipeline, I want to train a model with TFX's Trainer, but I need the artifacts generated by the preprocessing pipeline. Is there an implemented way to import this output? I've looked through the documentation and some issues but couldn't find an answer. And since I'm trying to keep this continuous, I can't rely on doing it manually.
An example of my preprocessing pipeline:
@kfp.dsl.pipeline(
    name='TFX',
    description='TFX pipeline'
)
def tfx_pipeline():
    # DL with wget, can use gcs instead as well
    fetch = kfp.dsl.ContainerOp(
        name='download',
        image='busybox',
        command=['sh', '-c'],
        arguments=[
            'sleep 1;'
            'mkdir -p /tmp/data;'
            'wget <gcp link> -O /tmp/data/results.csv'],
        file_outputs={'downloaded': '/tmp/data'})

    records_example = tfx_csv_gen(input_base=fetch.output)
    stats = tfx_statistic_gen(input_data=records_example.output)
    schema_op = tfx_schema_gen(stats.output)
    tfx_example_validator(stats=stats.outputs['output'],
                          schema=schema_op.outputs['output'])

    #tag::tft[]
    transformed_output = tfx_transform(
        input_data=records_example.output,
        schema=schema_op.outputs['output'],
        module_file=module_file)  # Path to your TFT code on GCS/S3
    #end::tft[]
which is then compiled and executed with:
kfp.compiler.Compiler().compile(tfx_pipeline, 'tfx_pipeline.zip')

client = kfp.Client()
client.list_experiments()
#exp = client.create_experiment(name='mdupdate')
my_experiment = client.create_experiment(name='tfx_pipeline')
my_run = client.run_pipeline(my_experiment.id, 'tfx', 'tfx_pipeline.zip')
I'm developing this as a .ipynb notebook in Visual Studio Code.
You can get that information as described in https://github.com/kubeflow/pipelines/issues/4327#issuecomment-687255001:

component_name: this can be looked up in the pipeline's YAML definition, under templates.name (search for the component that produces the output you want).

artifact_name: this can also be looked up in the pipeline's YAML definition, under the outputs attribute of that same component; see the sketch below for a programmatic way to list both.
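For example, here is a minimal sketch of pulling both names out of the compiled package programmatically. It assumes the tfx_pipeline.zip produced above contains a single Argo workflow YAML (as KFP v1 packages do) and that PyYAML is installed; adjust for your KFP version:

import zipfile

import yaml  # pip install pyyaml

# Read the workflow manifest out of the compiled pipeline package.
with zipfile.ZipFile('tfx_pipeline.zip') as zf:
    manifest = yaml.safe_load(zf.read(zf.namelist()[0]))

# Each template is a component; its outputs.artifacts hold the artifact names.
for template in manifest['spec']['templates']:
    artifacts = template.get('outputs', {}).get('artifacts', [])
    if artifacts:
        print(template['name'], '->', [a['name'] for a in artifacts])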
Once you have those two parameters, you can use the functions from the comment linked above:
#!/usr/bin/env python3

import json
import tarfile
from base64 import b64decode
from io import BytesIO

import kfp


def get_node_id(*, run_id: str, component_name: str, client: kfp.Client):
    run = client.runs.get_run(run_id)
    workflow = json.loads(run.pipeline_runtime.workflow_manifest)
    nodes = workflow["status"]["nodes"]
    for node_id, node_info in nodes.items():
        if node_info["displayName"] == component_name:
            return node_id
    else:
        raise RuntimeError(f"Unable to find node_id for Component '{component_name}'")


def get_artifact(*, run_id: str, node_id: str, artifact_name: str, client: kfp.Client):
    artifact = client.runs.read_artifact(run_id, node_id, artifact_name)
    # Artifacts are returned as base64-encoded .tar.gz strings
    data = b64decode(artifact.data)
    io_buffer = BytesIO()
    io_buffer.write(data)
    io_buffer.seek(0)
    data = None
    with tarfile.open(fileobj=io_buffer) as tar:
        member_names = tar.getnames()
        if len(member_names) == 1:
            data = tar.extractfile(member_names[0]).read().decode('utf-8')
        else:
            # Is it possible for KFP artifacts to have multiple members?
            data = {}
            for member_name in member_names:
                data[member_name] = tar.extractfile(member_name).read().decode('utf-8')
    return data


if __name__ == "__main__":
    run_id = "e498b0da-036e-4e81-84e9-6e9c6e64960b"
    component_name = "my-component"
    # For an output variable named "output_data"
    artifact_name = "my-component-output_data"

    client = kfp.Client()
    node_id = get_node_id(run_id=run_id, component_name=component_name, client=client)
    artifact = get_artifact(
        run_id=run_id, node_id=node_id, artifact_name=artifact_name, client=client,
    )
    # Do something with artifact ...
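Since your preprocessing pipeline already persists its output to MinIO, an alternative is to fetch the same .tgz artifacts straight from the object store instead of going through the KFP API. A minimal sketch, assuming KFP v1's default artifact store layout (bucket mlpipeline, key artifacts/<workflow>/<pod>/<artifact>.tgz) and the default local credentials; every connection detail below is an assumption to verify against your installation:

from minio import Minio  # pip install minio

# Assumed defaults for a local KFP install; check your minio-service config.
# e.g. kubectl port-forward -n kubeflow svc/minio-service 9000:9000
minio_client = Minio(
    'localhost:9000',
    access_key='minio',     # assumed default credential
    secret_key='minio123',  # assumed default credential
    secure=False,
)

# Hypothetical placeholders: take these from the run's workflow manifest
# (the same manifest get_node_id above parses).
workflow_name = '<workflow name from the manifest>'
pod_name = '<node/pod id of the component>'

obj = minio_client.get_object(
    'mlpipeline',
    f'artifacts/{workflow_name}/{pod_name}/output_data.tgz',
)
with open('output_data.tgz', 'wb') as f:
    f.write(obj.read())

From there you can untar the artifact and hand the contained paths to your Trainer step, which keeps the whole loop scriptable rather than manual.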