如何从气流 DAG 中 运行 SQL 查询气流元数据库?
How do I run SQL queries on the airflow meta db from inside an airflow DAG?
我正在寻找一种方法来提取 DAG 中任务的最后一个成功 运行 实例的 execution_date、start_date、end_date 等,并且然后如果某个分支在一周内没有被触发,则决定引发错误。
有没有一种方法可以 运行 SQL 查询气流元数据库以查找任务的最后一个成功 运行 实例并从中提取必要的信息入口?我查看了文档,但没有任何用处。
您可以创建一个 PythonOperator 任务来从 Metastore 查询任务实例状态。 Airflow 通过 SQLAlchemy、Python ORM 框架在内部查询数据库。您也可以使用它从您的任务中查询数据库。例如:
import datetime
from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.utils import timezone
from airflow.utils.session import provide_session
from airflow.utils.state import State
with DAG(dag_id="so_72230617", schedule_interval="@daily", start_date=datetime.datetime(2022, 5, 1)) as dag:
first = EmptyOperator(task_id="first")
@provide_session
def _fetch_last_successful_ti(session=None):
ti_exists_in_last_week = session.query(
session.query(TaskInstance)
.filter(
TaskInstance.dag_id == "so_72230617",
TaskInstance.task_id == "first",
TaskInstance.state == State.SUCCESS,
TaskInstance.execution_date >= timezone.utcnow() - datetime.timedelta(weeks=1),
)
.exists()
).scalar()
if not ti_exists_in_last_week:
raise Exception("No successful 'first' task instance found with execution_date in the last week.")
fetch_last_successful_ti = PythonOperator(
task_id="fetch_last_successful_ti", python_callable=_fetch_last_successful_ti
)
first >> fetch_last_successful_ti
这里的技巧是@provide_session
,它会初始化一个数据库会话对象,您可以使用它来使用 Airflow 对象和 SQLAlchemy 查询数据库。
我正在寻找一种方法来提取 DAG 中任务的最后一个成功 运行 实例的 execution_date、start_date、end_date 等,并且然后如果某个分支在一周内没有被触发,则决定引发错误。
有没有一种方法可以 运行 SQL 查询气流元数据库以查找任务的最后一个成功 运行 实例并从中提取必要的信息入口?我查看了文档,但没有任何用处。
您可以创建一个 PythonOperator 任务来从 Metastore 查询任务实例状态。 Airflow 通过 SQLAlchemy、Python ORM 框架在内部查询数据库。您也可以使用它从您的任务中查询数据库。例如:
import datetime
from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.utils import timezone
from airflow.utils.session import provide_session
from airflow.utils.state import State
with DAG(dag_id="so_72230617", schedule_interval="@daily", start_date=datetime.datetime(2022, 5, 1)) as dag:
first = EmptyOperator(task_id="first")
@provide_session
def _fetch_last_successful_ti(session=None):
ti_exists_in_last_week = session.query(
session.query(TaskInstance)
.filter(
TaskInstance.dag_id == "so_72230617",
TaskInstance.task_id == "first",
TaskInstance.state == State.SUCCESS,
TaskInstance.execution_date >= timezone.utcnow() - datetime.timedelta(weeks=1),
)
.exists()
).scalar()
if not ti_exists_in_last_week:
raise Exception("No successful 'first' task instance found with execution_date in the last week.")
fetch_last_successful_ti = PythonOperator(
task_id="fetch_last_successful_ti", python_callable=_fetch_last_successful_ti
)
first >> fetch_last_successful_ti
这里的技巧是@provide_session
,它会初始化一个数据库会话对象,您可以使用它来使用 Airflow 对象和 SQLAlchemy 查询数据库。