基于先前任务输出的 Airflow 中的并行任务执行

Parallel task execution in Airflow based on previous task output

我习惯于使用 Argo,它能够根据先前步骤的输入并行执行流程步骤(例如,参见 https://dzone.com/articles/parallel-workflows-on-kubernetes)。

让我们举一个简单的例子:第一步请求 API 并接收匹配条目列表。第二个将下载并相应地处理条目。第二步可以并行化,因为条目彼此独立。在第三步中,结果存储在某处,具体取决于步骤 2 中的所有执行(例如“项目 X 成功处理”)。经典的分而治之,见下图。

我找不到关于如何在 Airflow (2.0+) 中执行此操作的文档。有可能吗,也许是 XComs?

编辑:一种可能的用例是卫星数据处理。考虑例如Open Access Hub of Copernicus,它提供了一个API来搜索卫星场景。我想执行例如处理Sentintel-1(雷达数据)并行用于给定查询。与查询匹配的所有场景都是相关的,但彼此不依赖。计算量很大 CPU + 可能是 GPU 密集型,因此就整体性能而言,分散在工作人员身上是有意义的。

Airflow official tutorial 中所述,DAG 定义“需要快速评估(秒,而不是分钟),因为调度程序将定期执行它以反映任何更改”。因此,不建议查询 API 来“即时”创建 DAG。

话虽如此,您可以使用 Variables 动态定义 DAG,甚至是工作人员的本地文件(如果您有多个工作人员,这会更复杂)。 None 两者都是“良好做法”,但您仍然可以使用它。对于 Variables 选项,您可以有两个 DAG,一个用于查询 API 和设置配置变量,另一个用于读取它、遍历它并创建任务。这也不是很推荐,因为每次执行 Variable.get("variable_name").

时都会创建到元数据数据库的连接

正如@Elad 所提到的,通过 XComs 是不可能的,甚至在 Airflow 2.1 中也是如此(我尝试了一些解决方案但没有奏效)。

Airflow 不支持根据前面步骤的输出动态创建任务(运行 时间)。 Airflow 中 DAG 的动态特性是在 DAG 文件解析时已知的值。 docs 状态:

Workflows are expected to be mostly static or slowly changing. You can think of the structure of the tasks in your workflow as slightly more dynamic than a database structure would be. Airflow workflows are expected to look similar from a run to the next, this allows for clarity around unit of work and continuity.

就是说 - 任务可以共享元数据(文件路径、文件名等...)

您没有对您的用例进行过多解释,因此很难给出建议。我可以提出以下建议:在 ETL 中,按日期划分数据更容易。因此,例如每个分支可以是 month/week。例如,您可以将 n 视为一年中的几周。然后你有 52 个分支 (n=52) 并根据它的 creation_date (转换为一年中的一周)将每个文件关联到分支。如果在特定的 运行 中,一个分支没有要处理的文件,它就会被跳过。 该领域有许多可能的解决方案。