Airflow - 按执行日期(不含时间)查找特定 dag id 的 dag 运行
Airflow - find dag run of specific dag id by execution date without time
我想查找特定 dag 在特定执行日期的所有 dag 运行。
正如我在文档中看到的那样,有这个函数:
dag_runs = DagRun.find(dag_id=self.dag_name, execution_date=datetime.now())
问题在于时间也必须完全一致。有没有办法让我只传入日期,就能检索到当天的所有运行,而不管具体是一天中的什么时间?
我知道可以先取得 dag_runs,再用 for 循环筛选出所需日期的所有 dag 运行,但我想要更高效的做法,而不是从数据库中取出所有记录。
在 gcp composer 中使用 airflow 1.10.10。所以在 class DagRun 中添加一个方法对我来说不是一个选项。
对于 Airflow >= 2.0.0
你可以使用:
# Fetch the runs of one DAG whose execution_date lies between the two bounds.
dag_runs = DagRun.find(
    dag_id=your_dag_id,
    execution_start_date=your_start_date,
    execution_end_date=your_end_date,
)
对于 Airflow < 2.0.0
可以创建一个继承自 DagRun 的 MyDagRun 类,并向后移植所需的功能。
下面是一段经过测试、可以正常运行的代码:
from datetime import datetime
from typing import List, Optional, Union
from airflow import DAG
from airflow.models.dagrun import DagRun
from airflow.operators.python_operator import PythonOperator
from airflow.utils import timezone
from airflow.utils.db import provide_session
from sqlalchemy.orm.session import Session
class MyDagRun(DagRun):
    """DagRun subclass whose ``find`` also filters by an execution-date range."""

    @staticmethod
    @provide_session
    def find(
        dag_id: Optional[Union[str, List[str]]] = None,
        run_id: Optional[str] = None,
        execution_date: Optional[datetime] = None,
        state: Optional[str] = None,
        external_trigger: Optional[bool] = None,
        no_backfills: bool = False,
        session: Session = None,
        execution_start_date: Optional[datetime] = None,
        execution_end_date: Optional[datetime] = None,
    ) -> List["DagRun"]:
        """Return the dag runs matching every filter that was supplied.

        Filters whose argument is falsy are skipped (``external_trigger`` is
        skipped only when it is ``None``).

        :param dag_id: a single dag id or a list of dag ids to match
        :param run_id: exact run id to match
        :param execution_date: exact execution date to match; a list of
            dates is matched with ``IN``
        :param state: exact run state to match
        :param external_trigger: match on the ``external_trigger`` column
        :param no_backfills: when true, exclude runs whose run id starts
            with ``BackfillJob.ID_PREFIX``
        :param session: database session used to run the query
        :param execution_start_date: lower bound on the execution date
        :param execution_end_date: upper bound on the execution date
        :return: matching runs ordered by execution date
        """
        model = MyDagRun
        date_column = model.execution_date
        conditions = []

        # A lone string is treated as a one-element list of dag ids.
        wanted_dag_ids = [dag_id] if isinstance(dag_id, str) else dag_id
        if wanted_dag_ids:
            conditions.append(model.dag_id.in_(wanted_dag_ids))

        if run_id:
            conditions.append(model.run_id == run_id)

        if execution_date:
            conditions.append(
                date_column.in_(execution_date)
                if isinstance(execution_date, list)
                else date_column == execution_date
            )

        # Range filter: both bounds -> BETWEEN, otherwise a one-sided comparison.
        if execution_start_date and execution_end_date:
            conditions.append(
                date_column.between(execution_start_date, execution_end_date)
            )
        elif execution_start_date:
            conditions.append(date_column >= execution_start_date)
        elif execution_end_date:
            conditions.append(date_column <= execution_end_date)

        if state:
            conditions.append(model.state == state)

        if external_trigger is not None:
            conditions.append(model.external_trigger == external_trigger)

        if no_backfills:
            # in order to prevent a circular dependency
            from airflow.jobs import BackfillJob

            conditions.append(model.run_id.notlike(BackfillJob.ID_PREFIX + '%'))

        matching = session.query(model).filter(*conditions)
        return matching.order_by(date_column).all()
def func(**kwargs):
    """Print and return the dag runs whose execution date is in a fixed window.

    Any keyword arguments are accepted and ignored.

    :return: the list returned by ``MyDagRun.find`` for the window
    """
    # Need to use timezone to avoid ValueError: naive datetime is disallowed
    start = timezone.make_aware(datetime(2021, 3, 1, 9, 59, 0))  # change to your required start
    end = timezone.make_aware(datetime(2021, 3, 1, 10, 1, 0))  # change to your required end

    # find() is a static method, so call it on the class; no instance is needed.
    results = MyDagRun.find(execution_start_date=start, execution_end_date=end)

    print("Execution dates met criteria are:")
    for item in results:
        print(item.execution_date)

    # NOTE(review): presumably the operator pushes this return value to XCom --
    # confirm that DagRun objects serialize in your setup.
    return results
# Arguments passed to the DAG as ``default_args``.
default_args = {'owner': 'airflow', 'start_date': datetime(2019, 11, 1)}

# DAG with no schedule; it holds one task that runs ``func``.
with DAG(
    dag_id='test',
    default_args=default_args,
    schedule_interval=None,
    catchup=True,
) as dag:
    op = PythonOperator(
        task_id="task",
        python_callable=func,
    )
