等待节点在 Kedro 中完成
Waiting for nodes to finish in Kedro
我在 Kedro 中有一个如下所示的管道:
from kedro.pipeline import Pipeline, node
from .nodes import *
def foo():
return Pipeline([
node(a, inputs=["train_x", "test_x"], outputs=dict(bar_a="bar_a"), name="A"),
node(b, inputs=["train_x", "test_x"], outputs=dict(bar_b="bar_b"), name="B"),
node(c, inputs=["train_x", "test_x"], outputs=dict(bar_c="bar_c"), name="C"),
node(d, inputs=["train_x", "test_x"], outputs=dict(bar_d="bar_d"), name="D"),
])
节点 A、B 和 C 不是很占用资源,但它们需要一段时间,所以我想 运行 它们并行,另一方面,节点 D 几乎使用所有资源我的记忆,如果它与其他节点一起执行,它将失败。有没有一种方法可以告诉 Kedro 在执行节点 D 之前等待 A、B 和 C 完成并保持代码井井有条?
Kedro 根据不同节点inputs/outputs之间的相互依赖关系来决定执行顺序。在您的情况下,节点 D 不依赖于任何其他节点,因此无法保证执行顺序。同样,如果使用并联运行ner.
,也不能保证节点D不会运行与A、B、C并联。
也就是说,有几个解决方法可以用来实现特定的执行顺序。
1 [首选] 运行 节点分开
除了 kedro run --parallel
,您还可以:
kedro run --pipeline foo --node A --node B --node C --parallel; kedro run --pipeline foo --node D
这可以说是首选解决方案,因为它不需要更改代码(这很好,以防您 运行 不同机器上的同一管道)。如果您希望节点 D 仅在 A、B 和 C 成功时 运行,则可以执行 &&
而不是 ;
。如果 运行ning 逻辑变得更复杂,您可以将其存储在 Makefile/bash 脚本中。
2 使用虚拟 inputs/outputs
您还可以通过引入虚拟数据集来强制执行顺序。类似于:
def foo():
return Pipeline([
node(a, inputs=["train_x", "test_x"], outputs=[dict(bar_a="bar_a"), "a_done"], name="A"),
node(b, inputs=["train_x", "test_x"], outputs=[dict(bar_b="bar_b"), "b_done"], name="B"),
node(c, inputs=["train_x", "test_x"], outputs=[dict(bar_c="bar_c"), "c_done"], name="C"),
node(d, inputs=["train_x", "test_x", "a_done", "b_done", "c_done"], outputs=dict(bar_d="bar_d"), name="D"),
])
空列表可以用于虚拟数据集。底层函数还必须 return/take 附加参数。
这种方法的优点是 kedro run --parallel
将立即产生所需的执行逻辑。缺点是污染了节点和底层函数的定义。
如果你走这条路,你还必须决定是否要将虚拟数据集存储在数据目录中(污染更多,但允许 运行 节点 D 自己)或不(节点 D 不能单独 运行)。
相关讨论[1, ]
我在 Kedro 中有一个如下所示的管道:
from kedro.pipeline import Pipeline, node
from .nodes import *
def foo():
return Pipeline([
node(a, inputs=["train_x", "test_x"], outputs=dict(bar_a="bar_a"), name="A"),
node(b, inputs=["train_x", "test_x"], outputs=dict(bar_b="bar_b"), name="B"),
node(c, inputs=["train_x", "test_x"], outputs=dict(bar_c="bar_c"), name="C"),
node(d, inputs=["train_x", "test_x"], outputs=dict(bar_d="bar_d"), name="D"),
])
节点 A、B 和 C 不是很占用资源,但它们需要一段时间,所以我想 运行 它们并行,另一方面,节点 D 几乎使用所有资源我的记忆,如果它与其他节点一起执行,它将失败。有没有一种方法可以告诉 Kedro 在执行节点 D 之前等待 A、B 和 C 完成并保持代码井井有条?
Kedro 根据不同节点inputs/outputs之间的相互依赖关系来决定执行顺序。在您的情况下,节点 D 不依赖于任何其他节点,因此无法保证执行顺序。同样,如果使用并联运行ner.
,也不能保证节点D不会运行与A、B、C并联。也就是说,有几个解决方法可以用来实现特定的执行顺序。
1 [首选] 运行 节点分开
除了 kedro run --parallel
,您还可以:
kedro run --pipeline foo --node A --node B --node C --parallel; kedro run --pipeline foo --node D
这可以说是首选解决方案,因为它不需要更改代码(这很好,以防您 运行 不同机器上的同一管道)。如果您希望节点 D 仅在 A、B 和 C 成功时 运行,则可以执行 &&
而不是 ;
。如果 运行ning 逻辑变得更复杂,您可以将其存储在 Makefile/bash 脚本中。
2 使用虚拟 inputs/outputs
您还可以通过引入虚拟数据集来强制执行顺序。类似于:
def foo():
return Pipeline([
node(a, inputs=["train_x", "test_x"], outputs=[dict(bar_a="bar_a"), "a_done"], name="A"),
node(b, inputs=["train_x", "test_x"], outputs=[dict(bar_b="bar_b"), "b_done"], name="B"),
node(c, inputs=["train_x", "test_x"], outputs=[dict(bar_c="bar_c"), "c_done"], name="C"),
node(d, inputs=["train_x", "test_x", "a_done", "b_done", "c_done"], outputs=dict(bar_d="bar_d"), name="D"),
])
空列表可以用于虚拟数据集。底层函数还必须 return/take 附加参数。
这种方法的优点是 kedro run --parallel
将立即产生所需的执行逻辑。缺点是污染了节点和底层函数的定义。
如果你走这条路,你还必须决定是否要将虚拟数据集存储在数据目录中(污染更多,但允许 运行 节点 D 自己)或不(节点 D 不能单独 运行)。
相关讨论[1,