尝试使用文件在 Kubeflow 组件之间传递数据时出现问题
Issue when trying to pass data between Kubeflow components using files
我使用 python 函数制作了两个组件,我试图使用文件在它们之间传递数据,但我无法这样做。我想计算总和,然后使用文件将答案发送给其他组件。下面是部分代码(代码在没有文件传递的情况下工作)。请协助。
# Define your components code as standalone python functions:======================
def add(a: float, b: float, f: comp.OutputTextFile(float)) -> NamedTuple(
'AddOutput',
[
('out', comp.OutputTextFile(float))
]):
'''Calculates sum of two arguments'''
sum = a+b
f.write(sum)
from collections import namedtuple
addOutput = namedtuple(
'AddOutput',
['out'])
return addOutput(f) # the metrics will be uploaded to the cloud
def multiply(c:float, d:float, f: comp.InputTextFile(float) ):
'''Calculates the product'''
product = c * d
print(f.read())
add_op = comp.func_to_container_op(add, output_component_file='add_component.yaml')
product_op = comp.create_component_from_func(multiply,
output_component_file='multiple_component.yaml')
@dsl.pipeline(
name='Addition-pipeline',
description='An example pipeline that performs addition calculations.'
)
def my_pipeline(a, b='7', c='4', d='1'):
add_op = pl_comp_list[0]
product_op = pl_comp_list[1]
first_add_task = add_op(a, 4)
second_add_task = product_op(c, d, first_add_task.outputs['out'])
这是我测试过并且有效的管道的稍微简化的版本。
传递给 OutputTextFile
和 InputTextFile
的 class 类型并不重要。它将被读取和写入为 str
。所以这是你应该改变的:
- 写入
OutputTextFile
时:从 float to str
转换 sum_
- 从
InputTextFile
读取时:从 str to float
转换 f.read()
值
import kfp
from kfp import dsl
from kfp import components as comp
def add(a: float, b: float, f: comp.OutputTextFile()):
'''Calculates sum of two arguments'''
sum_ = a + b
f.write(str(sum_)) # cast to str
return sum_
def multiply(c: float, d: float, f: comp.InputTextFile()):
'''Calculates the product'''
in_ = float(f.read()) # cast to float
product = c * d * in_
print(product)
return product
add_op = comp.func_to_container_op(add,
output_component_file='add_component.yaml')
product_op = comp.create_component_from_func(
multiply, output_component_file='multiple_component.yaml')
@dsl.pipeline(
name='Addition-pipeline',
description='An example pipeline that performs addition calculations.')
def my_pipeline(a, b='7', c='4', d='1'):
first_add_task = add_op(a, b)
second_add_task = product_op(c, d, first_add_task.output)
if __name__ == "__main__":
compiled_name = __file__ + ".yaml"
kfp.compiler.Compiler().compile(my_pipeline, compiled_name)
('out', comp.OutputTextFile(float))
这不是真的有效。 OutputTextFile
注释(和其他类似的注释)只能在函数参数中使用。函数 return value 仅适用于您想要输出为值(而不是文件)的输出。
因为您已经有了 f: comp.OutputTextFile(float)
,您可以在其中完全删除函数 return 值。然后将 f
输出传递给下游组件:product_op(c, d, first_add_task.outputs['f'])
.
我使用 python 函数制作了两个组件,我试图使用文件在它们之间传递数据,但我无法这样做。我想计算总和,然后使用文件将答案发送给其他组件。下面是部分代码(代码在没有文件传递的情况下工作)。请协助。
# Define your components code as standalone python functions:======================
def add(a: float, b: float, f: comp.OutputTextFile(float)) -> NamedTuple(
'AddOutput',
[
('out', comp.OutputTextFile(float))
]):
'''Calculates sum of two arguments'''
sum = a+b
f.write(sum)
from collections import namedtuple
addOutput = namedtuple(
'AddOutput',
['out'])
return addOutput(f) # the metrics will be uploaded to the cloud
def multiply(c:float, d:float, f: comp.InputTextFile(float) ):
'''Calculates the product'''
product = c * d
print(f.read())
add_op = comp.func_to_container_op(add, output_component_file='add_component.yaml')
product_op = comp.create_component_from_func(multiply,
output_component_file='multiple_component.yaml')
@dsl.pipeline(
name='Addition-pipeline',
description='An example pipeline that performs addition calculations.'
)
def my_pipeline(a, b='7', c='4', d='1'):
add_op = pl_comp_list[0]
product_op = pl_comp_list[1]
first_add_task = add_op(a, 4)
second_add_task = product_op(c, d, first_add_task.outputs['out'])
这是我测试过并且有效的管道的稍微简化的版本。
传递给 OutputTextFile
和 InputTextFile
的 class 类型并不重要。它将被读取和写入为 str
。所以这是你应该改变的:
- 写入
OutputTextFile
时:从float to str
转换 - 从
InputTextFile
读取时:从str to float
转换
sum_
f.read()
值
import kfp
from kfp import dsl
from kfp import components as comp
def add(a: float, b: float, f: comp.OutputTextFile()):
'''Calculates sum of two arguments'''
sum_ = a + b
f.write(str(sum_)) # cast to str
return sum_
def multiply(c: float, d: float, f: comp.InputTextFile()):
'''Calculates the product'''
in_ = float(f.read()) # cast to float
product = c * d * in_
print(product)
return product
add_op = comp.func_to_container_op(add,
output_component_file='add_component.yaml')
product_op = comp.create_component_from_func(
multiply, output_component_file='multiple_component.yaml')
@dsl.pipeline(
name='Addition-pipeline',
description='An example pipeline that performs addition calculations.')
def my_pipeline(a, b='7', c='4', d='1'):
first_add_task = add_op(a, b)
second_add_task = product_op(c, d, first_add_task.output)
if __name__ == "__main__":
compiled_name = __file__ + ".yaml"
kfp.compiler.Compiler().compile(my_pipeline, compiled_name)
('out', comp.OutputTextFile(float))
这不是真的有效。 OutputTextFile
注释(和其他类似的注释)只能在函数参数中使用。函数 return value 仅适用于您想要输出为值(而不是文件)的输出。
因为您已经有了 f: comp.OutputTextFile(float)
,您可以在其中完全删除函数 return 值。然后将 f
输出传递给下游组件:product_op(c, d, first_add_task.outputs['f'])
.