How to create nested branches in Metaflow?
I am using Metaflow to create a text-processing pipeline, as shown below:
```
                   +--> F --+
          +--> D --+        |
          |        +--> G --+
   +--> B-+                 +--> H
   |      +--> E -----------+
A--+                        |
   +--> C ------------------+
```
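According to the documentation, `branch` allows steps to be computed in parallel, so I use it to compute (B, C), (D, E), and (F, G) in parallel. Finally, all branches are joined at H. Below is the code that implements this logic: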
```python
from metaflow import FlowSpec, step


class TextProcessing(FlowSpec):

    @step
    def start(self):  # node A in the diagram; Metaflow requires a step named "start"
        ...
        self.next(self.b, self.c)

    @step
    def c(self):
        self.result1 = {}  # assign to self so the value is stored as an artifact
        ...
        self.next(self.join)

    @step
    def b(self):
        ...
        self.next(self.d, self.e)

    @step
    def e(self):
        self.result2 = []
        ...
        self.next(self.join)

    @step
    def d(self):
        ...
        self.next(self.f, self.g)

    @step
    def f(self):
        self.result3 = []
        ...
        self.next(self.join)

    @step
    def g(self):
        self.result4 = []
        ...
        self.next(self.join)

    @step
    def join(self, results):
        # c, e, f and g all lead here, although they come from three different splits
        data = [results.c.result1, results.e.result2,
                results.f.result3, results.g.result4]
        print(data)
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == "__main__":
    TextProcessing()
```
On running `python main.py run`, I get the following error:
```
Metaflow 2.2.10 executing TextProcessing for user:ubuntu
Validating your flow...
Validity checker found an issue on line 83:
Step join seems like a join step (it takes an extra input argument) but an incorrect number of steps (c, e, f, g) lead to it. This join was expecting 2 incoming paths, starting from splitted step(s) f, g.
```
Can anyone point out where I went wrong?
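After going through the docs again more carefully, I realized that I was not handling the joins correctly. According to the documentation of metaflow-2.2.10: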
Note that you can nest branches arbitrarily, that is, you can branch inside a branch. Just remember to join all the branches that you create.
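The rule in this quote can be illustrated with a toy checker. This is a sketch, not Metaflow's validator: it assumes a flow can be described as a plain dict mapping each step to the steps named in its `self.next(...)` call, and `check_joins`, `question_graph` and `answer_graph` are hypothetical names. It tracks which splits are still open at each step and accepts a join only if all incoming paths come from the same innermost split and every branch of that split arrives.

```python
from collections import defaultdict, deque


def check_joins(graph, start="start", end="end"):
    """Toy checker (NOT Metaflow's validator) for the rule "join every branch".

    `graph` maps each step to the steps named in its self.next(...) call.
    Returns a list of problems; an empty list means every split has its own join.
    """
    parents = defaultdict(list)
    for step, children in graph.items():
        for child in children:
            parents[child].append(step)

    indegree = {step: len(parents[step]) for step in graph}
    open_splits = {}  # step -> tuple of splits still open when that step runs
    problems = []
    queue = deque([start])  # Kahn's algorithm: visit steps in topological order
    while queue:
        step = queue.popleft()
        # Each parent passes on its open splits, plus itself if it is a split.
        incoming = [
            open_splits[p] + ((p,) if len(graph[p]) > 1 else ())
            for p in parents[step]
        ]
        if not incoming:  # the start step
            open_splits[step] = ()
        elif len(incoming) == 1:  # ordinary linear step
            open_splits[step] = incoming[0]
        else:  # join step: all paths must come from the same innermost split
            split_path = incoming[0]
            if len(set(incoming)) != 1 or not split_path:
                problems.append(f"{step}: incoming paths come from different splits")
                open_splits[step] = ()
            elif len(incoming) != len(graph[split_path[-1]]):
                problems.append(f"{step}: not all branches of {split_path[-1]} arrive here")
                open_splits[step] = split_path[:-1]
            else:
                open_splits[step] = split_path[:-1]  # this join closes the split
        for child in graph[step]:
            indegree[child] -= 1
            if indegree[child] == 0:
                queue.append(child)

    if open_splits.get(end):
        problems.append(f"{end}: splits {open_splits[end]} are never joined")
    return problems


question_graph = {
    "start": ["b", "c"], "b": ["d", "e"], "d": ["f", "g"],
    "c": ["join"], "e": ["join"], "f": ["join"], "g": ["join"],
    "join": ["end"], "end": [],
}
answer_graph = {
    "start": ["b", "c"], "b": ["d", "e"], "d": ["f", "g"],
    "f": ["merge_1"], "g": ["merge_1"], "merge_1": ["merge_2"],
    "e": ["merge_2"], "merge_2": ["merge_3"], "c": ["merge_3"],
    "merge_3": ["end"], "end": [],
}
print(check_joins(question_graph))  # ['join: incoming paths come from different splits']
print(check_joins(answer_graph))    # []
```

With the question's graph, `c`, `e`, `f` and `g` reach `join` with different sets of open splits, so the single join is rejected; with one merge step per split, no problems are reported.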
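This means that every branch must be joined. To combine the values coming from the branches, Metaflow provides the `merge_artifacts` utility function, which helps propagate unambiguous values.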
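To make "propagate unambiguous values" concrete, here is a simplified, stdlib-only emulation; it is not Metaflow's implementation. It assumes, as the Metaflow docs describe for `merge_artifacts`, that artifacts already set in the joining step are skipped and that conflicting values across branches raise an error. Steps are modelled as plain dicts, `merge_like_metaflow` is a hypothetical name, values are compared with `==`, and the `exclude` argument and other checks of the real function are omitted.

```python
def merge_like_metaflow(current, branches, include=None):
    """Hypothetical, simplified emulation of merge_artifacts (not Metaflow's code).

    current:  dict of artifacts already set in the join step
    branches: list of dicts, one per incoming branch
    include:  optional collection restricting which artifact names are merged
    """
    merged = {}
    conflicts = set()
    for branch in branches:
        for name, value in branch.items():
            if include is not None and name not in include:
                continue  # not requested
            if name in current:
                continue  # already set in this step: left untouched
            if name in merged and merged[name] != value:
                conflicts.add(name)  # branches disagree: the value is ambiguous
            else:
                merged[name] = value
    if conflicts:
        raise ValueError(f"ambiguous artifacts: {sorted(conflicts)}")
    current.update(merged)
    return current


# Usage with toy values: 'result' is already set in the join step,
# so only the other, unambiguous artifacts are copied over.
join_step = {"result": {"result2": []}}
branch_e = {"result2": [], "source": "raw.txt"}
branch_merge_1 = {"result": {"result3": [], "result4": []}, "source": "raw.txt"}
merge_like_metaflow(join_step, [branch_e, branch_merge_1])
print(join_step)  # {'result': {'result2': []}, 'result2': [], 'source': 'raw.txt'}
```

Under this assumption, calling `merge_artifacts(results, include=['result'])` right after assigning `self.result`, as in the answer below, leaves the freshly built `result` in place.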
Since the workflow contains three branch points (splits), I added three join steps to merge the results.
The following changes worked for me:
```python
from metaflow import FlowSpec, step


class TextProcessing(FlowSpec):

    @step
    def start(self):  # node A in the diagram; Metaflow requires a step named "start"
        ...
        self.next(self.b, self.c)

    @step
    def c(self):
        self.result1 = {}  # assign to self so the value is stored as an artifact
        ...
        self.next(self.merge_3)

    @step
    def b(self):
        ...
        self.next(self.d, self.e)

    @step
    def e(self):
        self.result2 = []
        ...
        self.next(self.merge_2)

    @step
    def d(self):
        ...
        self.next(self.f, self.g)

    @step
    def f(self):
        self.result3 = []
        ...
        self.next(self.merge_1)

    @step
    def g(self):
        self.result4 = []
        ...
        self.next(self.merge_1)

    @step
    def merge_1(self, results):
        # Joins the split made in d (branches f and g).
        self.result = {
            'result4': results.g.result4,
            'result3': results.f.result3,
        }
        self.next(self.merge_2)

    @step
    def merge_2(self, results):
        # Joins the split made in b (branch e and the merged d branch).
        self.result = {'result2': results.e.result2, **results.merge_1.result}
        # merge_artifacts only considers artifacts not already set in this step,
        # so the `result` assigned above is kept as-is.
        self.merge_artifacts(results, include=['result'])
        self.next(self.merge_3)

    @step
    def merge_3(self, results):
        # Joins the split made in start (branch c and the merged b branch).
        self.result = {'c': results.c.result1, **results.merge_2.result}
        self.merge_artifacts(results, include=['result'])
        self.next(self.end)

    @step
    def end(self):
        print(self.result)


if __name__ == "__main__":
    TextProcessing()
```
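The same "one join per split" shape can be mirrored outside Metaflow. The sketch below is a plain-Python analogue built on `concurrent.futures`, not Metaflow: `step_c`, `step_e`, `step_f`, `step_g`, `branch_b`, `branch_d` and `run_flow` are hypothetical stand-ins for the elided step bodies and the flow. Each split hands one branch to a thread pool, and each join waits only for the branches of its own split, in the same order as `merge_1`, `merge_2` and `merge_3`.

```python
from concurrent.futures import ThreadPoolExecutor


# Hypothetical stand-ins for the elided step bodies.
def step_c():
    return {}


def step_e():
    return []


def step_f():
    return []


def step_g():
    return []


def branch_d(pool):
    # Split at D: F and G run concurrently; the merge_1 analogue waits for exactly these two.
    f, g = pool.submit(step_f), pool.submit(step_g)
    return {"result4": g.result(), "result3": f.result()}


def branch_b(pool):
    # Split at B: E runs in the pool while the D branch runs here; merge_2 analogue joins them.
    e = pool.submit(step_e)
    d_result = branch_d(pool)
    return {"result2": e.result(), **d_result}


def run_flow():
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Split at A: C runs in the pool while the B branch runs here; merge_3 analogue joins them.
        c = pool.submit(step_c)
        b_result = branch_b(pool)
        return {"c": c.result(), **b_result}


print(run_flow())  # {'c': {}, 'result2': [], 'result4': [], 'result3': []}
```

Branches B and D run on the calling thread instead of being submitted themselves, which keeps pool workers from blocking on futures that would need further workers to complete.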