在气流上部署 dag 文件的有效方法

Efficient way to deploy dag files on airflow

在将新的 dag 部署到 airflow 时是否遵循任何最佳实践?

我在 google 论坛上看到一些评论,指出 dags 保存在 GIT 存储库中,并且会定期同步到气流集群中的本地位置。
关于这种方法,我有几个问题

  • 我们是否为不同的环境维护单独的 dag 文件? (测试。生产)
  • 如果新版本有错误,如何处理 ETL 回滚到旧版本?

    非常感谢此处的任何帮助。如果您需要更多详细信息,请告诉我?

  • 以下是我们团队的管理方式。

    首先在命名约定方面,我们的每个 DAG 文件名 与 DAG 本身内容中的 DAG Id 相匹配(包括 DAG 版本)。这很有用,因为最终它是您在 Airflow UI 中看到的 DAG Id,因此您将确切知道每个 DAG 背后使用了哪个文件。

    像这样的 DAG 示例:

    from airflow import DAG
    from datetime import datetime, timedelta
    
    default_args = {
      'owner': 'airflow',
      'depends_on_past': False,
      'start_date': datetime(2017,12,05,23,59),
      'email': ['me@mail.com'],
      'email_on_failure': True
    }
    
    dag = DAG(
      'my_nice_dag-v1.0.9', #update version whenever you change something
      default_args=default_args,
      schedule_interval="0,15,30,45 * * * *",
      dagrun_timeout=timedelta(hours=24),
      max_active_runs=1)
      [...]
    

    DAG 文件 的名称将是:my_nice_dag-v1.0.9.py

    • 我们所有的 DAG 文件都存储在 Git 存储库中(除其他外)
    • 每次在我们的主分支中完成合并请求时,我们的持续集成管道都会开始一个新的构建并将我们的 DAG 文件打包成一个 zip(我们使用 Atlassian Bamboo,但还有其他解决方案,如 Jenkins、Circle CI , 特拉维斯...)
    • 在 Bamboo 中,我们配置了一个部署脚本 (shell),它解压缩包并将 DAG 文件放在 Airflow 服务器上的 /dags 文件夹中。
    • 我们通常将 DAG 部署在 DEV 中进行测试,然后部署到 UAT,最后部署到 PROD。由于上面提到的 shell 脚本,在 Bamboo UI 中单击一个按钮即可完成部署。

    好处

    1. 因为您在文件名中包含了 DAG 版本,DAG 文件的先前版本不会在 DAG 文件夹中被覆盖,因此您可以轻松地返回它
    2. 当您的新 DAG 文件加载到 Airflow 中时,由于版本号,您可以在 UI 中识别它。
    3. 因为您的 DAG 文件名 = DAG Id,您甚至可以通过添加一些 Airflow 命令行来改进部署脚本,以便在部署新 DAG 后自动打开它们。
    4. 因为 DAG 的每个版本都在 Git 中进行了历史记录,如果需要,我们可以随时返回到以前的版本。

    到目前为止,Airflow 还没有自己的版本控制工作流功能(请参阅 this)。 但是,您可以通过在自己的 git 存储库上管理 DAG 并将其状态作为子模块提取到气流存储库中来自行管理。通过这种方式,您始终拥有包含具有特定版本的 DAG 集的单一气流版本。多看here

    文档中写了一个最佳实践:

    Deleting a task

    Never delete a task from a DAG. In case of deletion, the historical information of the task disappears from the Airflow UI. It is advised to create a new DAG in case the tasks need to be deleted

    我相信这就是为什么版本控制问题还不是那么容易解决的原因,我们必须计划一些解决方法。

    https://airflow.apache.org/docs/apache-airflow/2.0.0/best-practices.html#deleting-a-task