Airflow - 关于批处理作业的问题和 运行 DagRun 中的任务多次
Airflow - Questions on batch jobs and running a task in a DagRun multiple times
我正在尝试解决以下气流问题:
我有一个数据管道,我想在其中 运行 处理多个 excel 文档(例如:每天 5,000 个 excel 文件)。我对 DAG 的想法如下:
任务 1 = 获取一个 excel 文件,并向其中添加一个新的 sheet。
任务 2 = 将返回的 excel 转换为 PDF。
DAG 中的任务 1 和 2 将通过 API 调用调用处理工具 运行ning 外部气流(因此实际数据处理不会发生在气流内部)。
我似乎在兜圈子,想找出这个工作流程的最佳方法。我一直有的一些问题是:
- 每个 DagRun 应该是一个 excel,还是 DagRun 应该分批处理
excels?
- 如果批量接收(我认为这是正确的方法),建议的批量是多少?
- 我如何将任务 1 的返回值传递给任务 2。它会是一个 XCOM 字典并引用每个新保存的 excel 吗?我在某处读到 xcom 的最大大小应为 48kb。因此,如果我有 5,000 个 excel 文件路径的 XCOM,它可能会大于 48kb。
- 我遇到的最后一个也是最棘手的问题是,我显然希望在任务 1 中的 1 excel 完成后立即开始处理任务 2,因为我不想等待整个批次任务 1 在开始任务 2 之前完成。对于任务 1 产生的每个新结果,我如何在同一个 DagRun 中多次 运行 任务 2?或者任务 2 应该是它自己的 DAG?
我处理这个问题的方式是否正确?我应该如何解决这个问题?
假设
我做了一些假设,因为我不知道 Excel 文件处理的所有细节:
- 您无法合并 Excel 个文件,因为您需要将它们分开。
- Excel 文件可从 Airflow DAG(相同文件系统或类似文件系统)访问。
如有不实之处,请予以澄清。
答案
话虽这么说,我会先回答你的问题,然后评论一些想法:
- 我认为你可以分批进行,因为每个文件使用一个 运行 会非常慢(主要是因为调度程序时间,这会增加 Excel 文件处理之间的时间)。您也没有使用所有可用资源,因此最好将 Airflow 推得更忙。
- 批处理量将取决于处理负载和任务设计。根据你的问题,我假设你正在考虑将批处理放在任务中,但是如果处理 Excel 文件的服务可以处理良好的并行性,我宁愿建议每个 Excel 文件一个任务。拥有 5000 个任务(每个文件一个)将不是一个好主意(因为这会很困难,所以请参阅 UI),但每批处理的确切数量主要取决于您的资源和服务 SLA。
- 根据我的经验,我建议对所有事情使用一个任务,因为您可以并行调用服务,并且在服务完成后,您可以直接将 Excel 文件转换为 PDF。
- 问题 #3 的答案解决了这个问题。
解决方案概述
我想象的解决方案是这样的:
- 检查挂起文件是否存在的第一个任务。您可以使用 BranchPythonOperator (example here).
进行分叉
- 然后您有 X 个并行任务要处理 Excel(调用服务)并将其转换为 PDF。可能是一项
PythonOperator
任务。如果你使用 Airflow 2,你可以简单地使用 @task()
decorator 来简化代码。例如,X
可以是 10 到 100,具体取决于资源和服务吞吐量。
- 有一个最终任务会再次触发 DAG 以处理更多文件。这可以使用 TriggerDagRunOperator (example here).
来实现
我正在尝试解决以下气流问题:
我有一个数据管道,我想在其中 运行 处理多个 excel 文档(例如:每天 5,000 个 excel 文件)。我对 DAG 的想法如下:
任务 1 = 获取一个 excel 文件,并向其中添加一个新的 sheet。
任务 2 = 将返回的 excel 转换为 PDF。
DAG 中的任务 1 和 2 将通过 API 调用调用处理工具 运行ning 外部气流(因此实际数据处理不会发生在气流内部)。
我似乎在兜圈子,想找出这个工作流程的最佳方法。我一直有的一些问题是:
- 每个 DagRun 应该是一个 excel,还是 DagRun 应该分批处理 excels?
- 如果批量接收(我认为这是正确的方法),建议的批量是多少?
- 我如何将任务 1 的返回值传递给任务 2。它会是一个 XCOM 字典并引用每个新保存的 excel 吗?我在某处读到 xcom 的最大大小应为 48kb。因此,如果我有 5,000 个 excel 文件路径的 XCOM,它可能会大于 48kb。
- 我遇到的最后一个也是最棘手的问题是,我显然希望在任务 1 中的 1 excel 完成后立即开始处理任务 2,因为我不想等待整个批次任务 1 在开始任务 2 之前完成。对于任务 1 产生的每个新结果,我如何在同一个 DagRun 中多次 运行 任务 2?或者任务 2 应该是它自己的 DAG?
我处理这个问题的方式是否正确?我应该如何解决这个问题?
假设
我做了一些假设,因为我不知道 Excel 文件处理的所有细节:
- 您无法合并 Excel 个文件,因为您需要将它们分开。
- Excel 文件可从 Airflow DAG(相同文件系统或类似文件系统)访问。
如有不实之处,请予以澄清。
答案
话虽这么说,我会先回答你的问题,然后评论一些想法:
- 我认为你可以分批进行,因为每个文件使用一个 运行 会非常慢(主要是因为调度程序时间,这会增加 Excel 文件处理之间的时间)。您也没有使用所有可用资源,因此最好将 Airflow 推得更忙。
- 批处理量将取决于处理负载和任务设计。根据你的问题,我假设你正在考虑将批处理放在任务中,但是如果处理 Excel 文件的服务可以处理良好的并行性,我宁愿建议每个 Excel 文件一个任务。拥有 5000 个任务(每个文件一个)将不是一个好主意(因为这会很困难,所以请参阅 UI),但每批处理的确切数量主要取决于您的资源和服务 SLA。
- 根据我的经验,我建议对所有事情使用一个任务,因为您可以并行调用服务,并且在服务完成后,您可以直接将 Excel 文件转换为 PDF。
- 问题 #3 的答案解决了这个问题。
解决方案概述
我想象的解决方案是这样的:
- 检查挂起文件是否存在的第一个任务。您可以使用 BranchPythonOperator (example here). 进行分叉
- 然后您有 X 个并行任务要处理 Excel(调用服务)并将其转换为 PDF。可能是一项
PythonOperator
任务。如果你使用 Airflow 2,你可以简单地使用@task()
decorator 来简化代码。例如,X
可以是 10 到 100,具体取决于资源和服务吞吐量。 - 有一个最终任务会再次触发 DAG 以处理更多文件。这可以使用 TriggerDagRunOperator (example here). 来实现