Airflow external_task_sensor 永不停歇

Airflow external_task_sensor never stops poking

我需要等待另一个 dag 中的另一个任务,直到我可以触发我自己的任务。但是我的外部传感器并没有停止戳动。我已经在这里阅读了一些其他相关问题,并确保我已经调整了 execution_delta。但是,我仍然有同样的问题。

这是我的两只狗

Parent 达格:

import datetime
import pendulum
from airflow import models
from airflow.operators.python_operator import PythonOperator

local_tz = pendulum.timezone("Europe/Berlin")
args = {
    "start_date": datetime.datetime(2022, 1, 25, tzinfo=local_tz),
    "provide_context": True,
}

def start_job(process_name, **kwargs):
    print('do something: ' + process_name)
    return True


with models.DAG(
        dag_id="test_parent",
        default_args=args,
        # catchup=False,
) as dag:

    task_parent_1 = PythonOperator(
        task_id="task_parent_1",
        python_callable=start_job,
        op_kwargs={"process_name": "my parent task 1"},
        provide_context=True,
    )

    task_parent_2 = PythonOperator(
        task_id="task_parent_2",
        python_callable=start_job,
        op_kwargs={"process_name": "my parent task 2"},
        provide_context=True,
    )

    task_parent_1 >> task_parent_2

还有我的 child 狗:

import datetime
import pendulum
from airflow import models
from airflow.operators.python_operator import PythonOperator
from airflow.operators.sensors import ExternalTaskSensor

local_tz = pendulum.timezone("Europe/Berlin")
args = {
    "start_date": datetime.datetime(2022, 1, 25, tzinfo=local_tz),
    "provide_context": True,
}


def start_job(process_name, **kwargs):
    print('do something: ' + process_name)
    return True


with models.DAG(
        dag_id="test_child",
        default_args=args,
        # catchup=False,
) as dag:
    wait_for_parent_task = ExternalTaskSensor(
        task_id='wait_for_parent_task',
        external_dag_id='test_parent',
        external_task_id='task_parent_2',
        execution_delta=datetime.timedelta(hours=24),
        # execution_date_fn=lambda dt: dt - datetime.timedelta(hours=24),
    )

    task_child_1 = PythonOperator(
        task_id="task_child_1",
        python_callable=start_job,
        op_kwargs={"process_name": "my child task 1"},
        provide_context=True,
    )

    task_child_2 = PythonOperator(
        task_id="task_child_2",
        python_callable=start_job,
        op_kwargs={"process_name": "my child task 2"},
        provide_context=True,
    )

    task_child_1 >> wait_for_parent_task >> task_child_2

Code-wise 看起来正确,但 start_date 设置为今天。设置 execution_delta 后,ExternalTaskSensor 将检查执行日期为 execution_date - execution_delta 的任务。 IE。第一个 DAG 运行 将于 26 日 00:00 开始,ExternalTaskSensor 将检查 execution_date 25 日 00:00 - 24 小时 = 24 日的任务00:00。由于那是在您的 DAG 开始日期之前,因此不会有针对该 execution_date 的任务。

在日志中您应该看到 DAG/task/date 它正在检查:Poking for tasks %s in dag %s on %s ...。您可以将 DAG 的开始日期设置为几天前,或者让它 运行 几天来调试问题。

或者,我还找到了一种对我有用的方法,方法是将执行日期设置为父 dag 的预定日期。优点:您也可以手动触发 dag

假设父 dag 安排在时区“Europe/Berlin”的早上 6 点。

    wait_for_parent_task = ExternalTaskSensor(
        task_id='wait_for_parent_task ',
        external_dag_id='test_parent',
        external_task_id='task_parent_2',
        check_existence=True,
        # execution_date needs to be exact (scheduled time) and the london timezone
        # Remember: The scheduled start is always the one step further in the past - 
        # For a daily schedule: - datetime.timedelta(days=1)
        execution_date_fn=lambda dt: (datetime.datetime(year=dt.year, month=dt.month, day=dt.day, tzinfo=local_tz)
                                      + datetime.timedelta(hours=6, minutes=0)
                                      - datetime.timedelta(days=1)
                                      ).astimezone(local_tz_london),
    )