气流:如何删除 DAG?

Airflow: how to delete a DAG?

我已经启动了 Airflow 网络服务器并安排了一些 dags。我可以在 Web GUI 上看到 dags。

如何删除特定的 DAG,使其不再 运行 并显示在 Web GUI 中?是否有 Airflow CLI 命令可以执行此操作?

我环顾四周,但找不到有关在加载和计划后删除 DAG 的简单方法的答案。

Airflow 中没有任何内置功能可以为您做到这一点。要删除 DAG,请将其从存储库中删除并删除 Airflow Metastore 中的数据库条目 table - dag.

我刚刚编写了一个脚本,用于删除与特定 dag 相关的所有内容,但这仅适用于 MySQL。如果您使用的是 PostgreSQL,则可以编写不同的连接器方法。最初由 Lance 在 https://groups.google.com/forum/#!topic/airbnb_airflow/GVsNsUxPRC0 上发布的命令 我只是把它放在脚本中。希望这可以帮助。格式:pythonscript.pydag_id

import sys
import MySQLdb

dag_input = sys.argv[1]

query = {'delete from xcom where dag_id = "' + dag_input + '"',
        'delete from task_instance where dag_id = "' + dag_input + '"',
        'delete from sla_miss where dag_id = "' + dag_input + '"',
        'delete from log where dag_id = "' + dag_input + '"',
        'delete from job where dag_id = "' + dag_input + '"',
        'delete from dag_run where dag_id = "' + dag_input + '"',
        'delete from dag where dag_id = "' + dag_input + '"' }

def connect(query):
        db = MySQLdb.connect(host="hostname", user="username", passwd="password", db="database")
        cur = db.cursor()
        cur.execute(query)
        db.commit()
        db.close()
        return

for value in query:
        print value
        connect(value)

不确定为什么 Apache Airflow 没有明显且简单的方法来删除 DAG

归档https://issues.apache.org/jira/browse/AIRFLOW-1002

这是我使用默认 connection_id 的 PostgresHook 改编的代码。

import sys
from airflow.hooks.postgres_hook import PostgresHook

dag_input = sys.argv[1]
hook=PostgresHook( postgres_conn_id= "airflow_db")

for t in ["xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag" ]:
    sql="delete from {} where dag_id='{}'".format(t, dag_input)
    hook.run(sql, True)

我编写了一个脚本,用于删除与默认 SQLite 数据库的特定 dag 相关的所有元数据。这是基于上面耶稣的回答,但从 Postgres 改编为 SQLite。用户应将 ../airflow.db 设置为相对于默认 airflow.db 文件(通常为 ~/airflow)存储 script.py 的位置。要执行,请使用 python script.py dag_id

import sqlite3
import sys

conn = sqlite3.connect('../airflow.db')
c = conn.cursor()

dag_input = sys.argv[1]

for t in ["xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag" ]:
    query = "delete from {} where dag_id='{}'".format(t, dag_input)
    c.execute(query)

conn.commit()
conn.close()

您可以清除一组任务实例,就好像它们从未 运行 具有:

airflow clear dag_id -s 2017-1-23 -e 2017-8-31

然后从 dags 文件夹中删除 dag 文件

对于那些仍在寻找答案的人。在Airflow 1.8版本上,很难删除DAG,可以参考上面的回答。但既然 1.9 已经发布,你只需要

remove the dag on the dags folder and restart webserver

编辑 8/27/18 - Airflow 1.10 现已在 PyPI 上发布!

https://pypi.org/project/apache-airflow/1.10.0/


如何彻底删除DAG

我们现在在 Airflow ≥ 1.10 中拥有此功能!

PR #2199 (Jira: AIRFLOW-1002) 将 DAG 删除添加到 Airflow 现已合并,这允许从所有相关表中完全删除 DAG 的条目。

核心delete_dag(...) code is now part of the experimental API, and there are entrypoints available via the CLI and also via the REST API.

CLI:

airflow delete_dag my_dag_id

REST API(运行本地网络服务器):

curl -X "DELETE" http://127.0.0.1:8080/api/experimental/dags/my_dag_id

关于 REST 的警告 API:确保您的 Airflow 集群 uses authentication 在生产中。

安装/升级到 Airflow 1.10(当前)

要升级,运行要么:

export SLUGIFY_USES_TEXT_UNIDECODE=yes

或:

export AIRFLOW_GPL_UNIDECODE=yes

然后:

pip install -U apache-airflow

记得先查看 UPDATING.md 了解完整详情!

从 dags 文件夹中删除 dag(您要删除的)和 运行 airflow resetdb

或者,您可以进入 airflow_db 并从 dag 表(task_fail、xcom、task_instance、sla_miss、日志、作业中手动删除这些条目, dag_run, dag, dag_stats).

版本 >= 1.10.0:

我有 airflow 版本 1.10.2,我尝试执行 airflow delete_dag 命令,但该命令抛出以下错误:

bash-4.2# 气流 delete_dag dag_id

[2019-03-16 15:37:20,804] {settings.py:174} 信息 - settings.configure_orm():使用池设置。 pool_size=5, pool_recycle=1800, pid=28224 /usr/lib64/python2.7/site-packages/psycopg2/init.py:144:用户警告:psycopg2 轮包将从 2.8 版开始重命名;为了继续从二进制安装,请改用 "pip install psycopg2-binary"。详情见:http://initd.org/psycopg/docs/install.html#binary-install-from-pypi。 """) 这将删除与指定 DAG 相关的所有现有记录。继续? (y/n)y 追溯(最近一次通话): 文件“/usr/bin/airflow”,第 32 行,位于 args.func(参数) 文件“/usr/lib/python2.7/site-packages/airflow/utils/cli.py”,第 74 行,在包装器中 return f(*args, **kwargs) 文件“/usr/lib/python2.7/site-packages/airflow/bin/cli.py”,第 258 行,在 delete_dag 中 提高 AirflowException(错误) airflow.exceptions.AirflowException: 服务器错误

