具有不同调度程序间隔的 Airflow ExternalTaskSensor
Airflow ExternalTaskSensor with different scheduler interval
目前我有两个 DAG:DAG_A 和 DAG_B。两个 运行 和 schedule_interval=timedelta(days=1)
DAG_A 有一个任务 1,通常需要 7 个小时才能完成 运行。而DAG_B只需要3个小时。
DAG_B 有一个 ExternalTaskSensor(external_dag_id="DAG_A", external_task_id="Task1")
但也使用每小时生成的一些其他信息 X。
增加 DAG_B 频率以使其每天至少 运行 4 次的最佳方法是什么?据我所知,两个 DAG 必须具有相同的 schedule_interval。但是,我想尽可能多地更新 DAG_B 上的 X。
一种可能性是创建另一个具有 DAG_B 的 ExternalTaskSensor 的 DAG。但我认为这不是最好的方法。
如果我没理解错的话,你的条件是:
- 每天保持运行DAG_A
- 运行 DAG_B n 一天几次
- 每次DAG_B运行时都会等待DAG_A__Task_1完成
我认为您可以通过指示 ExternalTaskSensor
等待所需的执行日期 DAG_A.
来轻松调整当前的设计
来自 ExternalTaskSensor 运算符定义:
Waits for a different DAG or a task in a different DAG to complete for a specific execution_date
可以使用 execution_date_fn
参数定义 execution_date
:
execution_date_fn (Optional[Callable]) – function that receives the current execution date as the first positional argument and optionally any number of keyword arguments available in the context dictionary, and returns the desired execution dates to query. Either execution_delta or execution_date_fn can be passed to ExternalTaskSensor, but not both.
您可以这样定义传感器:
wait_for_dag_a = ExternalTaskSensor(
task_id='wait_for_dag_a',
external_task_id="external_task_1",
external_dag_id='dag_a_id',
allowed_states=['success', 'failed'],
execution_date_fn=_get_execution_date_of_dag_a,
poke_interval=30
)
其中 _get_execution_date_of_dag_a
使用 get_last_dagrun
对数据库执行查询,允许您获取 DAG_A 的最后一个 execution_date
.
from airflow.utils.db import provide_session
from airflow.models.dag import get_last_dagrun
@provide_session
def _get_execution_date_of_dag_a(exec_date, session=None, **kwargs):
dag_a_last_run = get_last_dagrun(
'dag_a_id', session)
return dag_a_last_run.execution_date
希望这种方法能帮到你。您可以在 this answer.
中找到工作示例
目前我有两个 DAG:DAG_A 和 DAG_B。两个 运行 和 schedule_interval=timedelta(days=1)
DAG_A 有一个任务 1,通常需要 7 个小时才能完成 运行。而DAG_B只需要3个小时。
DAG_B 有一个 ExternalTaskSensor(external_dag_id="DAG_A", external_task_id="Task1")
但也使用每小时生成的一些其他信息 X。
增加 DAG_B 频率以使其每天至少 运行 4 次的最佳方法是什么?据我所知,两个 DAG 必须具有相同的 schedule_interval。但是,我想尽可能多地更新 DAG_B 上的 X。
一种可能性是创建另一个具有 DAG_B 的 ExternalTaskSensor 的 DAG。但我认为这不是最好的方法。
如果我没理解错的话,你的条件是:
- 每天保持运行DAG_A
- 运行 DAG_B n 一天几次
- 每次DAG_B运行时都会等待DAG_A__Task_1完成
我认为您可以通过指示 ExternalTaskSensor
等待所需的执行日期 DAG_A.
来自 ExternalTaskSensor 运算符定义:
Waits for a different DAG or a task in a different DAG to complete for a specific execution_date
可以使用 execution_date_fn
参数定义 execution_date
:
execution_date_fn (Optional[Callable]) – function that receives the current execution date as the first positional argument and optionally any number of keyword arguments available in the context dictionary, and returns the desired execution dates to query. Either execution_delta or execution_date_fn can be passed to ExternalTaskSensor, but not both.
您可以这样定义传感器:
wait_for_dag_a = ExternalTaskSensor(
task_id='wait_for_dag_a',
external_task_id="external_task_1",
external_dag_id='dag_a_id',
allowed_states=['success', 'failed'],
execution_date_fn=_get_execution_date_of_dag_a,
poke_interval=30
)
其中 _get_execution_date_of_dag_a
使用 get_last_dagrun
对数据库执行查询,允许您获取 DAG_A 的最后一个 execution_date
.
from airflow.utils.db import provide_session
from airflow.models.dag import get_last_dagrun
@provide_session
def _get_execution_date_of_dag_a(exec_date, session=None, **kwargs):
dag_a_last_run = get_last_dagrun(
'dag_a_id', session)
return dag_a_last_run.execution_date
希望这种方法能帮到你。您可以在 this answer.
中找到工作示例