等待节点在 Kedro 中完成

Waiting for nodes to finish in Kedro

我在 Kedro 中有一个如下所示的管道:

from kedro.pipeline import Pipeline, node
from .nodes import *

def foo():
    return Pipeline([
        node(a, inputs=["train_x", "test_x"], outputs=dict(bar_a="bar_a"), name="A"),
        node(b, inputs=["train_x", "test_x"], outputs=dict(bar_b="bar_b"), name="B"),
        node(c, inputs=["train_x", "test_x"], outputs=dict(bar_c="bar_c"), name="C"),
        node(d, inputs=["train_x", "test_x"], outputs=dict(bar_d="bar_d"), name="D"),
        
    ])

节点 A、B 和 C 不是很占用资源,但它们需要一段时间,所以我想 运行 它们并行,另一方面,节点 D 几乎使用所有资源我的记忆,如果它与其他节点一起执行,它将失败。有没有一种方法可以告诉 Kedro 在执行节点 D 之前等待 A、B 和 C 完成并保持代码井井有条?

Kedro 根据不同节点inputs/outputs之间的相互依赖关系来决定执行顺序。在您的情况下,节点 D 不依赖于任何其他节点,因此无法保证执行顺序。同样,如果使用并联运行ner.

,也不能保证节点D不会运行与A、B、C并联。

也就是说,有几个解决方法可以用来实现特定的执行顺序。

1 [首选] 运行 节点分开

除了 kedro run --parallel,您还可以:

kedro run --pipeline foo --node A --node B --node C --parallel; kedro run --pipeline foo --node D

这可以说是首选解决方案,因为它不需要更改代码(这很好,以防您 运行 不同机器上的同一管道)。如果您希望节点 D 仅在 A、B 和 C 成功时 运行,则可以执行 && 而不是 ;。如果 运行ning 逻辑变得更复杂,您可以将其存储在 Makefile/bash 脚本中。

2 使用虚拟 inputs/outputs

您还可以通过引入虚拟数据集来强制执行顺序。类似于:

def foo():
    return Pipeline([
        node(a, inputs=["train_x", "test_x"], outputs=[dict(bar_a="bar_a"), "a_done"], name="A"),
        node(b, inputs=["train_x", "test_x"], outputs=[dict(bar_b="bar_b"), "b_done"], name="B"),
        node(c, inputs=["train_x", "test_x"], outputs=[dict(bar_c="bar_c"), "c_done"], name="C"),
        node(d, inputs=["train_x", "test_x", "a_done", "b_done", "c_done"], outputs=dict(bar_d="bar_d"), name="D"),     
    ])

空列表可以用于虚拟数据集。底层函数还必须 return/take 附加参数。

这种方法的优点是 kedro run --parallel 将立即产生所需的执行逻辑。缺点是污染了节点和底层函数的定义。

如果你走这条路,你还必须决定是否要将虚拟数据集存储在数据目录中(污染更多,但允许 运行 节点 D 自己)或不(节点 D 不能单独 运行)。


相关讨论[1, ]