气流:关于 dag 路径的并发
Airflow: Concurrency with respect to dag path
假设我有一个具有深度并发兼容路径的 dag:
B3 <-- B2 <-- B1 <-- B0
/
C
\
A3 <-- A2 <-- A1 <-- A0
上面的每条路径都可以同时解决。但是,如果上述分支之一失败(例如,如果 B0, A0
是传感器并且 B0
评估为 true
而 A0
仍在等待,则其余分支B
分支仍应执行。
然而,虽然我能够获得任务并发,但整个 dag 都停留在 B0, A0
任务上,而不是在 A0
等待期间沿着 B0
路径前进。
如何配置 Airflow 沿着每条路径前进,而不是在一个分支被阻塞时在任务中被阻塞?
或者是创建许多迷你狗的唯一解决方案?执行者似乎更喜欢仅跨一级节点的并行化而不是垂直执行——即,它只执行广度计算。
由于命名约定,起初这有点儿花招:
// I am using the following convention: filename(variable name or description)
// conceptually,
C = airflow.cfg(dag_concurrency) * dag.py(dag concurrency for tasks)
C <= airflow.cfg(parallelism)
假设我有一个具有深度并发兼容路径的 dag:
B3 <-- B2 <-- B1 <-- B0
/
C
\
A3 <-- A2 <-- A1 <-- A0
上面的每条路径都可以同时解决。但是,如果上述分支之一失败(例如,如果 B0, A0
是传感器并且 B0
评估为 true
而 A0
仍在等待,则其余分支B
分支仍应执行。
然而,虽然我能够获得任务并发,但整个 dag 都停留在 B0, A0
任务上,而不是在 A0
等待期间沿着 B0
路径前进。
如何配置 Airflow 沿着每条路径前进,而不是在一个分支被阻塞时在任务中被阻塞?
或者是创建许多迷你狗的唯一解决方案?执行者似乎更喜欢仅跨一级节点的并行化而不是垂直执行——即,它只执行广度计算。
由于命名约定,起初这有点儿花招:
// I am using the following convention: filename(variable name or description)
// conceptually,
C = airflow.cfg(dag_concurrency) * dag.py(dag concurrency for tasks)
C <= airflow.cfg(parallelism)