使用 BranchPythonOperator 创建的分支不合并?
Branches created using BranchPythonOperator do not merge?
我正在使用 BranchPythonOperator 在气流中创建分支。我的用例是我需要从主流中创建两个分支。当存在 somefile.csv 时,将遵循分支 A(几乎没有任务),否则将遵循分支 B(没有任务)。最后两个分支应该合并,再次成为主流。
现在我可以跟随分支 A 或分支 B,但问题是如果我跟随分支 B 执行最终主流任务,如果我跟随分支 A 最终主流任务将被跳过。
MainstreamTaskA.setDownStream(MainstreamTaskB)
MainstreamTaskB.setDownStream(BranchATaskA)
BranchATaskA.setDownStream(MainstreamTaskC)
MainstreamTaskB.setDownStream(MainstreamTaskC)
我在 MainstreamTaskB 和 MainstreamTaskC 中设置了触发规则 "all_done"。
有人可以指导我完成这个吗?
我在你的依赖项中看不到其他分支。唯一的分支是 BranchATaskA
。
但是根据您提到的内容,您应该具有以下任务依赖关系并且有两个分支任务 BranchATaskA
和 BranchATaskB
.
MainstreamTaskA >> MainstreamTaskB
MainstreamTaskB >> BranchATaskA >> MainstreamTaskC
MainstreamTaskB >> BranchATaskB >> MainstreamTaskC
您应该在 MainstreamTaskC
上设置触发规则 all_done
。
我正在使用 BranchPythonOperator 在气流中创建分支。我的用例是我需要从主流中创建两个分支。当存在 somefile.csv 时,将遵循分支 A(几乎没有任务),否则将遵循分支 B(没有任务)。最后两个分支应该合并,再次成为主流。 现在我可以跟随分支 A 或分支 B,但问题是如果我跟随分支 B 执行最终主流任务,如果我跟随分支 A 最终主流任务将被跳过。
MainstreamTaskA.setDownStream(MainstreamTaskB)
MainstreamTaskB.setDownStream(BranchATaskA)
BranchATaskA.setDownStream(MainstreamTaskC)
MainstreamTaskB.setDownStream(MainstreamTaskC)
我在 MainstreamTaskB 和 MainstreamTaskC 中设置了触发规则 "all_done"。
有人可以指导我完成这个吗?
我在你的依赖项中看不到其他分支。唯一的分支是 BranchATaskA
。
但是根据您提到的内容,您应该具有以下任务依赖关系并且有两个分支任务 BranchATaskA
和 BranchATaskB
.
MainstreamTaskA >> MainstreamTaskB
MainstreamTaskB >> BranchATaskA >> MainstreamTaskC
MainstreamTaskB >> BranchATaskB >> MainstreamTaskC
您应该在 MainstreamTaskC
上设置触发规则 all_done
。