气流 | DAG 是如何开始的

Airflow | How DAG got started

有谁知道如何获取 DAG 的启动方式(无论是在调度程序上还是手动启动)?我正在使用 Airflow 2.1。

我有一个 运行 按小时计算的 DAG,但有时我会 运行 手动测试它。我想捕获 DAG 是如何开始的,并将该值传递给 table 中我正在保存一些数据的列。这将允许我根据计划或手动启动进行过滤并过滤测试信息。

谢谢!

从执行上下文,例如提供给 PythonOperatorpython_callable,您可以访问与当前执行相关的 DagRun 对象:

def _print_dag_run(**kwargs):
    dag_run: DagRun = kwargs["dag_run"]
    print(f"Run type: {dag_run.run_type}")
    print(f"Externally triggered ?: {dag_run.external_trigger}")

日志输出:

[2021-09-08 18:53:52,188] {taskinstance.py:1300} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=example_dagRun_info
AIRFLOW_CTX_TASK_ID=python_task
AIRFLOW_CTX_EXECUTION_DATE=2021-09-07T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-09-07T00:00:00+00:00
Run type: backfill
Externally triggered ?: False

dag_run.run_type 将是:“手动”、“预定”或“回填”。 (不知道还有没有)

external_trigger 文档:

external_trigger (bool) -- whether this dag run is externally triggered

您也可以使用 jinja 访问模板字段中的 default vairables,有一个代表 dag_run 对象的变量:

    bash_task = BashOperator(
        task_id="bash_task",
        bash_command="echo dag_run type is: {{ dag_run.run_type }}",
    )

完整的 DAG:

from airflow import DAG
from airflow.models.dagrun import DagRun
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago


default_args = {
    "owner": "airflow",
}

def _print_dag_run(**kwargs):
    dag_run: DagRun = kwargs["dag_run"]

    print(f"Run type: {dag_run.run_type}")
    print(f"Externally triggered ?: {dag_run.external_trigger}")


dag = DAG(
    dag_id="example_dagRun_info",
    default_args=default_args,
    start_date=days_ago(1),
    schedule_interval="@once",
    tags=["example_dags", "params"],
    catchup=False,
)
with dag:

    python_task = PythonOperator(
        task_id="python_task",
        python_callable=_print_dag_run,
    )

    bash_task = BashOperator(
        task_id="bash_task",
        bash_command="echo dag_run type is: {{ dag_run.run_type }}",
    )