虽然我可以通过 Curl 命令删除。 如果有人知道这个命令的执行,请告诉我,这是已知的还是我做错了什么。

版本 <= 1.9.0:

没有删除dag的命令,所以你需要先删除dag文件,然后从airflow元数据数据库中删除所有对dag_id的引用。

警告

您可以重置 airflow 元数据库,您将删除所有内容,包括 dags,但请记住,您还将删除历史记录、池、变量等。

airflow resetdb 然后 airflow initdb

A​​irflow 1.10.1 已经发布。此版本添加了从文件系统中删除相应 DAG 后从 Web UI 删除 DAG 的功能。

查看此票以了解更多详情:

[AIRFLOW-2657] 添加从 Web 删除 DAG 的功能 ui

请注意,这实际上并没有从文件系统中删除 DAG,您需要先手动执行此操作,否则将重新加载 DAG。

根据@OlegYamin 的回答,我正在执行以下操作以删除由 postgres 支持的 dag,其中 airflow 使用 public 架构。

delete from public.dag_pickle where id = (
    select pickle_id from public.dag where dag_id = 'my_dag_id'
);
delete from public.dag_run where dag_id = 'my_dag_id';
delete from public.dag_stats where dag_id = 'my_dag_id';
delete from public.log where dag_id = 'my_dag_id';
delete from public.sla_miss where dag_id = 'my_dag_id';
delete from public.task_fail where dag_id = 'my_dag_id';
delete from public.task_instance where dag_id = 'my_dag_id';
delete from public.xcom where dag_id = 'my_dag_id';
delete from public.dag where dag_id = 'my_dag_id';

警告:我不知道第一个删除查询的effect/correctness。只是假设需要它。

Airflow 1.10可以删除DAG-s,但流程和操作顺序必须正确。 有一个 "egg and chicken problem" - 如果您从前端删除 DAG 而文件仍然存在,DAG 将重新加载(因为文件未被删除)。如果您先删除该文件并刷新页面,那么 DAG 将无法再从 web gui 中删除。 因此,让我从前端删除 DAG 的操作顺序是:

  1. 删除 DAG 文件(在我的例子中,从管道存储库中删除并部署到气流服务器,尤其是调度程序)
  2. 请勿刷新 Web GUI。
  3. 在 DAG 视图(普通首页)的 Web GUI 中,单击 "Delete dag" -> 最右侧的红色图标。
  4. 它会从数据库中清除此 DAG 的所有剩余部分。

只需将其从 mysql 中删除,对我来说效果很好。从下表中删除它们:

  • dag

  • dag_constructor

  • dag_group_ship
  • dag_pickle
  • dag_run
  • dag_stats

(未来版本可能会有更多表格) 然后重启 webserver 和 worker。

首先 --> 从 $AIRFLOW_HOME/dags 文件夹中删除 DAG 文件。 注意:根据您是否使用过子目录,您可能需要翻遍子目录才能找到 DAG 文件并将其删除。

第二 --> 使用删除按钮(圆圈中的 x)

从 Web 服务器删除 DAG UI

对于那些可以直接访问 airflow 数据库的 Postgres psql 控制台的用户,您可以简单地执行以下请求来删除 DAG:

\set dag_id YOUR_DAG_ID

delete from xcom where dag_id=:'dag_id';
delete from task_instance where dag_id=:'dag_id';
delete from sla_miss where dag_id=:'dag_id';
delete from log where dag_id=:'dag_id';
delete from job where dag_id=:'dag_id';
delete from dag_run where dag_id=:'dag_id';
delete from dag where dag_id=:'dag_id';

类似的(稍作改动)查询适用于其他数据库,例如 MySQL 和 SQLite。

在新的气流版本中,UI 中有一个删除 dag(红色 x)按钮,在 DAGs

旁边

如果您使用 Docker 到 运行 Airflow,您可以使用 DAG 中的 BashOperator 删除另一个 DAG:

t1 = BashOperator(task_id='delete_dag_task', bash_command=f'airflow dags delete -y {dag_id}')

其中 dag_id 是 dag 的名称。这使用标准的 CLI 命令而不是自己从元数据库中删除记录。您还需要使用 PythonOperator.

从 dags 目录中删除 DAG 文件

我有这样一个 DAG:

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash import BashOperator
import os

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'start_date': days_ago(1),
    'owner': 'airflow',
    'retries': 1
}


def delete_dag(**context):
    conf = context["dag_run"].conf
    dag_id = conf["dag_name"]
    t1 = BashOperator(task_id='delete_dag_task', bash_command=f'airflow dags delete -y {dag_id}')
    t1.execute(context=context)


def delete_dag_file(**context):
    conf = context["dag_run"].conf
    dag_id = conf["dag_name"]
    script_dir = os.path.dirname(__file__)
    dag_file_path = os.path.join(script_dir, '{}.py'.format(dag_id))
    try:
        os.remove(dag_file_path)
    except OSError:
        pass


with DAG('dag-deleter',
         schedule_interval=None,
         default_args=default_args,
         is_paused_upon_creation=False,
         catchup=False) as dag:

    delete_dag = PythonOperator(
        task_id="delete_dag",
        python_callable=delete_dag,
        provide_context=True)

    delete_dag_file = PythonOperator(
        task_id="delete_dag_file",
        python_callable=delete_dag_file,
        provide_context=True
    )

    delete_dag >> delete_dag_file

然后我使用 REST API 触发 DAG,在 http 请求中传递以下有效负载:

{"conf": {"dag_name": "my_dag_name"} }