示例显示 4 个已存在的运行:
使用上述代码后,它会选出所需的运行:
我想查找特定 dag 在特定执行日期的所有 dag 运行。
正如我在文档中看到的那样,有这个函数:
dag_runs = DagRun.find(dag_id=self.dag_name, execution_date=datetime.now())
问题在于时间也必须完全一致。有没有办法让我只传入日期,就能检索到当天的所有运行,而不管具体是一天中的什么时间?
我知道可以先取得 dag_runs,再用 for 循环筛选出所需日期的所有 dag 运行,但我想要更高效的做法,而不是从数据库中取出所有记录。
在 gcp composer 中使用 airflow 1.10.10。所以在 class DagRun 中添加一个方法对我来说不是一个选项。
对于 Airflow >= 2.0.0
你可以使用:
# Fetch the runs of one DAG whose execution_date lies between the two bounds.
dag_runs = DagRun.find(
    dag_id=your_dag_id,
    execution_start_date=your_start_date,
    execution_end_date=your_end_date,
)
对于 Airflow < 2.0.0
可以创建一个继承自 DagRun 的 MyDagRun 类,并向后移植所需的功能。
下面是一段经过测试、可以正常运行的代码:
from datetime import datetime
from typing import List, Optional, Union
from airflow import DAG
from airflow.models.dagrun import DagRun
from airflow.operators.python_operator import PythonOperator
from airflow.utils import timezone
from airflow.utils.db import provide_session
from sqlalchemy.orm.session import Session
class MyDagRun(DagRun):
    """DagRun subclass whose ``find`` also filters by an execution-date range."""

    @staticmethod
    @provide_session
    def find(
        dag_id: Optional[Union[str, List[str]]] = None,
        run_id: Optional[str] = None,
        execution_date: Optional[datetime] = None,
        state: Optional[str] = None,
        external_trigger: Optional[bool] = None,
        no_backfills: bool = False,
        session: Session = None,
        execution_start_date: Optional[datetime] = None,
        execution_end_date: Optional[datetime] = None,
    ) -> List["DagRun"]:
        """Return the dag runs matching every filter that was supplied.

        Filters whose argument is falsy are skipped (``external_trigger`` is
        skipped only when it is ``None``).

        :param dag_id: a single dag id or a list of dag ids to match
        :param run_id: exact run id to match
        :param execution_date: exact execution date to match; a list of
            dates is matched with ``IN``
        :param state: exact run state to match
        :param external_trigger: match on the ``external_trigger`` column
        :param no_backfills: when true, exclude runs whose run id starts
            with ``BackfillJob.ID_PREFIX``
        :param session: database session used to run the query
        :param execution_start_date: lower bound on the execution date
        :param execution_end_date: upper bound on the execution date
        :return: matching runs ordered by execution date
        """
        model = MyDagRun
        date_column = model.execution_date
        conditions = []

        # A lone string is treated as a one-element list of dag ids.
        wanted_dag_ids = [dag_id] if isinstance(dag_id, str) else dag_id
        if wanted_dag_ids:
            conditions.append(model.dag_id.in_(wanted_dag_ids))

        if run_id:
            conditions.append(model.run_id == run_id)

        if execution_date:
            conditions.append(
                date_column.in_(execution_date)
                if isinstance(execution_date, list)
                else date_column == execution_date
            )

        # Range filter: both bounds -> BETWEEN, otherwise a one-sided comparison.
        if execution_start_date and execution_end_date:
            conditions.append(
                date_column.between(execution_start_date, execution_end_date)
            )
        elif execution_start_date:
            conditions.append(date_column >= execution_start_date)
        elif execution_end_date:
            conditions.append(date_column <= execution_end_date)

        if state:
            conditions.append(model.state == state)

        if external_trigger is not None:
            conditions.append(model.external_trigger == external_trigger)

        if no_backfills:
            # in order to prevent a circular dependency
            from airflow.jobs import BackfillJob

            conditions.append(model.run_id.notlike(BackfillJob.ID_PREFIX + '%'))

        matching = session.query(model).filter(*conditions)
        return matching.order_by(date_column).all()
def func(**kwargs):
    """Print and return the dag runs whose execution date is in a fixed window.

    Any keyword arguments are accepted and ignored.

    :return: the list returned by ``MyDagRun.find`` for the window
    """
    # Need to use timezone to avoid ValueError: naive datetime is disallowed
    start = timezone.make_aware(datetime(2021, 3, 1, 9, 59, 0))  # change to your required start
    end = timezone.make_aware(datetime(2021, 3, 1, 10, 1, 0))  # change to your required end

    # find() is a static method, so call it on the class; no instance is needed.
    results = MyDagRun.find(execution_start_date=start, execution_end_date=end)

    print("Execution dates met criteria are:")
    for item in results:
        print(item.execution_date)

    # NOTE(review): presumably the operator pushes this return value to XCom --
    # confirm that DagRun objects serialize in your setup.
    return results
# Arguments passed to the DAG as ``default_args``.
default_args = {'owner': 'airflow', 'start_date': datetime(2019, 11, 1)}

# DAG with no schedule; it holds one task that runs ``func``.
with DAG(
    dag_id='test',
    default_args=default_args,
    schedule_interval=None,
    catchup=True,
) as dag:
    op = PythonOperator(
        task_id="task",
        python_callable=func,
    )
示例显示 4 个已存在的运行:
使用上述代码后,它会选出所需的运行: