确定 DAG 是否正在执行
Determining if a DAG is executing
我正在使用带有自定义 SFTPOperator 的 Airflow 1.9.0。我的 DAG 中有代码可以轮询 SFTP 站点以查找新文件。如果找到任何内容,那么我会为动态创建的任务和 retrieve/delete 文件创建自定义任务 ID。
directory_list = sftp_handler('sftp-site', None, '/', None, SFTPToS3Operation.LIST)
for file_path in directory_list:
... SFTP code that GET's the remote files
那部分工作正常。似乎 airflow 网络服务器和 airflow 调度程序都在每秒迭代一次所有 DAG,并且实际上 运行 正在检索 directory_list 的代码。这意味着我正在访问 SFTP 站点 ~2 x 秒来验证并提取文件列表。我想要一些条件代码,只有在 DAG 实际上是 运行.
时才会执行
当 SFTP 站点使用密码身份验证时,我连接的次数确实不是问题。一个站点需要密钥认证,如果短时间内认证失败次数过多,则该帐户被锁定。在我的测试过程中,这似乎偶尔会发生,原因我仍在努力寻找。
但是,如果我仅在 DAG 计划执行时或手动执行时进行身份验证,这将不是问题。在没有计划的情况下花费大量时间连接到 SFTP 站点似乎也很浪费。
我见过 post 可以检查任务是否正在执行,但这并不理想,因为我必须创建一个长 运行ning 任务,耗尽我不需要的资源,只是为了执行该测试。关于如何实现这一点有什么想法吗?
您有一个很好的 Airflow 用例(SFTP 到 _____ 批处理作业),但 Airflow 不适用于动态 DAG,因为您正在尝试使用它们。
顶级 DAG 代码和调度程序循环
如您所见,DAG 中的任何顶级代码都在每个调度程序循环中执行。或者换句话说,每次调度程序循环处理 DAG 目录中的文件时,它都会解释 DAG 文件中的所有代码。任何不在任务或操作员中的东西都会立即 interpreted/executed 。这会给调度程序以及您正在调用的任何外部系统带来过度压力。
动态 DAG 和 Airflow UI
Airflow 无法通过 UI 很好地处理动态 DAG。这主要是 Airflow DAG 状态未存储在数据库中的结果。 DAG 视图和历史是根据任何给定时刻解释的 DAG 文件中存在的内容呈现的。我个人希望通过某种形式的 DAG 版本控制在未来看到这种变化。
在动态 DAG 中,您可以在 DAG 中添加和删除任务。
动态添加任务
为 DAG 添加任务时 运行 将显示(在 UI 中)所有 DAG
运行s 之前那个任务从来没有运行那个任务。将具有 None 状态
DAG 运行 将根据结果设置为 success
或 failed
DAG 运行.
动态删除任务
如果您的动态 DAG 删除了任务,您将无法查看 DAG 的历史记录。例如,如果您 运行 一个在前 20 个 DAG 运行 中具有 task_x
的 DAG,但之后将其删除,它将无法显示在 UI 中,直到它被添加回 DAG。
幂等性和气流
当 DAG 运行 为 idempotent 时,气流效果最佳。这意味着重新 运行 任何 DAG 运行 应该有相同的影响,无论你什么时候 运行 它或者你 运行 它多少次。 Airflow 中的动态 DAG 通过向先前的 DAG 运行 添加和删除任务来破坏幂等性,因此重新 运行 的结果不一样。
解决方案选项
您至少有两个选择
1.) 继续动态构建 SFTP DAG,但创建另一个 DAG,将可用的 SFTP 文件写入本地文件(如果 不 使用分布式执行器)或 Airflow Variable(这将导致对 Airflow 数据库的更多读取)并从中动态构建您的 DAG。
2.) 重载 SFTPOperator 以获取文件列表,以便在单个任务 运行 中处理存在的每个文件。这将使 DAG 幂等,您将通过日志维护准确的历史记录。
对于扩展的解释,我深表歉意,但您触及了 Airflow 的一个粗糙点,我认为对手头的问题进行概述是合适的。
我正在使用带有自定义 SFTPOperator 的 Airflow 1.9.0。我的 DAG 中有代码可以轮询 SFTP 站点以查找新文件。如果找到任何内容,那么我会为动态创建的任务和 retrieve/delete 文件创建自定义任务 ID。
directory_list = sftp_handler('sftp-site', None, '/', None, SFTPToS3Operation.LIST)
for file_path in directory_list:
... SFTP code that GET's the remote files
那部分工作正常。似乎 airflow 网络服务器和 airflow 调度程序都在每秒迭代一次所有 DAG,并且实际上 运行 正在检索 directory_list 的代码。这意味着我正在访问 SFTP 站点 ~2 x 秒来验证并提取文件列表。我想要一些条件代码,只有在 DAG 实际上是 运行.
时才会执行当 SFTP 站点使用密码身份验证时,我连接的次数确实不是问题。一个站点需要密钥认证,如果短时间内认证失败次数过多,则该帐户被锁定。在我的测试过程中,这似乎偶尔会发生,原因我仍在努力寻找。
但是,如果我仅在 DAG 计划执行时或手动执行时进行身份验证,这将不是问题。在没有计划的情况下花费大量时间连接到 SFTP 站点似乎也很浪费。
我见过 post 可以检查任务是否正在执行,但这并不理想,因为我必须创建一个长 运行ning 任务,耗尽我不需要的资源,只是为了执行该测试。关于如何实现这一点有什么想法吗?
您有一个很好的 Airflow 用例(SFTP 到 _____ 批处理作业),但 Airflow 不适用于动态 DAG,因为您正在尝试使用它们。
顶级 DAG 代码和调度程序循环
如您所见,DAG 中的任何顶级代码都在每个调度程序循环中执行。或者换句话说,每次调度程序循环处理 DAG 目录中的文件时,它都会解释 DAG 文件中的所有代码。任何不在任务或操作员中的东西都会立即 interpreted/executed 。这会给调度程序以及您正在调用的任何外部系统带来过度压力。
动态 DAG 和 Airflow UI
Airflow 无法通过 UI 很好地处理动态 DAG。这主要是 Airflow DAG 状态未存储在数据库中的结果。 DAG 视图和历史是根据任何给定时刻解释的 DAG 文件中存在的内容呈现的。我个人希望通过某种形式的 DAG 版本控制在未来看到这种变化。
在动态 DAG 中,您可以在 DAG 中添加和删除任务。
动态添加任务
为 DAG 添加任务时 运行 将显示(在 UI 中)所有 DAG
运行s 之前那个任务从来没有运行那个任务。将具有 None 状态
DAG 运行 将根据结果设置为 success
或 failed
DAG 运行.
动态删除任务
如果您的动态 DAG 删除了任务,您将无法查看 DAG 的历史记录。例如,如果您 运行 一个在前 20 个 DAG 运行 中具有 task_x
的 DAG,但之后将其删除,它将无法显示在 UI 中,直到它被添加回 DAG。
幂等性和气流 当 DAG 运行 为 idempotent 时,气流效果最佳。这意味着重新 运行 任何 DAG 运行 应该有相同的影响,无论你什么时候 运行 它或者你 运行 它多少次。 Airflow 中的动态 DAG 通过向先前的 DAG 运行 添加和删除任务来破坏幂等性,因此重新 运行 的结果不一样。
解决方案选项
您至少有两个选择
1.) 继续动态构建 SFTP DAG,但创建另一个 DAG,将可用的 SFTP 文件写入本地文件(如果 不 使用分布式执行器)或 Airflow Variable(这将导致对 Airflow 数据库的更多读取)并从中动态构建您的 DAG。
2.) 重载 SFTPOperator 以获取文件列表,以便在单个任务 运行 中处理存在的每个文件。这将使 DAG 幂等,您将通过日志维护准确的历史记录。
对于扩展的解释,我深表歉意,但您触及了 Airflow 的一个粗糙点,我认为对手头的问题进行概述是合适的。