Airflow ExternalTask​​Sensor 卡住了

Airflow ExternalTaskSensor gets stuck

我正在尝试使用 ExternalTask​​Sensor,但它卡在了另一个 DAG 的任务上,该任务已经成功完成。

在这里,第一个 DAG "a" 完成了它的任务,然后应该触发通过 ExternalTask​​Sensor 的第二个 DAG "b"。相反,它会卡在 a.first_task 上。

第一个 DAG:

import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id='a',
    default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()},
    schedule_interval=None
)

def do_first_task():
    print('First task is done')

PythonOperator(
    task_id='first_task',
    python_callable=do_first_task,
    dag=dag)

第二个 DAG:

import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.sensors import ExternalTaskSensor

dag = DAG(
    dag_id='b',
    default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()},
    schedule_interval=None
)

def do_second_task():
    print('Second task is done')

ExternalTaskSensor(
    task_id='wait_for_the_first_task_to_be_completed',
    external_dag_id='a',
    external_task_id='first_task',
    dag=dag) >> \
PythonOperator(
    task_id='second_task',
    python_callable=do_second_task,
    dag=dag)

我在这里错过了什么?

ExternalTaskSensor 假定您依赖于具有相同执行日期的 dag 运行 中的任务。

这意味着在您的情况下,ab 需要按相同的时间表 运行(例如,每天 9:00am 或 w/e ).

否则在实例化ExternalTaskSensor时需要使用execution_deltaexecution_date_fn

这是运算符本身内部的文档,以帮助进一步阐明:

:param execution_delta: time difference with the previous execution to
    look at, the default is the same execution_date as the current task.
    For yesterday, use [positive!] datetime.timedelta(days=1). Either
    execution_delta or execution_date_fn can be passed to
    ExternalTaskSensor, but not both.

:type execution_delta: datetime.timedelta


:param execution_date_fn: function that receives the current execution date
    and returns the desired execution date to query. Either execution_delta
    or execution_date_fn can be passed to ExternalTaskSensor, but not both.

:type execution_date_fn: callable

为了澄清我在这里和其他相关问题上看到的内容,dags 不一定要 运行 在相同的时间表上,如已接受的答案中所述。 dags 也不需要具有相同的 start_date。如果您在没有 execution_deltaexecution_date_fn 的情况下创建 ExternalTaskSensor 任务,则这两个 dag 需要具有相同的 执行日期 。碰巧的是,如果两个 dag 具有相同的调度,则每个间隔中调度的 运行s 将具有相同的执行日期。我不确定手动触发的 运行 计划 dag 的执行日期是什么。

要使此示例正常工作,dag bExternalTaskSensor 任务需要一个 execution_deltaexecution_date_fn 参数。如果使用 execution_delta 参数,它应该是 b 的执行日期 - execution_delta = a 的执行日期。如果使用 execution_date_fn,那么该函数应该 return a 的执行日期。

如果您使用 TriggerDagRunOperator,然后使用 ExternalTaskSensor 来检测该 dag 何时完成,您可以执行一些操作,例如将主 dag 的执行日期传递给触发日期 TriggerDagRunOperatorexecution_date 参数,如 execution_date='{{ execution_date }}'。然后两个 dag 的执行日期将相同,并且您不需要每个 dag 的计划都相同,或者使用 execution_deltaexecution_date_fn 传感器参数。

以上内容是在 Airflow 1.10.9 上编写和测试的

从 Airflow v1.10.7 开始,tomcm 的回答是不正确的(至少对于这个版本)。如果外部 DAG 的时间表不同,则应使用 execution_deltaexecution_date_fn 来确定日期和时间表。

来自我的成功案例:

default_args = {
    'owner': 'xx',
    'retries': 2,
    'email': ALERT_EMAIL_ADDRESSES,
    'email_on_failure': True,
    'email_on_retry': False,
    'retry_delay': timedelta(seconds=30),
    # avoid stopping tasks after one day
    'depends_on_past': False,
}

dag = DAG(
    dag_id = dag_id,
    # get the datetime type value
    start_date = pendulum.strptime(current_date, "%Y, %m, %d, %H").astimezone('Europe/London').subtract(hours=1),
    description = 'xxx',
    default_args = default_args,
    schedule_interval = timedelta(hours=1),
    )
...
    external_sensor= ExternalTaskSensor(
            task_id='ext_sensor_task_update_model',
            external_dag_id='xxx',
            external_task_id='xxx'.format(log_type),
            # set the task_id to None because of the end_task
            # external_task_id = None,
            dag=dag,
            timeout = 300,
            )
...

您可以等到任务成功自动触发。不要手动操作,start_date 会有所不同。

Airflow 默认查找相同的执行日期、时间戳。如果我们使用 execution_date_fn 参数,我们必须 return 要查找的时间戳值列表。在内部,传感器将查询气流的 task_instance table 以检查作为参数提供的 dagid、taskid、状态和执行日期时间戳的 dag 运行。因此,如果我们使用 None 计划,则必须手动触发 dag,在这种情况下,日期时间戳可能是任何可能的值。 我在这里详细解释了它: https://link.medium.com/QzXm21asokb

我创建了一个继承了 ExternalTask​​Sensor 的新传感器,它可用于监视具有 None 计划的 dag。您可以在下面的仓库中找到代码。 https://github.com/Deepaksai1919/AirflowTaskSensor

我也 运行 参与其中,但在我的情况下,两个 DAG 使用相同的 schedule_interval,因此上述 none 的建议有所帮助。

原来这是一个 Airflow 错误。 external_task_id/external_task_ids 字段中的模板目前在 v2.2.4 中已损坏:https://github.com/apache/airflow/issues/22782