在 Kubeflow Pipelines 中,如何将元素列表发送到轻量级 python 组件?
In Kubeflow Pipelines, how to send a list of elements to a lightweight python component?
我正在尝试将元素列表作为 PipelineParameter 发送到轻量级组件。
这是重现问题的示例。这是函数:
def my_func(my_list: list) -> bool:
print(f'my_list is {my_list}')
print(f'my_list is of type {type(my_list)}')
print(f'elem 0 is {my_list[0]}')
print(f'elem 1 is {my_list[1]}')
return True
如果我用这个执行它:
test_data = ['abc', 'def']
my_func(test_data)
它的行为符合预期:
my_list is ['abc', 'def']
my_list is of type <class 'list'>
elem 0 is abc
elem 1 is def
但是如果我将它包装在一个操作中并设置一个管道:
import kfp
my_op = kfp.components.func_to_container_op(my_func)
@kfp.dsl.pipeline()
def my_pipeline(my_list: kfp.dsl.PipelineParam = kfp.dsl.PipelineParam('my_list', param_type=kfp.dsl.types.List())):
my_op(my_list)
kfp.compiler.Compiler().compile(my_pipeline, 'my_pipeline.zip')
然后 运行 管道:
client = kfp.Client()
experiment = client.create_experiment('Default')
client.run_pipeline(experiment.id, 'my job', 'my_pipeline.zip', params={'my_list': test_data})
然后似乎在某个时候我的列表被转换为字符串!
my_list is ['abc', 'def']
my_list is of type <class 'str'>
elem 0 is [
elem 1 is '
这是我发现的一个解决方法,将参数序列化为 json 字符串。不确定这是否真的是最好的方法...
裸函数变为:
def my_func(json_arg_str: str) -> bool:
import json
args = json.loads(json_arg_str)
my_list = args['my_list']
print(f'my_list is {my_list}')
print(f'my_list is of type {type(my_list)}')
print(f'elem 0 is {my_list[0]}')
print(f'elem 1 is {my_list[1]}')
return True
只要您将 args 作为 json 字符串而不是列表传递,它仍然有效:
test_data = '{"my_list":["abc", "def"]}'
my_func(test_data)
产生预期结果:
my_list is ['abc', 'def']
my_list is of type <class 'list'>
elem 0 is abc
elem 1 is def
现在管道已更改为接受 str
而不是 kfp.dsl.types.List
类型的 PipelineParam
:
import kfp
my_op = kfp.components.func_to_container_op(my_func)
@kfp.dsl.pipeline()
def my_pipeline(json_arg_str: str):
my_op(json_arg_str)
kfp.compiler.Compiler().compile(my_pipeline, 'my_pipeline.zip')
其中,当这样执行时:
client = kfp.Client()
experiment = client.create_experiment('Default')
client.run_pipeline(experiment.id, 'my job', 'my_pipeline.zip', params={'json_arg_str': test_data})
产生相同的结果:
my_list is ['abc', 'def']
my_list is of type <class 'list'>
elem 0 is abc
elem 1 is def
虽然它有效,但我还是觉得这个解决方法很烦人。如果不是为了允许一个 PipelineParam 是一个列表,那么 kfp.dsl.types.List 的意义何在?
目前最好的选择似乎是序列化参数。有一个与此相关的问题:https://github.com/kubeflow/pipelines/issues/1901
我正在尝试将元素列表作为 PipelineParameter 发送到轻量级组件。
这是重现问题的示例。这是函数:
def my_func(my_list: list) -> bool:
print(f'my_list is {my_list}')
print(f'my_list is of type {type(my_list)}')
print(f'elem 0 is {my_list[0]}')
print(f'elem 1 is {my_list[1]}')
return True
如果我用这个执行它:
test_data = ['abc', 'def']
my_func(test_data)
它的行为符合预期:
my_list is ['abc', 'def']
my_list is of type <class 'list'>
elem 0 is abc
elem 1 is def
但是如果我将它包装在一个操作中并设置一个管道:
import kfp
my_op = kfp.components.func_to_container_op(my_func)
@kfp.dsl.pipeline()
def my_pipeline(my_list: kfp.dsl.PipelineParam = kfp.dsl.PipelineParam('my_list', param_type=kfp.dsl.types.List())):
my_op(my_list)
kfp.compiler.Compiler().compile(my_pipeline, 'my_pipeline.zip')
然后 运行 管道:
client = kfp.Client()
experiment = client.create_experiment('Default')
client.run_pipeline(experiment.id, 'my job', 'my_pipeline.zip', params={'my_list': test_data})
然后似乎在某个时候我的列表被转换为字符串!
my_list is ['abc', 'def']
my_list is of type <class 'str'>
elem 0 is [
elem 1 is '
这是我发现的一个解决方法,将参数序列化为 json 字符串。不确定这是否真的是最好的方法...
裸函数变为:
def my_func(json_arg_str: str) -> bool:
import json
args = json.loads(json_arg_str)
my_list = args['my_list']
print(f'my_list is {my_list}')
print(f'my_list is of type {type(my_list)}')
print(f'elem 0 is {my_list[0]}')
print(f'elem 1 is {my_list[1]}')
return True
只要您将 args 作为 json 字符串而不是列表传递,它仍然有效:
test_data = '{"my_list":["abc", "def"]}' my_func(test_data)
产生预期结果:
my_list is ['abc', 'def']
my_list is of type <class 'list'>
elem 0 is abc
elem 1 is def
现在管道已更改为接受 str
而不是 kfp.dsl.types.List
类型的 PipelineParam
:
import kfp
my_op = kfp.components.func_to_container_op(my_func)
@kfp.dsl.pipeline()
def my_pipeline(json_arg_str: str):
my_op(json_arg_str)
kfp.compiler.Compiler().compile(my_pipeline, 'my_pipeline.zip')
其中,当这样执行时:
client = kfp.Client()
experiment = client.create_experiment('Default')
client.run_pipeline(experiment.id, 'my job', 'my_pipeline.zip', params={'json_arg_str': test_data})
产生相同的结果:
my_list is ['abc', 'def']
my_list is of type <class 'list'>
elem 0 is abc
elem 1 is def
虽然它有效,但我还是觉得这个解决方法很烦人。如果不是为了允许一个 PipelineParam 是一个列表,那么 kfp.dsl.types.List 的意义何在?
目前最好的选择似乎是序列化参数。有一个与此相关的问题:https://github.com/kubeflow/pipelines/issues/1901