Kubeflow 管道中的多个图像组件

Kubeflow Multiple images components in pipeline

我想在 Kubeflow 管道中编写一个具有 2 个组件的管道:A 和 B

A的输出是图片路径列表。

我想 运行 每个图像路径docker 图像 (B)

据我所知,B 的 dsl.ContainerOp 可以等待 A 的输出,但我不知道如何创建 B 的多个实例

更新:这最近发生了变化,只需在输出上使用 ParallerlFor 即可完成。参考:

----- 以下为 KF 0.6 及之前的版本 ----

这是 Kubeflow DSL 的公认问题:使用一个组件 (A) 的输出并对其进行迭代 运行为组件中的每个条目创建一个新组件 (B)先前的输出。这很难,因为 Kubeflow 使用的 DSL 是在编译时,不可能知道那时输出中会有多少元素。

参考:

KF v0.6 支持的唯一动态(运行 时间)迭代形式是:dsl-recursion。我已经通过 2 种方式使它工作,但缺少针对上述问题的未决工作:

如果 A 结果的大小在每个 运行 中都是一个常数并且是预先知道的,那么这很简单。

CASE A:上一步输出的大小已知

  1. 创建轻量级合成以获取给定索引处的图像路径
# Write a python code to extract the path from
# the string of refs the previous step returns 
def get_path(str_of_paths: str, idx: int) -> str:
    return str_of_paths.split(" ")[idx] # or some other delimiter
  1. 将 python 代码包装在 Kubeflow lightweight components
get_img_path_comp = comp.func_to_container_op(get_path,base_image='tensorflow/tensorflow') # or any appropriate base image

然后管道 dsl 代码中的常规 for 循环将起作用

image_path_res = ContainerOP_A() # run your container Op
for idx in range(4):
    path = get_path(image_path_res.output, i)
    ContainerOp_B(path.output)

CASE B:当上一步的输出不是固定大小时

这有点棘手和复杂。从 KF v0.6 开始,Kubeflow 允许的唯一动态循环形式是 dsl-recursion

选项 1

  1. 创建 2 个轻量级组件,一个用于计算结果的大小 sizer_op,然后重用上面相同的 get_img_path_comp
@dsl.component
def sizer_op(str_of_refs) -> int:
    return len(str_of_refs.split("|"))
sizer_op_comp = comp.func_to_container_op(sizer_op,base_image='tensorflow/tensorflow')

然后就可以运行回溯函数

@dsl.component
def subtracter_op(cur_idx) -> int:
    return cur_idx - 1
sub_op_comp = comp.func_to_container_op(subtracter_op,base_image='tensorflow/tensorflow')

@dsl.graph_component
def recursive_run(list_of_images, cur_ref):
    with dsl.Condition(cur_ref >= 0):
        path = get_path(image_path_res.output, i)
        ContainerOp_B(path.output)

        # call recursively
        next_ref = sub_op_comp(cur_ref)
        recursive_run(list_of_images, next_ref)


image_path_res = ContainerOP_A() # run your container Op
sizer = sizer_op_comp(image_path_res)
recursive_run(image_path_res.output, sizer.output)

选项 2

在 运行 宁 ContainerOp_A 之后,创建一个从 ContainerOp_A 读取结果的 Kubeflow 组件,在 python 代码本身中解析结果,然后生成新的 运行 运行 只是 Containerop_B 使用 kfclient。您可以使用以下方式连接到 KF 管道客户端:

kf_client = Client(host=localhost:9990)

参考:kf_client