如何从气流 DAG 中 运行 SQL 查询气流元数据库?

How do I run SQL queries on the airflow meta db from inside an airflow DAG?

我正在寻找一种方法来提取 DAG 中任务的最后一个成功 运行 实例的 execution_date、start_date、end_date 等,并且然后如果某个分支在一周内没有被触发,则决定引发错误。

有没有一种方法可以 运行 SQL 查询气流元数据库以查找任务的最后一个成功 运行 实例并从中提取必要的信息入口?我查看了文档,但没有任何用处。

您可以创建一个 PythonOperator 任务来从 Metastore 查询任务实例状态。 Airflow 通过 SQLAlchemy、Python ORM 框架在内部查询数据库。您也可以使用它从您的任务中查询数据库。例如:

import datetime

from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.utils import timezone
from airflow.utils.session import provide_session
from airflow.utils.state import State

with DAG(dag_id="so_72230617", schedule_interval="@daily", start_date=datetime.datetime(2022, 5, 1)) as dag:

    first = EmptyOperator(task_id="first")

    @provide_session
    def _fetch_last_successful_ti(session=None):
        ti_exists_in_last_week = session.query(
            session.query(TaskInstance)
            .filter(
                TaskInstance.dag_id == "so_72230617",
                TaskInstance.task_id == "first",
                TaskInstance.state == State.SUCCESS,
                TaskInstance.execution_date >= timezone.utcnow() - datetime.timedelta(weeks=1),
            )
            .exists()
        ).scalar()

        if not ti_exists_in_last_week:
            raise Exception("No successful 'first' task instance found with execution_date in the last week.")

    fetch_last_successful_ti = PythonOperator(
        task_id="fetch_last_successful_ti", python_callable=_fetch_last_successful_ti
    )

    first >> fetch_last_successful_ti

这里的技巧是@provide_session,它会初始化一个数据库会话对象,您可以使用它来使用 Airflow 对象和 SQLAlchemy 查询数据库。