Airflow dags 生命周期事件

Airflow dags lifecycle events

我正在尝试通过 java 后端管理气流 dag(创建、执行等)。目前,在创建 dag 并将其放置在气流的 dags 文件夹中后,我的后端不断尝试 运行 dag。但它不能 运行 它直到它被气流调度程序拾取,如果 dag 的数量更多,这可能需要相当长的时间。我想知道是否有气流发出的任何事件,我可以点击这些事件来检查调度程序处理的新 dag,然后触发,从我的后端执行命令。或者有没有一种方法或配置可以让气流在处理后自动启动 dag 而不是我们触发它?

is there a way or configuration where airflow will automatically start a dag once it processes it rather than we triggering it ?

是的,您可以定义的参数之一是 is_paused_upon_creation

如果您将 DAG 设置为:

DAG(
    dag_id='tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval="@daily",
    start_date=datetime(2020, 12, 28),
    is_paused_upon_creation=False
)

DAG 将在调度程序拾取后立即启动(假设满足 运行 的条件)

I am wondering if there any events that airflow emits which I can tap to check for new dags processed by scheduler

在 Airflow >=2.0.0 中,您可以使用 API - list dags endpoint 获取 dagbag

中的所有 dag

在任何 Airflow 版本中,您都可以使用此代码列出 dag_ids:

from airflow.models import DagBag
print(DagBag().dag_ids())