Airflow - 关于批处理作业的问题和 运行 DagRun 中的任务多次

Airflow - Questions on batch jobs and running a task in a DagRun multiple times

我正在尝试解决以下气流问题:

我有一个数据管道,我想在其中 运行 处理多个 excel 文档(例如:每天 5,000 个 excel 文件)。我对 DAG 的想法如下:

任务 1 = 获取一个 excel 文件,并向其中添加一个新的 sheet。

任务 2 = 将返回的 excel 转换为 PDF。

DAG 中的任务 1 和 2 将通过 API 调用调用处理工具 运行ning 外部气流(因此实际数据处理不会发生在气流内部)。

我似乎在兜圈子,想找出这个工作流程的最佳方法。我一直有的一些问题是:

我处理这个问题的方式是否正确?我应该如何解决这个问题?

假设

我做了一些假设,因为我不知道 Excel 文件处理的所有细节:

  1. 您无法合并 Excel 个文件,因为您需要将它们分开。
  2. Excel 文件可从 Airflow DAG(相同文件系统或类似文件系统)访问。

如有不实之处,请予以澄清。

答案

话虽这么说,我会先回答你的问题,然后评论一些想法:

  1. 我认为你可以分批进行,因为每个文件使用一个 运行 会非常慢(主要是因为调度程序时间,这会增加 Excel 文件处理之间的时间)。您也没有使用所有可用资源,因此最好将 Airflow 推得更忙。
  2. 批处理量将取决于处理负载和任务设计。根据你的问题,我假设你正在考虑将批处理放在任务中,但是如果处理 Excel 文件的服务可以处理良好的并行性,我宁愿建议每个 Excel 文件一个任务。拥有 5000 个任务(每个文件一个)将不是一个好主意(因为这会很困难,所以请参阅 UI),但每批处理的确切数量主要取决于您的资源和服务 SLA。
  3. 根据我的经验,我建议对所有事情使用一个任务,因为您可以并行调用服务,并且在服务完成后,您可以直接将 Excel 文件转换为 PDF。
  4. 问题 #3 的答案解决了这个问题。

解决方案概述

我想象的解决方案是这样的:

  • 检查挂起文件是否存在的第一个任务。您可以使用 BranchPythonOperator (example here).
  • 进行分叉
  • 然后您有 X 个并行任务要处理 Excel(调用服务)并将其转换为 PDF。可能是一项 PythonOperator 任务。如果你使用 Airflow 2,你可以简单地使用 @task() decorator 来简化代码。例如,X 可以是 10 到 100,具体取决于资源和服务吞吐量。
  • 有一个最终任务会再次触发 DAG 以处理更多文件。这可以使用 TriggerDagRunOperator (example here).
  • 来实现