Airflow schedule single dag for multiple schedules 全天

Airflow schedule single dag for multiple schedules throughout the day

我想要一个 DAG 从 FTP 下载数据,我不需要 FTP 中的所有数据,只需要某些文件。这些文件每天都会在一天中的特定时间上传,我想在这些文件在 FTP 网站上可用后不久检索它们。

Ex FTP 时间表:

/Data/US/XASE/yyyymmdd.csv #uploaded daily at 9:00 PM UTC
/Data/EU/TRWB/yyyymmdd.csv #uploaded daily at 1:00 PM UTC
...
/Data/EU/XEUR/yyyymmdd.csv #uploaded daily at 11:00 AM UTC

如何在 dag 中设置调度程序,以便我可以在上传数据时从 FTP 站点复制数据,而不是为每个上传时间设置一个单独的 dag?

我认为您可以在此处安排三个选项。

选项 1

您 运行 正好在世界标准时间上午 11 点、下午 1 点、晚上 9 点,时间表如下 0 11,13,21 * * *。或者可能在一小时后 5 分钟添加一些缓冲 (5 11,13,21 * * *)。

选项 2

您 运行 更定期地查看 DAG 并检查文件是否可用,然后在任务中下载它们。如果文件上传延迟的可能性更高,这是有道理的。

例如 */10 10-22 * * * 会在 10:00-22:00 之间每 10 分钟 运行。

选项 3

您每天安排一次 DAG (@once),然后使用 TimeDeltaSensor。我认为这个选项是最不可取的,因为你有很多任务只是“等待” - 这可能会阻止其他气流任务的执行。

除此之外,它还很大程度上取决于您希望如何处理 FTP 本身的下载。

我猜你可以为每天下载的每个文件创建一个任务,然后将基于 BranchPythonOperator 的任务放在前面,以避免尝试多次下载同一个文件。

或者您将整个逻辑放入 PythonOperator 中,包括仅根据 execution_date.

下载某些文件的逻辑