确定 DAG 是否正在执行

Determining if a DAG is executing

我正在使用带有自定义 SFTPOperator 的 Airflow 1.9.0。我的 DAG 中有代码可以轮询 SFTP 站点以查找新文件。如果找到任何内容,那么我会为动态创建的任务和 retrieve/delete 文件创建自定义任务 ID。

directory_list = sftp_handler('sftp-site', None, '/', None, SFTPToS3Operation.LIST) for file_path in directory_list: ... SFTP code that GET's the remote files

那部分工作正常。似乎 airflow 网络服务器和 airflow 调度程序都在每秒迭代一次所有 DAG,并且实际上 运行 正在检索 directory_list 的代码。这意味着我正在访问 SFTP 站点 ~2 x 秒来验证并提取文件列表。我想要一些条件代码,只有在 DAG 实际上是 运行.

时才会执行

当 SFTP 站点使用密码身份验证时,我连接的次数确实不是问题。一个站点需要密钥认证,如果短时间内认证失败次数过多,则该帐户被锁定。在我的测试过程中,这似乎偶尔会发生,原因我仍在努力寻找。

但是,如果我仅在 DAG 计划执行时或手动执行时进行身份验证,这将不是问题。在没有计划的情况下花费大量时间连接到 SFTP 站点似乎也很浪费。

我见过 post 可以检查任务是否正在执行,但这并不理想,因为我必须创建一个长 运行ning 任务,耗尽我不需要的资源,只是为了执行该测试。关于如何实现这一点有什么想法吗?

您有一个很好的 Airflow 用例(SFTP 到 _____ 批处理作业),但 Airflow 不适用于动态 DAG,因为您正在尝试使用它们。

顶级 DAG 代码和调度程序循环

如您所见,DAG 中的任何顶级代码都在每个调度程序循环中执行。或者换句话说,每次调度程序循环处理 DAG 目录中的文件时,它都会解释 DAG 文件中的所有代码。任何不在任务或操作员中的东西都会立即 interpreted/executed 。这会给调度程序以及您正在调用的任何外部系统带来过度压力。

动态 DAG 和 Airflow UI

Airflow 无法通过 UI 很好地处理动态 DAG。这主要是 Airflow DAG 状态未存储在数据库中的结果。 DAG 视图和历史是根据任何给定时刻解释的 DAG 文件中存在的内容呈现的。我个人希望通过某种形式的 DAG 版本控制在未来看到这种变化。

在动态 DAG 中,您可以在 DAG 中添加和删除任务。

动态添加任务

为 DAG 添加任务时 运行 将显示(在 UI 中)所有 DAG 运行s 之前那个任务从来没有运行那个任务。将具有 None 状态 DAG 运行 将根据结果设置为 successfailed DAG 运行.

动态删除任务

如果您的动态 DAG 删除了任务,您将无法查看 DAG 的历史记录。例如,如果您 运行 一个在前 20 个 DAG 运行 中具有 task_x 的 DAG,但之后将其删除,它将无法显示在 UI 中,直到它被添加回 DAG。

幂等性和气流 当 DAG 运行 为 idempotent 时,气流效果最佳。这意味着重新 运行 任何 DAG 运行 应该有相同的影响,无论你什么时候 运行 它或者你 运行 它多少次。 Airflow 中的动态 DAG 通过向先前的 DAG 运行 添加和删除任务来破坏幂等性,因此重新 运行 的结果不一样。

解决方案选项

您至少有两个选择

1.) 继续动态构建 SFTP DAG,但创建另一个 DAG,将可用的 SFTP 文件写入本地文件(如果 使用分布式执行器)或 Airflow Variable(这将导致对 Airflow 数据库的更多读取)并从中动态构建您的 DAG。

2.) 重载 SFTPOperator 以获取文件列表,以便在单个任务 运行 中处理存在的每个文件。这将使 DAG 幂等,您将通过日志维护准确的历史记录。

对于扩展的解释,我深表歉意,但您触及了 Airflow 的一个粗糙点,我认为对手头的问题进行概述是合适的。