Kubeflow 管道中的多个图像组件
Kubeflow Multiple images components in pipeline
我想在 Kubeflow 管道中编写一个具有 2 个组件的管道:A 和 B
A的输出是图片路径列表。
我想 运行 每个图像路径docker 图像 (B)
据我所知,B 的 dsl.ContainerOp
可以等待 A 的输出,但我不知道如何创建 B 的多个实例
更新:这最近发生了变化,只需在输出上使用 ParallerlFor
即可完成。参考:
----- 以下为 KF 0.6 及之前的版本 ----
这是 Kubeflow DSL 的公认问题:使用一个组件 (A) 的输出并对其进行迭代 运行为组件中的每个条目创建一个新组件 (B)先前的输出。这很难,因为 Kubeflow 使用的 DSL 是在编译时,不可能知道那时输出中会有多少元素。
参考:
KF v0.6 支持的唯一动态(运行 时间)迭代形式是:dsl-recursion。我已经通过 2 种方式使它工作,但缺少针对上述问题的未决工作:
如果 A 结果的大小在每个 运行 中都是一个常数并且是预先知道的,那么这很简单。
CASE A:上一步输出的大小已知
- 创建轻量级合成以获取给定索引处的图像路径
# Write a python code to extract the path from
# the string of refs the previous step returns
def get_path(str_of_paths: str, idx: int) -> str:
return str_of_paths.split(" ")[idx] # or some other delimiter
- 将 python 代码包装在 Kubeflow lightweight components
中
get_img_path_comp = comp.func_to_container_op(get_path,base_image='tensorflow/tensorflow') # or any appropriate base image
然后管道 dsl 代码中的常规 for 循环将起作用
image_path_res = ContainerOP_A() # run your container Op
for idx in range(4):
path = get_path(image_path_res.output, i)
ContainerOp_B(path.output)
CASE B:当上一步的输出不是固定大小时
这有点棘手和复杂。从 KF v0.6 开始,Kubeflow 允许的唯一动态循环形式是 dsl-recursion
选项 1
- 创建 2 个轻量级组件,一个用于计算结果的大小
sizer_op
,然后重用上面相同的 get_img_path_comp
。
@dsl.component
def sizer_op(str_of_refs) -> int:
return len(str_of_refs.split("|"))
sizer_op_comp = comp.func_to_container_op(sizer_op,base_image='tensorflow/tensorflow')
然后就可以运行回溯函数
@dsl.component
def subtracter_op(cur_idx) -> int:
return cur_idx - 1
sub_op_comp = comp.func_to_container_op(subtracter_op,base_image='tensorflow/tensorflow')
@dsl.graph_component
def recursive_run(list_of_images, cur_ref):
with dsl.Condition(cur_ref >= 0):
path = get_path(image_path_res.output, i)
ContainerOp_B(path.output)
# call recursively
next_ref = sub_op_comp(cur_ref)
recursive_run(list_of_images, next_ref)
image_path_res = ContainerOP_A() # run your container Op
sizer = sizer_op_comp(image_path_res)
recursive_run(image_path_res.output, sizer.output)
选项 2
在 运行 宁 ContainerOp_A 之后,创建一个从 ContainerOp_A 读取结果的 Kubeflow 组件,在 python 代码本身中解析结果,然后生成新的 运行 运行 只是 Containerop_B 使用 kfclient。您可以使用以下方式连接到 KF 管道客户端:
kf_client = Client(host=localhost:9990)
参考:kf_client
我想在 Kubeflow 管道中编写一个具有 2 个组件的管道:A 和 B
A的输出是图片路径列表。
我想 运行 每个图像路径docker 图像 (B)
据我所知,B 的 dsl.ContainerOp
可以等待 A 的输出,但我不知道如何创建 B 的多个实例
更新:这最近发生了变化,只需在输出上使用 ParallerlFor
即可完成。参考:
----- 以下为 KF 0.6 及之前的版本 ----
这是 Kubeflow DSL 的公认问题:使用一个组件 (A) 的输出并对其进行迭代 运行为组件中的每个条目创建一个新组件 (B)先前的输出。这很难,因为 Kubeflow 使用的 DSL 是在编译时,不可能知道那时输出中会有多少元素。
参考:
KF v0.6 支持的唯一动态(运行 时间)迭代形式是:dsl-recursion。我已经通过 2 种方式使它工作,但缺少针对上述问题的未决工作:
如果 A 结果的大小在每个 运行 中都是一个常数并且是预先知道的,那么这很简单。
CASE A:上一步输出的大小已知
- 创建轻量级合成以获取给定索引处的图像路径
# Write a python code to extract the path from
# the string of refs the previous step returns
def get_path(str_of_paths: str, idx: int) -> str:
return str_of_paths.split(" ")[idx] # or some other delimiter
- 将 python 代码包装在 Kubeflow lightweight components 中
get_img_path_comp = comp.func_to_container_op(get_path,base_image='tensorflow/tensorflow') # or any appropriate base image
然后管道 dsl 代码中的常规 for 循环将起作用
image_path_res = ContainerOP_A() # run your container Op
for idx in range(4):
path = get_path(image_path_res.output, i)
ContainerOp_B(path.output)
CASE B:当上一步的输出不是固定大小时
这有点棘手和复杂。从 KF v0.6 开始,Kubeflow 允许的唯一动态循环形式是 dsl-recursion
选项 1
- 创建 2 个轻量级组件,一个用于计算结果的大小
sizer_op
,然后重用上面相同的get_img_path_comp
。
@dsl.component
def sizer_op(str_of_refs) -> int:
return len(str_of_refs.split("|"))
sizer_op_comp = comp.func_to_container_op(sizer_op,base_image='tensorflow/tensorflow')
然后就可以运行回溯函数
@dsl.component
def subtracter_op(cur_idx) -> int:
return cur_idx - 1
sub_op_comp = comp.func_to_container_op(subtracter_op,base_image='tensorflow/tensorflow')
@dsl.graph_component
def recursive_run(list_of_images, cur_ref):
with dsl.Condition(cur_ref >= 0):
path = get_path(image_path_res.output, i)
ContainerOp_B(path.output)
# call recursively
next_ref = sub_op_comp(cur_ref)
recursive_run(list_of_images, next_ref)
image_path_res = ContainerOP_A() # run your container Op
sizer = sizer_op_comp(image_path_res)
recursive_run(image_path_res.output, sizer.output)
选项 2
在 运行 宁 ContainerOp_A 之后,创建一个从 ContainerOp_A 读取结果的 Kubeflow 组件,在 python 代码本身中解析结果,然后生成新的 运行 运行 只是 Containerop_B 使用 kfclient。您可以使用以下方式连接到 KF 管道客户端:
kf_client = Client(host=localhost:9990)
参考:kf_client