气流:关于 dag 路径的并发

Airflow: Concurrency with respect to dag path

假设我有一个具有深度并发兼容路径的 dag:

   B3 <-- B2 <-- B1 <-- B0 
  / 
C 
  \
   A3 <-- A2 <-- A1 <-- A0

上面的每条路径都可以同时解决。但是,如果上述分支之一失败(例如,如果 B0, A0 是传感器并且 B0 评估为 trueA0 仍在等待,则其余分支B 分支仍应执行。

然而,虽然我能够获得任务并发,但整个 dag 都停留在 B0, A0 任务上,而不是在 A0 等待期间沿着 B0 路径前进。

如何配置 Airflow 沿着每条路径前进,而不是在一个分支被阻塞时在任务中被阻塞?


或者是创建许多迷你狗的唯一解决方案?执行者似乎更喜欢仅跨一级节点的并行化而不是垂直执行——即,它只执行广度计算。

由于命名约定,起初这有点儿花招:

// I am using the following convention: filename(variable name or description)

// conceptually, 
C = airflow.cfg(dag_concurrency) * dag.py(dag concurrency for tasks)
C <= airflow.cfg(parallelism)